Skip to content

Commit 9058832

Browse files
committed
torrent/cvm_infer_server dirty cache removed & torrent file storage bug fix & etc
1 parent f833614 commit 9058832

File tree

6 files changed

+69
-47
lines changed

6 files changed

+69
-47
lines changed

cmd/cortex/cvm_handler.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package main
22

33
import (
44
"encoding/binary"
5-
"fmt"
5+
//"fmt"
66
"net/http"
77

88
"github.com/CortexFoundation/CortexTheseus/inference"
@@ -87,12 +87,12 @@ func inputContentHandler(w http.ResponseWriter, inferWork *inference.ICWork) {
8787
model, input := inferWork.Model, inferWork.Input
8888

8989
log.Debug("Infer Work", "Model Hash", model)
90-
var cacheKey = synapse.RLPHashString(fmt.Sprintf("%s:%x", model, input))
90+
/*var cacheKey = synapse.RLPHashString(fmt.Sprintf("%s:%x", model, input))
9191
if v, ok := simpleCache.Load(cacheKey); ok && !(IsNotCache) {
9292
log.Debug("Infer succeed via cache", "cache key", cacheKey, "label", v.([]byte))
9393
RespInfoText(w, v.([]byte))
9494
return
95-
}
95+
}*/
9696

9797
// Fixed bugs, ctx_getSolidityBytes returns 0x which stands for state invalid
9898
// if len(input) == 0 {
@@ -108,9 +108,9 @@ func inputContentHandler(w http.ResponseWriter, inferWork *inference.ICWork) {
108108
return
109109
}
110110

111-
if !(IsNotCache) {
111+
/*if !(IsNotCache) {
112112
simpleCache.Store(cacheKey, label)
113-
}
113+
}*/
114114

115115
RespInfoText(w, label)
116116
}

cmd/cortex/cvm_infer_server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import (
44
"encoding/json"
55
"io/ioutil"
66
"net/http"
7-
"sync"
7+
//"sync"
88

99
"github.com/CortexFoundation/CortexTheseus/inference"
1010
//"github.com/CortexFoundation/CortexTheseus/rpc"
1111
)
1212

1313
//var rpcClient *rpc.Client
14-
var simpleCache sync.Map
14+
//var simpleCache sync.Map
1515

1616
func handler(w http.ResponseWriter, r *http.Request) {
1717
if r.Method != "POST" {

inference/synapse/remote_infer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,11 @@ func (s *Synapse) remoteInferByInputContent(modelInfoHash string, inputContent [
8686
}
8787

8888
func (s *Synapse) sendRequest(requestBody, uri string) ([]byte, error) {
89-
cacheKey := RLPHashString(requestBody)
89+
/*cacheKey := RLPHashString(requestBody)
9090
if v, ok := s.simpleCache.Load(cacheKey); ok && !s.config.IsNotCache {
9191
log.Debug("Infer Succeed via Cache", "result", v.([]byte))
9292
return v.([]byte), nil
93-
}
93+
}*/
9494

9595
resp, err := client.SetTimeout(time.Duration(15*time.Second)).R().
9696
SetHeader("Content-Type", "application/json").
@@ -114,9 +114,9 @@ func (s *Synapse) sendRequest(requestBody, uri string) ([]byte, error) {
114114

115115
if res.Info == inference.RES_OK {
116116
var data = []byte(res.Data)
117-
if !s.config.IsNotCache {
117+
/*if !s.config.IsNotCache {
118118
s.simpleCache.Store(cacheKey, data)
119-
}
119+
}*/
120120
return data, nil
121121
}
122122
// res.Info == inference.RES_ERROR

torrentfs/monitor.go

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type Monitor struct {
6767

6868
listenID rpc.ID
6969

70-
uncheckedCh chan uint64
70+
//uncheckedCh chan uint64
7171

7272
exitCh chan struct{}
7373
terminated int32
@@ -108,24 +108,30 @@ func NewMonitor(flag *Config) (m *Monitor, e error) {
108108
log.Info("Torrent manager initialized")
109109

110110
m = &Monitor{
111-
config: flag,
112-
cl: nil,
113-
fs: fs,
114-
dl: tMana,
115-
uncheckedCh: make(chan uint64, 20),
116-
exitCh: make(chan struct{}),
117-
terminated: 0,
118-
lastNumber: uint64(0),
119-
dirty: false,
120-
taskCh: make(chan *Block, batch*2),
111+
config: flag,
112+
cl: nil,
113+
fs: fs,
114+
dl: tMana,
115+
//uncheckedCh: make(chan uint64, 20),
116+
exitCh: make(chan struct{}),
117+
terminated: 0,
118+
lastNumber: uint64(0),
119+
dirty: false,
120+
taskCh: make(chan *Block, batch*2),
121121
}
122122
e = nil
123123

124124
log.Info("Loading storage data ... ...")
125125

126126
fileMap := make(map[metainfo.Hash]*FileInfo)
127127
for _, file := range m.fs.Files() {
128-
fileMap[file.Meta.InfoHash] = file
128+
if f, ok := fileMap[file.Meta.InfoHash]; ok {
129+
if f.LeftSize > file.LeftSize {
130+
fileMap[file.Meta.InfoHash] = file
131+
}
132+
} else {
133+
fileMap[file.Meta.InfoHash] = file
134+
}
129135
}
130136
capcity := uint64(0)
131137
seed := 0
@@ -400,16 +406,18 @@ func (m *Monitor) parseFileMeta(tx *Transaction, meta *FileMeta) error {
400406

401407
info.LeftSize = meta.RawSize
402408
info.ContractAddr = receipt.ContractAddr
403-
err := m.fs.AddFile(info)
409+
index, err := m.fs.AddFile(info)
404410
if err != nil {
405411
return err
412+
} else {
413+
if index > 0 {
414+
m.dl.UpdateTorrent(FlowControlMeta{
415+
InfoHash: meta.InfoHash,
416+
BytesRequested: 0,
417+
IsCreate: true,
418+
})
419+
}
406420
}
407-
408-
m.dl.UpdateTorrent(FlowControlMeta{
409-
InfoHash: meta.InfoHash,
410-
BytesRequested: 0,
411-
IsCreate: true,
412-
})
413421
/*var _remainingSize string
414422
if err := m.cl.Call(&_remainingSize, "ctxc_getUpload", receipt.ContractAddr.String(), "latest"); err != nil {
415423
log.Warn("Failed to call get upload", "addr", receipt.ContractAddr.String())
@@ -467,24 +475,27 @@ func (m *Monitor) parseBlockTorrentInfo(b *Block, flowCtrl bool) error {
467475
addr := *tx.Recipient
468476
file := m.fs.GetFileByAddr(addr)
469477
if file == nil {
478+
//log.Warn("Uploading a nonexist file", "addr", addr.String(), "number", b.Number)
470479
continue
471480
}
472481

473-
log.Debug("Try to upload a file", "addr", addr, "infohash", file.Meta.InfoHash.String(), "number", b.Number)
482+
//log.Info("Try to upload a file", "addr", addr, "infohash", file.Meta.InfoHash.String(), "number", b.Number, "left", file.LeftSize)
474483

475484
remainingSize, err := m.getRemainingSize(addr.String())
476485
if err != nil {
477486
return err
478487
}
479488

489+
log.Info("Try to upload a file", "addr", addr, "infohash", file.Meta.InfoHash.String(), "number", b.Number, "left", file.LeftSize, "remain", remainingSize, "raw", file.Meta.RawSize)
490+
480491
if file.LeftSize > remainingSize {
481492
file.LeftSize = remainingSize
482493
err := m.fs.WriteFile(file)
483494
if err != nil {
484495
return err
485496
}
486497

487-
log.Debug("Update storage success", "hash", file.Meta.InfoHash, "left", file.LeftSize)
498+
log.Info("Update storage success", "hash", file.Meta.InfoHash, "left", file.LeftSize)
488499
var bytesRequested uint64
489500
if file.Meta.RawSize > file.LeftSize {
490501
bytesRequested = file.Meta.RawSize - file.LeftSize
@@ -644,6 +655,12 @@ func (m *Monitor) validateStorage() error {
644655
log.Warn("Torrent fs status", "dirty", m.dirty)
645656
}
646657

658+
if m.lastNumber > batch {
659+
m.lastNumber = m.lastNumber - batch
660+
} else {
661+
m.lastNumber = 0
662+
}
663+
647664
/*for i := uint64(0); i < m.fs.LastFileIndex; i++ {
648665
file := m.fs.GetFileByNumber(i)
649666
if file == nil {

torrentfs/storage.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,11 @@ func (fs *FileStorage) CurrentTorrentManager() *TorrentManager {
126126
return CurrentTorrentManager
127127
}
128128

129-
func (fs *FileStorage) AddFile(x *FileInfo) error {
129+
func (fs *FileStorage) AddFile(x *FileInfo) (uint64, error) {
130130
addr := *x.ContractAddr
131131
if _, ok := fs.filesContractAddr[addr]; ok {
132-
//return errors.New("file already existed")
133-
return nil
132+
return 0 , nil
133+
//return nil
134134
}
135135

136136
x.Index = fs.LastFileIndex
@@ -140,11 +140,11 @@ func (fs *FileStorage) AddFile(x *FileInfo) error {
140140
err := fs.WriteFile(x)
141141
if err != nil {
142142
fs.LastFileIndex -= 1
143-
return err
143+
return 0, err
144144
}
145145
fs.filesContractAddr[addr] = x
146146
fs.files = append(fs.files, x)
147-
return nil
147+
return x.Index, nil
148148
}
149149

150150
func (fs *FileStorage) GetFileByAddr(addr common.Address) *FileInfo {

torrentfs/torrentClient.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333

3434
const (
3535
removeTorrentChanBuffer = 1
36-
updateTorrentChanBuffer = 1000
36+
updateTorrentChanBuffer = 2048
3737

3838
torrentPending = iota //2
3939
torrentPaused
@@ -365,6 +365,7 @@ func (tm *TorrentManager) RemoveTorrent(input metainfo.Hash) error {
365365
}
366366

367367
func (tm *TorrentManager) UpdateTorrent(input interface{}) error {
368+
//go func() {tm.updateTorrent <- input}()
368369
tm.updateTorrent <- input
369370
return nil
370371
}
@@ -532,7 +533,7 @@ func (tm *TorrentManager) AddInfoHash(ih metainfo.Hash, BytesRequested int64) *T
532533
} else if _, err := os.Stat(torrentPath); err == nil {
533534
return tm.AddTorrent(torrentPath, BytesRequested)
534535
}
535-
log.Debug("Get torrent from infohash", "InfoHash", ih.HexString())
536+
//log.Info("Get torrent from infohash", "InfoHash", ih.HexString())
536537

537538
spec := &torrent.TorrentSpec{
538539
Trackers: [][]string{},
@@ -544,25 +545,25 @@ func (tm *TorrentManager) AddInfoHash(ih metainfo.Hash, BytesRequested int64) *T
544545
for _, tracker := range tm.trackers {
545546
spec.Trackers = append(spec.Trackers, tracker)
546547
}
547-
log.Trace("Torrent specific info", "spec", spec)
548+
//log.Info("Torrent specific info", "spec", spec)
548549

549550
t, _, err := tm.client.AddTorrentSpec(spec)
550551
if err != nil {
551552
return nil
552553
}
553554
tt := tm.CreateTorrent(t, BytesRequested, torrentPending, ih)
554555
//tm.mu.Unlock()
555-
log.Trace("Torrent is waiting for gotInfo", "InfoHash", ih.HexString())
556+
//log.Info("Torrent is waiting for gotInfo", "InfoHash", ih.HexString())
556557
return tt
557558
}
558559

559560
// UpdateInfoHash ...
560561
func (tm *TorrentManager) UpdateInfoHash(ih metainfo.Hash, BytesRequested int64) {
561562
log.Debug("Update torrent", "InfoHash", ih, "bytes", BytesRequested)
563+
tm.lock.Lock()
564+
defer tm.lock.Unlock()
562565
if t, ok := tm.bytes[ih]; !ok || t < BytesRequested {
563-
tm.lock.Lock()
564566
tm.bytes[ih] = BytesRequested
565-
tm.lock.Unlock()
566567
}
567568
/*if t := tm.GetTorrent(ih); t != nil {
568569
if BytesRequested < t.bytesRequested {
@@ -658,6 +659,7 @@ func NewTorrentManager(config *Config) *TorrentManager {
658659
closeAll: make(chan struct{}),
659660
removeTorrent: make(chan metainfo.Hash, removeTorrentChanBuffer),
660661
updateTorrent: make(chan interface{}, updateTorrentChanBuffer),
662+
//updateTorrent: make(chan interface{}),
661663
}
662664

663665
if len(config.DefaultTrackers) > 0 {
@@ -694,11 +696,12 @@ func (tm *TorrentManager) mainLoop() {
694696
case msg := <-tm.updateTorrent:
695697
meta := msg.(FlowControlMeta)
696698
if meta.IsCreate {
699+
//log.Info("TorrentManager", "newTorrent", meta.InfoHash.String())
700+
//go tm.AddInfoHash(meta.InfoHash, int64(meta.BytesRequested))
697701
counter := 0
698-
log.Debug("TorrentManager", "newTorrent", meta.InfoHash.String())
699702
for {
700703
if t := tm.AddInfoHash(meta.InfoHash, int64(meta.BytesRequested)); t != nil {
701-
log.Debug("Torrent success", "hash", meta.InfoHash, "request", meta.BytesRequested)
704+
log.Info("Torrent success", "hash", meta.InfoHash, "request", meta.BytesRequested)
702705
break
703706
} else {
704707
if counter > 10 {
@@ -709,7 +712,7 @@ func (tm *TorrentManager) mainLoop() {
709712
}
710713
}
711714
} else {
712-
log.Debug("TorrentManager", "updateTorrent", meta.InfoHash.String(), "bytes", meta.BytesRequested)
715+
//log.Info("TorrentManager", "updateTorrent", meta.InfoHash.String(), "bytes", meta.BytesRequested)
713716
tm.UpdateInfoHash(meta.InfoHash, int64(meta.BytesRequested))
714717
}
715718
case <-tm.closeAll:
@@ -786,7 +789,7 @@ func (tm *TorrentManager) listenTorrentProgress() {
786789
t.isBoosting = true
787790
go func(t *Torrent) {
788791
defer t.BoostOff()
789-
log.Debug("Try to boost torrent", "infohash", t.infohash)
792+
log.Info("Try to boost torrent", "infohash", t.infohash)
790793
if data, err := tm.boostFetcher.GetTorrent(t.infohash); err == nil {
791794
if t.Torrent.Info() != nil {
792795
return
@@ -819,7 +822,9 @@ func (tm *TorrentManager) listenTorrentProgress() {
819822
active_boost := 0
820823
for _, t := range activeTorrentsCandidate {
821824
ih := t.Torrent.InfoHash()
825+
tm.lock.RLock()
822826
BytesRequested := tm.bytes[ih]
827+
tm.lock.RUnlock()
823828
if t.bytesRequested < BytesRequested {
824829
t.bytesRequested = BytesRequested
825830
t.bytesLimitation = GetLimitation(BytesRequested)

0 commit comments

Comments
 (0)