Skip to content

Commit 78e7202

Browse files
committed
torrentfs/monitor | record the block number by epoch and file operation
1 parent ea1620f commit 78e7202

File tree

3 files changed

+62
-54
lines changed

3 files changed

+62
-54
lines changed

torrentfs/monitor.go

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func NewMonitor(flag *Config) (m *Monitor, e error) {
128128
m.sizeCache, _ = lru.New(batch)
129129
e = nil
130130

131-
log.Info("Loading storage data ... ...")
131+
log.Info("Loading storage data ... ...", "latest", m.fs.LastListenBlockNumber)
132132

133133
fileMap := make(map[metainfo.Hash]*FileInfo)
134134
//files, err := m.fs.Files()
@@ -186,7 +186,6 @@ func (m *Monitor) taskLoop() {
186186

187187
if err := m.deal(task); err != nil {
188188
log.Warn("Block dealing failed", "err", err)
189-
continue
190189
}
191190
case <-m.exitCh:
192191
log.Info("Monitor task channel closed")
@@ -472,16 +471,18 @@ func (m *Monitor) parseFileMeta(tx *Transaction, meta *FileMeta) error {
472471
return nil
473472
}
474473

475-
func (m *Monitor) parseBlockTorrentInfo(b *Block) error {
474+
func (m *Monitor) parseBlockTorrentInfo(b *Block) (bool, error) {
475+
record := false
476476
if len(b.Txs) > 0 {
477477
start := mclock.Now()
478478
for _, tx := range b.Txs {
479479
if meta := tx.Parse(); meta != nil {
480480
log.Debug("Try to create a file", "meta", meta, "number", b.Number, "infohash", meta.InfoHash)
481481
if err := m.parseFileMeta(&tx, meta); err != nil {
482482
log.Error("Parse file meta error", "err", err, "number", b.Number)
483-
return err
483+
return false, err
484484
}
485+
record = true
485486
} else if tx.IsFlowControl() {
486487
if tx.Recipient == nil {
487488
continue
@@ -495,13 +496,13 @@ func (m *Monitor) parseBlockTorrentInfo(b *Block) error {
495496

496497
remainingSize, err := m.getRemainingSize(addr.String())
497498
if err != nil {
498-
return err
499+
return false, err
499500
}
500501

501502
if file.LeftSize > remainingSize {
502503
file.LeftSize = remainingSize
503504
if err := m.fs.WriteFile(file); err != nil {
504-
return err
505+
return false, err
505506
}
506507

507508
log.Debug("Update storage success", "hash", file.Meta.InfoHash, "left", file.LeftSize)
@@ -519,6 +520,8 @@ func (m *Monitor) parseBlockTorrentInfo(b *Block) error {
519520
} else {
520521
log.Debug("Uploading a file", "addr", addr, "hash", file.Meta.InfoHash.String(), "number", b.Number, "left", file.LeftSize, "remain", remainingSize, "raw", file.Meta.RawSize)
521522
}
523+
524+
record = true
522525
}
523526
}
524527
elapsed := time.Duration(mclock.Now()) - time.Duration(start)
@@ -527,7 +530,7 @@ func (m *Monitor) parseBlockTorrentInfo(b *Block) error {
527530
}
528531
}
529532

530-
return nil
533+
return record, nil
531534
}
532535

533536
func (m *Monitor) Stop() {
@@ -919,44 +922,36 @@ func (m *Monitor) syncLastBlock() uint64 {
919922
return uint64(maxNumber - minNumber)
920923
}
921924

922-
func (m *Monitor) deal(rpcBlock *Block) error {
923-
i := rpcBlock.Number
924-
if hash, suc := m.blockCache.Get(i); !suc || hash != rpcBlock.Hash.Hex() {
925-
926-
/*block := m.fs.GetBlockByNumber(i)
927-
if block == nil {
928-
block = rpcBlock
929-
930-
if err := m.parseAndStore(block, true); err != nil {
931-
log.Error("Fail to parse and storge latest block", "number", i, "error", err)
932-
return err
925+
func (m *Monitor) deal(block *Block) error {
926+
i := block.Number
927+
if hash, suc := m.blockCache.Get(i); !suc || hash != block.Hash.Hex() {
928+
if record, parseErr := m.parseBlockTorrentInfo(block); parseErr != nil {
929+
log.Error("Parse new block", "number", block.Number, "block", block, "error", parseErr)
930+
return parseErr
931+
} else if record {
932+
if storeErr := m.fs.WriteBlock(block); storeErr != nil {
933+
log.Error("Store latest block", "number", block.Number, "error", storeErr)
934+
return storeErr
933935
}
934936

937+
log.Debug("Confirm to seal the fs record", "number", i, "cap", len(m.taskCh), "record", record)
935938
} else {
936-
if block.Hash.Hex() == rpcBlock.Hash.Hex() {
937-
938-
if parseErr := m.parseBlockTorrentInfo(block, true); parseErr != nil { //dirty to do
939-
log.Error("Parse old block", "number", i, "block", block, "error", parseErr)
940-
return parseErr
941-
}
942-
} else {
943-
//dirty tfs
944-
if err := m.parseAndStore(rpcBlock, true); err != nil {
945-
log.Error("Dirty tfs fail to parse and storge latest block", "number", i, "error", err)
946-
return err
939+
if i%batch == 0 {
940+
if storeErr := m.fs.WriteBlock(block); storeErr != nil {
941+
log.Error("Store latest block", "number", block.Number, "error", storeErr)
942+
return storeErr
947943
}
944+
945+
log.Debug("Confirm to seal the fs record", "number", i, "cap", len(m.taskCh))
948946
}
949-
}*/
950-
if err := m.parseAndStore(rpcBlock); err != nil {
951-
log.Error("Fail to parse and storge latest block", "number", i, "error", err)
952-
return err
953947
}
954-
m.blockCache.Add(i, rpcBlock.Hash.Hex())
948+
949+
m.blockCache.Add(i, block.Hash.Hex())
955950
}
956951
return nil
957952
}
958953

959-
func (m *Monitor) parseAndStore(block *Block) error {
954+
/*func (m *Monitor) parseAndStore(block *Block) error {
960955
if parseErr := m.parseBlockTorrentInfo(block); parseErr != nil {
961956
log.Error("Parse new block", "number", block.Number, "block", block, "error", parseErr)
962957
return parseErr
@@ -967,4 +962,4 @@ func (m *Monitor) parseAndStore(block *Block) error {
967962
return storeErr
968963
}
969964
return nil
970-
}
965+
}*/

torrentfs/storage.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,8 +388,13 @@ func (fs *FileStorage) WriteBlock(b *Block) error {
388388
//if err == nil && b.Number > fs.LastListenBlockNumber {
389389
//if err == nil {
390390
//fs.bnLock.Lock()
391+
if b.Number < fs.LastListenBlockNumber {
392+
return nil
393+
}
394+
391395
fs.LastListenBlockNumber = b.Number
392396
return fs.writeBlockNumber()
397+
//return nil
393398
//fs.bnLock.Unlock()
394399
//}
395400

torrentfs/torrentClient.go

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -780,13 +780,17 @@ func (tm *TorrentManager) pendingTorrentLoop() {
780780
ih := t.Torrent.InfoHash()
781781
t.loop += 1
782782
if t.Seeding() {
783-
delete(tm.pendingTorrents, ih)
784-
t.loop = 0
785-
tm.seedingChan <- t
783+
if len(tm.seedingChan) < cap(tm.seedingChan) {
784+
delete(tm.pendingTorrents, ih)
785+
t.loop = 0
786+
tm.seedingChan <- t
787+
}
786788
} else if !t.Pending() {
787-
delete(tm.pendingTorrents, ih)
788-
t.loop = 0
789-
tm.activeChan <- t
789+
if len(tm.activeChan) < cap(tm.activeChan) {
790+
delete(tm.pendingTorrents, ih)
791+
t.loop = 0
792+
tm.activeChan <- t
793+
}
790794
} else if t.Torrent.Info() != nil {
791795
t.WriteTorrent()
792796
} else if t.loop > torrentWaitingTime/queryTimeInterval {
@@ -859,12 +863,14 @@ func (tm *TorrentManager) activeTorrentLoop() {
859863
if t.Finished() {
860864
tm.lock.Lock()
861865
if _, err := os.Stat(path.Join(tm.DataDir, t.InfoHash())); err == nil {
862-
log.Debug("Path exist", "hash", t.Torrent.InfoHash(), "path", path.Join(tm.DataDir, t.InfoHash()))
863-
delete(tm.activeTorrents, ih)
864-
tm.seedingChan <- t
865-
t.loop = defaultSeedInterval / queryTimeInterval
866-
total_size += uint64(t.bytesCompleted)
867-
current_size += uint64(t.bytesCompleted)
866+
if len(tm.seedingChan) < cap(tm.seedingChan) {
867+
log.Debug("Path exist", "hash", t.Torrent.InfoHash(), "path", path.Join(tm.DataDir, t.InfoHash()))
868+
delete(tm.activeTorrents, ih)
869+
tm.seedingChan <- t
870+
t.loop = defaultSeedInterval / queryTimeInterval
871+
total_size += uint64(t.bytesCompleted)
872+
current_size += uint64(t.bytesCompleted)
873+
}
868874
} else {
869875
err := os.Symlink(
870876
path.Join(defaultTmpFilePath, t.InfoHash()),
@@ -878,11 +884,13 @@ func (tm *TorrentManager) activeTorrentLoop() {
878884
log.Debug("Fix path success", "hash", t.Torrent.InfoHash(), "size", t.bytesCompleted, "miss", t.bytesMissing, "loop", log_counter)
879885
}
880886
} else {
881-
delete(tm.activeTorrents, ih)
882-
tm.seedingChan <- t
883-
t.loop = defaultSeedInterval / queryTimeInterval
884-
total_size += uint64(t.bytesCompleted)
885-
current_size += uint64(t.bytesCompleted)
887+
if len(tm.seedingChan) < cap(tm.seedingChan) {
888+
delete(tm.activeTorrents, ih)
889+
tm.seedingChan <- t
890+
t.loop = defaultSeedInterval / queryTimeInterval
891+
total_size += uint64(t.bytesCompleted)
892+
current_size += uint64(t.bytesCompleted)
893+
}
886894
}
887895
}
888896

@@ -974,7 +982,7 @@ func (tm *TorrentManager) activeTorrentLoop() {
974982
}
975983

976984
if counter >= loops {
977-
log.Info("Torrent status", "pending", len(tm.pendingTorrents), "active", len(tm.activeTorrents), "wait", active_wait, "downloading", active_running, "paused", active_paused, "boost", active_boost, "seeding", len(tm.seedingTorrents), "pieces", all, "size", common.StorageSize(total_size), "speed_a", common.StorageSize(total_size/log_counter*queryTimeInterval).String()+"/s", "speed_b", common.StorageSize(current_size/counter*queryTimeInterval).String()+"/s")
985+
log.Info("Torrent status", "pending", len(tm.pendingTorrents), "active", len(tm.activeTorrents), "wait", active_wait, "downloading", active_running, "paused", active_paused, "boost", active_boost, "seeding", len(tm.seedingTorrents), "pieces", all, "size", common.StorageSize(total_size), "speed_a", common.StorageSize(total_size/log_counter*queryTimeInterval).String()+"/s", "speed_b", common.StorageSize(current_size/counter*queryTimeInterval).String()+"/s", "channel", len(tm.updateTorrent))
978986
counter = 0
979987
current_size = 0
980988
}

0 commit comments

Comments
 (0)