Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 90 additions & 8 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/filecoin-project/go-statemachine/fsm"
provider "github.com/filecoin-project/index-provider"
metadata2 "github.com/filecoin-project/index-provider/metadata"
"github.com/filecoin-project/specs-actors/actors/builtin/market"

"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/piecestore"
Expand Down Expand Up @@ -58,6 +59,20 @@ type MeshCreator interface {
Connect(context.Context) error
}

type BoostDealGetter interface {
Get(proposalCid cid.Cid) (storagemarket.MinerDeal, error)
GetAll() ([]storagemarket.MinerDeal, error)
}

type NoOpBoostDealGetter struct{}

func (n *NoOpBoostDealGetter) Get(proposalCid cid.Cid) (storagemarket.MinerDeal, error) {
return storagemarket.MinerDeal{}, xerrors.New("not found")
}
func (n *NoOpBoostDealGetter) GetAll() ([]storagemarket.MinerDeal, error) {
return nil, nil
}

// Provider is the production implementation of the StorageProvider interface
type Provider struct {
net network.StorageMarketNetwork
Expand All @@ -79,9 +94,11 @@ type Provider struct {

unsubDataTransfer datatransfer.Unsubscribe

dagStore stores.DAGStoreWrapper
indexProvider provider.Interface
stores *stores.ReadWriteBlockstores
dagStore stores.DAGStoreWrapper
indexProvider provider.Interface
stores *stores.ReadWriteBlockstores
env *providerDealEnvironment
boostDealGetter BoostDealGetter
}

// StorageProviderOption allows custom configuration of a storage provider
Expand Down Expand Up @@ -124,6 +141,7 @@ func NewProvider(net network.StorageMarketNetwork,
minerAddress address.Address,
storedAsk StoredAsk,
meshCreator MeshCreator,
bdg BoostDealGetter,
options ...StorageProviderOption,
) (storagemarket.StorageProvider, error) {
h := &Provider{
Expand All @@ -142,14 +160,18 @@ func NewProvider(net network.StorageMarketNetwork,
stores: stores.NewReadWriteBlockstores(),
awaitTransferRestartTimeout: defaultAwaitRestartTimeout,
indexProvider: indexer,
boostDealGetter: bdg,
}

h.env = &providerDealEnvironment{h}

storageMigrations, err := migrations.ProviderMigrations.Build()
if err != nil {
return nil, err
}
h.deals, h.migrateDeals, err = newProviderStateMachine(
ds,
&providerDealEnvironment{h},
h.env,
h.dispatch,
storageMigrations,
versioning.VersionKey("1"),
Expand Down Expand Up @@ -474,7 +496,12 @@ func (p *Provider) SetAsk(price abi.TokenAmount, verifiedPrice abi.TokenAmount,
func (p *Provider) AnnounceDealToIndexer(ctx context.Context, proposalCid cid.Cid) error {
var deal storagemarket.MinerDeal
if err := p.deals.Get(proposalCid).Get(&deal); err != nil {
return xerrors.Errorf("failed getting deal %s: %w", proposalCid, err)
if bd, err := p.boostDealGetter.Get(proposalCid); err != nil {
return xerrors.Errorf("failed getting deal %s: %w", proposalCid, err)
} else {
log.Infow("found deal in Boost", "proposalCid", proposalCid.String())
deal = bd
}
}

fm := metadata2.FilecoinV1Data{
Expand Down Expand Up @@ -516,6 +543,13 @@ func (p *Provider) AnnounceAllDealsToIndexer(ctx context.Context) error {
return fmt.Errorf("failed to list deals: %w", err)
}

// announce all boost deals as well to the network Indexer
boostOut, err := p.boostDealGetter.GetAll()
if err != nil {
return fmt.Errorf("failed to list boost deals: %w", err)
}
out = append(out, boostOut...)

shards := make(map[string]struct{})
var nSuccess int
var merr error
Expand Down Expand Up @@ -635,8 +669,12 @@ func (p *Provider) processDealStatusRequest(ctx context.Context, request *networ
// fetch deal state
var md = storagemarket.MinerDeal{}
if err := p.deals.Get(request.Proposal).Get(&md); err != nil {
log.Errorf("proposal doesn't exist in state store: %s", err)
return nil, xerrors.Errorf("no such proposal")
if bd, err := p.boostDealGetter.Get(request.Proposal); err != nil {
log.Errorf("proposal doesn't exist in state store: %s", err)
return nil, xerrors.Errorf("no such proposal")
} else {
md = bd
}
}

// verify query signature
Expand Down Expand Up @@ -728,7 +766,12 @@ func (p *Provider) start(ctx context.Context) error {

var deal storagemarket.MinerDeal
if err := p.deals.Get(proposalCid).Get(&deal); err != nil {
return nil, xerrors.Errorf("failed getting deal %s: %w", proposalCid, err)
// could be a boost deal too
if md, err := p.boostDealGetter.Get(proposalCid); err != nil {
return nil, xerrors.Errorf("failed getting deal %s: %w", proposalCid, err)
} else {
deal = md
}
}

ii, err := p.dagStore.GetIterableIndexForPiece(deal.Proposal.PieceCID)
Expand Down Expand Up @@ -759,6 +802,12 @@ func (p *Provider) runMigrations(ctx context.Context) ([]storagemarket.MinerDeal
return nil, xerrors.Errorf("failed to fetch deals during startup: %w", err)
}

boostDeals, err := p.boostDealGetter.GetAll()
if err != nil {
return nil, fmt.Errorf("failed to fetch boost deals: %w", err)
}
deals = append(deals, boostDeals...)

// migrate deals to the dagstore if still not migrated.
if ok, err := p.dagStore.MigrateDeals(ctx, deals); err != nil {
return nil, fmt.Errorf("failed to migrate deals to DAG store: %w", err)
Expand Down Expand Up @@ -810,6 +859,39 @@ func (p *Provider) resendProposalResponse(s network.StorageDealStream, md *stora
return err
}

type BoostDeal struct {
market.ClientDealProposal
ChainDealID abi.DealID
PackingInfo *storagemarket.PackingResult
CARFilePath string
ProposalCid cid.Cid
}

func (p *Provider) MakeBoostDealRetrievable(ctx context.Context, bd *BoostDeal) (annCid cid.Cid, err error) {
if err := p.pieceStore.AddDealForPiece(bd.Proposal.PieceCID, piecestore.DealInfo{
DealID: bd.ChainDealID,
SectorID: bd.PackingInfo.SectorNumber,
Offset: bd.PackingInfo.Offset,
Length: bd.PackingInfo.Size,
}); err != nil {
return cid.Undef, fmt.Errorf("failed to add deal to piecestore: %w", err)
}

// Register the deal data as a "shard" with the DAG store. Later it can be
// fetched from the DAG store during retrieval.
if err := stores.RegisterShardSync(ctx, p.dagStore, bd.Proposal.PieceCID, bd.CARFilePath, true); err != nil {
return cid.Undef, fmt.Errorf("failed to register deal with dagstore")
}

// announce the deal to the network indexer
annCid, err = p.env.AnnounceIndex(ctx, bd.ProposalCid, bd.Proposal)
if err != nil {
return cid.Undef, fmt.Errorf("failed to announce deal to network indexer: %w", err)
}

return annCid, nil
}

func newProviderStateMachine(ds datastore.Batching, env fsm.Environment, notifier fsm.Notifier, storageMigrations versioning.VersionedMigrationList, target versioning.VersionKey) (fsm.Group, func(context.Context) error, error) {
return versionedfsm.NewVersionedFSM(ds, fsm.Parameters{
Environment: env,
Expand Down
11 changes: 6 additions & 5 deletions storagemarket/impl/provider_environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
metadata2 "github.com/filecoin-project/index-provider/metadata"
"github.com/filecoin-project/specs-actors/actors/builtin/market"

"github.com/filecoin-project/go-fil-markets/commp"
"github.com/filecoin-project/go-fil-markets/filestore"
Expand All @@ -39,11 +40,11 @@ func (p *providerDealEnvironment) RegisterShard(ctx context.Context, pieceCid ci

// AnnounceIndex informs indexer nodes that a new deal was received,
// so they can download its index
func (p *providerDealEnvironment) AnnounceIndex(ctx context.Context, deal storagemarket.MinerDeal) (advertCid cid.Cid, err error) {
func (p *providerDealEnvironment) AnnounceIndex(ctx context.Context, proposalCid cid.Cid, proposal market.DealProposal) (advertCid cid.Cid, err error) {
fm := metadata2.FilecoinV1Data{
PieceCID: deal.Proposal.PieceCID,
FastRetrieval: deal.FastRetrieval,
VerifiedDeal: deal.Proposal.VerifiedDeal,
PieceCID: proposal.PieceCID,
FastRetrieval: true,
VerifiedDeal: proposal.VerifiedDeal,
}
dtm, err := fm.Encode(metadata2.GraphSyncV1)
if err != nil {
Expand All @@ -55,7 +56,7 @@ func (p *providerDealEnvironment) AnnounceIndex(ctx context.Context, deal storag
return cid.Undef, fmt.Errorf("cannot publish index record as indexer host failed to connect to the full node: %w", err)
}

return p.p.indexProvider.NotifyPut(ctx, deal.ProposalCid.Bytes(), dtm.ToIndexerMetadata())
return p.p.indexProvider.NotifyPut(ctx, proposalCid.Bytes(), dtm.ToIndexerMetadata())
}

func (p *providerDealEnvironment) RemoveIndex(ctx context.Context, proposalCid cid.Cid) error {
Expand Down
2 changes: 2 additions & 0 deletions storagemarket/impl/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func TestProvider_Migrations(t *testing.T) {
deps.ProviderAddr,
deps.StoredAsk,
&testharness.MeshCreatorStub{},
&testharness.BoostDealGetter{},
)
require.NoError(t, err)

Expand Down Expand Up @@ -224,6 +225,7 @@ func TestHandleDealStream(t *testing.T) {
deps.ProviderAddr,
deps.StoredAsk,
&testharness.MeshCreatorStub{},
&testharness.BoostDealGetter{},
)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions storagemarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type ProviderDealEnvironment interface {
ReadCAR(path string) (*carv2.Reader, error)

RegisterShard(ctx context.Context, pieceCid cid.Cid, path string, eagerInit bool) error
AnnounceIndex(ctx context.Context, deal storagemarket.MinerDeal) (cid.Cid, error)
AnnounceIndex(ctx context.Context, proposalCid cid.Cid, proposal market.DealProposal) (cid.Cid, error)
RemoveIndex(ctx context.Context, proposalCid cid.Cid) error

FinalizeBlockstore(proposalCid cid.Cid) error
Expand Down Expand Up @@ -393,7 +393,7 @@ func HandoffDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor
}

// announce the deal to the network indexer
annCid, err := environment.AnnounceIndex(ctx.Context(), deal)
annCid, err := environment.AnnounceIndex(ctx.Context(), deal.ProposalCid, deal.Proposal)
if err != nil {
log.Errorw("failed to announce index via reference provider", "proposalCid", deal.ProposalCid, "err", err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion storagemarket/impl/providerstates/provider_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1650,7 +1650,7 @@ func (fe *fakeEnvironment) AwaitRestartTimeout() <-chan time.Time {
return fe.awaitRestartTimeout
}

func (fe *fakeEnvironment) AnnounceIndex(ctx context.Context, deal storagemarket.MinerDeal) (cid.Cid, error) {
func (fe *fakeEnvironment) AnnounceIndex(ctx context.Context, _ cid.Cid, _ market.DealProposal) (cid.Cid, error) {
return cid.Undef, nil
}

Expand Down
12 changes: 12 additions & 0 deletions storagemarket/testharness/testharness.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ func (m *MeshCreatorStub) Connect(context.Context) error {
return nil
}

type BoostDealGetter struct{}

func (bdg *BoostDealGetter) Get(proposalCid cid.Cid) (storagemarket.MinerDeal, error) {
return storagemarket.MinerDeal{}, nil
}

func (bdg *BoostDealGetter) GetAll() ([]storagemarket.MinerDeal, error) {
return nil, nil
}

func NewHarnessWithTestData(t *testing.T, td *shared_testutil.Libp2pTestData, deps *dependencies.StorageDependencies, useStore bool, disableNewDeals bool, files ...string) *StorageHarness {
var file string
if len(files) == 0 {
Expand Down Expand Up @@ -125,6 +135,7 @@ func NewHarnessWithTestData(t *testing.T, td *shared_testutil.Libp2pTestData, de
deps.ProviderAddr,
deps.StoredAsk,
&MeshCreatorStub{},
&BoostDealGetter{},
)
assert.NoError(t, err)

Expand Down Expand Up @@ -164,6 +175,7 @@ func (h *StorageHarness) CreateNewProvider(t *testing.T, ctx context.Context, td
h.ProviderAddr,
h.StoredAsk,
&MeshCreatorStub{},
&BoostDealGetter{},
)
require.NoError(t, err)
return provider
Expand Down