Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 26 additions & 37 deletions packages/chain/chainmanager/chain_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ import (
var ErrNotInCommittee = errors.New("ErrNotInCommittee")

type Output struct {
cmi *chainMgrImpl
cmi *ChainMgr
}

func (o *Output) LatestActiveAnchor() *isc.StateAnchor {
Expand Down Expand Up @@ -182,18 +182,14 @@ func (npt *NeedPublishTX) String() string {
)
}

type ChainMgr interface {
AsGPA() gpa.GPA
}

type committeeLogInst struct {
committeeAddr cryptolib.Address
dkShare tcrypto.DKShare
gpaInstance gpa.GPA
gpaInstance *gpa.OwnHandler[*committeelog.CommitteeLog]
pendingMsgs []gpa.Message
}

type chainMgrImpl struct {
type ChainMgr struct {
chainID isc.ChainID // This instance is responsible for this chain.
chainStore state.Store // Store of the chain state.
committeeLogs map[cryptolib.AddressKey]*committeeLogInst // All the committee log instances for this chain.
Expand Down Expand Up @@ -222,8 +218,7 @@ type chainMgrImpl struct {
}

var (
_ gpa.GPA = &chainMgrImpl{}
_ ChainMgr = &chainMgrImpl{}
_ gpa.GPA = &ChainMgr{}
)

func New(
Expand All @@ -244,8 +239,8 @@ func New(
postponeRecoveryMilestones int,
metrics *metrics.ChainCommitteeLogMetrics,
log log.Logger,
) (ChainMgr, error) {
cmi := &chainMgrImpl{
) (*ChainMgr, error) {
cmi := &ChainMgr{
chainID: chainID,
chainStore: chainStore,
committeeLogs: map[cryptolib.AddressKey]*committeeLogInst{},
Expand All @@ -269,17 +264,11 @@ func New(
log: log,
}
cmi.output = &Output{cmi: cmi}
cmi.asGPA = gpa.NewOwnHandler(me, cmi)
return cmi, nil
}

// Implements the CommitteeLog interface.
func (cmi *chainMgrImpl) AsGPA() gpa.GPA {
return cmi.asGPA
}

// Implements the gpa.GPA interface.
func (cmi *chainMgrImpl) Input(input gpa.Input) gpa.OutMessages {
func (cmi *ChainMgr) Input(input gpa.Input) gpa.OutMessages {
switch input := input.(type) {
case *inputAnchorConfirmed:
return cmi.handleInputAnchorConfirmed(input)
Expand All @@ -298,7 +287,7 @@ func (cmi *chainMgrImpl) Input(input gpa.Input) gpa.OutMessages {
}

// Implements the gpa.GPA interface.
func (cmi *chainMgrImpl) Message(msg gpa.Message) gpa.OutMessages {
func (cmi *ChainMgr) Message(msg gpa.Message) gpa.OutMessages {
switch msg := msg.(type) {
case *msgCommitteeLog:
return cmi.handleMsgCommitteeLog(msg)
Expand All @@ -317,7 +306,7 @@ func (cmi *chainMgrImpl) Message(msg gpa.Message) gpa.OutMessages {
// > Send Suspend to Last Active CommitteeLog; HandleCommitteeLogOutput(LatestActiveCmt)
// > Set LatestActiveCmt <- NIL
// > Set NeedConsensus <- NIL
func (cmi *chainMgrImpl) handleInputAnchorConfirmed(input *inputAnchorConfirmed) gpa.OutMessages {
func (cmi *ChainMgr) handleInputAnchorConfirmed(input *inputAnchorConfirmed) gpa.OutMessages {
cmi.log.LogDebugf("handleInputAnchorConfirmed: %+v", input)
//
// > Set LatestConfirmedAnchor <- ConfirmedAnchor
Expand Down Expand Up @@ -363,7 +352,7 @@ func (cmi *chainMgrImpl) handleInputAnchorConfirmed(input *inputAnchorConfirmed)
// > Forward it to ChainMgr; HandleCommitteeLogOutput.
// > ELSE
// > NOP // Anchor has to be received as Confirmed Anchor.
func (cmi *chainMgrImpl) handleInputChainTxPublishResult(input *inputChainTxPublishResult) gpa.OutMessages {
func (cmi *ChainMgr) handleInputChainTxPublishResult(input *inputChainTxPublishResult) gpa.OutMessages {
cmi.log.LogDebugf("handleInputChainTxPublishResult: %+v", input)
// > Clear the TX from the NeedPublishTX variable.
if cmi.needPublishTX.Has(input.txDigest.HashValue()) {
Expand All @@ -389,7 +378,7 @@ func (cmi *chainMgrImpl) handleInputChainTxPublishResult(input *inputChainTxPubl
// > Add ConsensusOutput.TX to NeedPublishTX
// > Forward the message to the corresponding CommitteeLog; HandleCommitteeLogOutput.
// > Update AccessNodes.
func (cmi *chainMgrImpl) handleInputConsensusOutputDone(input *inputConsensusOutputDone) gpa.OutMessages {
func (cmi *ChainMgr) handleInputConsensusOutputDone(input *inputConsensusOutputDone) gpa.OutMessages {
cmi.log.LogDebugf("handleInputConsensusOutputDone: %+v", input)
msgs := gpa.NoMessages()

Expand Down Expand Up @@ -439,22 +428,22 @@ func (cmi *chainMgrImpl) handleInputConsensusOutputDone(input *inputConsensusOut

// > UPON Reception of Consensus Output/SKIP:
// > Forward the message to the corresponding CommitteeLog; HandleCommitteeLogOutput.
func (cmi *chainMgrImpl) handleInputConsensusOutputSkip(input *inputConsensusOutputSkip) gpa.OutMessages {
func (cmi *ChainMgr) handleInputConsensusOutputSkip(input *inputConsensusOutputSkip) gpa.OutMessages {
return cmi.withCommitteeLog(input.committeeAddr, func(cl gpa.GPA) gpa.OutMessages {
return cl.Input(committeelog.NewInputConsensusOutputSkip(input.logIndex))
})
}

// > UPON Reception of Consensus Timeout:
// > Forward the message to the corresponding CommitteeLog; HandleCommitteeLogOutput.
func (cmi *chainMgrImpl) handleInputConsensusTimeout(input *inputConsensusTimeout) gpa.OutMessages {
func (cmi *ChainMgr) handleInputConsensusTimeout(input *inputConsensusTimeout) gpa.OutMessages {
cmi.log.LogDebugf("handleInputConsensusTimeout: %+v", input)
return cmi.withCommitteeLog(input.committeeAddr, func(cl gpa.GPA) gpa.OutMessages {
return cl.Input(committeelog.NewInputConsensusTimeout(input.logIndex))
})
}

func (cmi *chainMgrImpl) handleInputCanPropose() gpa.OutMessages {
func (cmi *ChainMgr) handleInputCanPropose() gpa.OutMessages {
cmi.log.LogDebugf("handleInputCanPropose")
return cmi.withAllCommitteeLogs(func(cl gpa.GPA) gpa.OutMessages {
return cl.Input(committeelog.NewInputCanPropose())
Expand All @@ -463,14 +452,14 @@ func (cmi *chainMgrImpl) handleInputCanPropose() gpa.OutMessages {

// > UPON Reception of CommitteeLog.NextLI message:
// > Forward it to the corresponding CommitteeLog; HandleCommitteeLogOutput.
func (cmi *chainMgrImpl) handleMsgCommitteeLog(msg *msgCommitteeLog) gpa.OutMessages {
func (cmi *ChainMgr) handleMsgCommitteeLog(msg *msgCommitteeLog) gpa.OutMessages {
cmi.log.LogDebugf("handleMsgCommitteeLog: %+v", msg)
return cmi.withCommitteeLog(msg.committeeAddr, func(cl gpa.GPA) gpa.OutMessages {
return cl.Message(msg.wrapped)
})
}

func (cmi *chainMgrImpl) handleMsgBlockProduced(msg *msgBlockProduced) gpa.OutMessages {
func (cmi *ChainMgr) handleMsgBlockProduced(msg *msgBlockProduced) gpa.OutMessages {
cmi.log.LogDebugf("handleMsgBlockProduced: %+v", msg)
vsaTip, vsaUpdated, l1Commitment := cmi.varAccessNodeState.BlockProduced(msg.tx)
//
Expand Down Expand Up @@ -504,7 +493,7 @@ func (cmi *chainMgrImpl) handleMsgBlockProduced(msg *msgBlockProduced) gpa.OutMe
// > Suspend(LatestActiveCmt)
// > Set LatestActiveCmt <- cmt
// > Set NeedConsensus <- output.NeedConsensus
func (cmi *chainMgrImpl) handleCommitteeLogOutput(cli *committeeLogInst, cliMsgs gpa.OutMessages) gpa.OutMessages {
func (cmi *ChainMgr) handleCommitteeLogOutput(cli *committeeLogInst, cliMsgs gpa.OutMessages) gpa.OutMessages {
//
// > Wrap out messages.
msgs := gpa.NoMessages()
Expand Down Expand Up @@ -538,7 +527,7 @@ func (cmi *chainMgrImpl) handleCommitteeLogOutput(cli *committeeLogInst, cliMsgs
return msgs
}

func (cmi *chainMgrImpl) ensureNeedConsensus(cli *committeeLogInst, outputUntyped gpa.Output) {
func (cmi *ChainMgr) ensureNeedConsensus(cli *committeeLogInst, outputUntyped gpa.Output) {
wasEmpty := cmi.needConsensus.IsEmpty()
if outputUntyped == nil {
cmi.needConsensus.Clear()
Expand Down Expand Up @@ -596,12 +585,12 @@ func (cmi *chainMgrImpl) ensureNeedConsensus(cli *committeeLogInst, outputUntype
}

// Implements the gpa.GPA interface.
func (cmi *chainMgrImpl) Output() gpa.Output {
func (cmi *ChainMgr) Output() gpa.Output {
return cmi.output
}

// Implements the gpa.GPA interface.
func (cmi *chainMgrImpl) StatusString() string { // TODO: Call it periodically. Show the active committee.
func (cmi *ChainMgr) StatusString() string { // TODO: Call it periodically. Show the active committee.
return "{ChainMgr,...}" // TODO: Add more info.
// return fmt.Sprintf("{ChainMgr,confirmedAnchor=%v,activeAnchor=%v}",
// cmi.output.LatestConfirmedAnchor().GetObjectID().String(),
Expand All @@ -612,15 +601,15 @@ func (cmi *chainMgrImpl) StatusString() string { // TODO: Call it periodically.
////////////////////////////////////////////////////////////////////////////////
// Helper functions.

func (cmi *chainMgrImpl) wrapCommitteeLogMsgs(cli *committeeLogInst, outMsgs gpa.OutMessages) gpa.OutMessages {
func (cmi *ChainMgr) wrapCommitteeLogMsgs(cli *committeeLogInst, outMsgs gpa.OutMessages) gpa.OutMessages {
wrappedMsgs := gpa.NoMessages()
outMsgs.MustIterate(func(msg gpa.Message) {
wrappedMsgs.Add(NewMsgCommitteeLog(cli.committeeAddr, msg))
})
return wrappedMsgs
}

func (cmi *chainMgrImpl) suspendCommittee(committeeAddr *cryptolib.Address) gpa.OutMessages {
func (cmi *ChainMgr) suspendCommittee(committeeAddr *cryptolib.Address) gpa.OutMessages {
for _, cli := range cmi.committeeLogs {
if !cli.committeeAddr.Equals(committeeAddr) {
continue
Expand All @@ -630,7 +619,7 @@ func (cmi *chainMgrImpl) suspendCommittee(committeeAddr *cryptolib.Address) gpa.
return nil
}

func (cmi *chainMgrImpl) withCommitteeLog(committeeAddr cryptolib.Address, handler func(cl gpa.GPA) gpa.OutMessages) gpa.OutMessages {
func (cmi *ChainMgr) withCommitteeLog(committeeAddr cryptolib.Address, handler func(cl gpa.GPA) gpa.OutMessages) gpa.OutMessages {
cli, err := cmi.ensureCommitteeLog(committeeAddr)
if err != nil {
cmi.log.LogWarnf("cannot find committee: %v", committeeAddr)
Expand All @@ -639,7 +628,7 @@ func (cmi *chainMgrImpl) withCommitteeLog(committeeAddr cryptolib.Address, handl
return gpa.NoMessages().AddAll(cmi.handleCommitteeLogOutput(cli, handler(cli.gpaInstance)))
}

func (cmi *chainMgrImpl) withAllCommitteeLogs(handler func(cl gpa.GPA) gpa.OutMessages) gpa.OutMessages {
func (cmi *ChainMgr) withAllCommitteeLogs(handler func(cl gpa.GPA) gpa.OutMessages) gpa.OutMessages {
msgs := gpa.NoMessages()
for _, cli := range cmi.committeeLogs {
msgs.AddAll(cmi.handleCommitteeLogOutput(cli, handler(cli.gpaInstance)))
Expand All @@ -648,7 +637,7 @@ func (cmi *chainMgrImpl) withAllCommitteeLogs(handler func(cl gpa.GPA) gpa.OutMe
}

// NOTE: ErrNotInCommittee
func (cmi *chainMgrImpl) ensureCommitteeLog(committeeAddr cryptolib.Address) (*committeeLogInst, error) {
func (cmi *ChainMgr) ensureCommitteeLog(committeeAddr cryptolib.Address) (*committeeLogInst, error) {
if cli, ok := cmi.committeeLogs[committeeAddr.Key()]; ok {
return cli, nil
}
Expand Down Expand Up @@ -689,7 +678,7 @@ func (cmi *chainMgrImpl) ensureCommitteeLog(committeeAddr cryptolib.Address) (*c
if err != nil {
return nil, fmt.Errorf("cannot create committeeLog for committeeAddress=%v: %w", committeeAddr, err)
}
clGPA := clInst.AsGPA()
clGPA := gpa.NewOwnHandler(cmi.me, clInst)
cli := &committeeLogInst{
committeeAddr: committeeAddr,
dkShare: dkShare,
Expand Down
2 changes: 1 addition & 1 deletion packages/chain/chainmanager/chain_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func testChainMgrBasic(t *testing.T, n, f int) {
log.NewChildLogger(nid.ShortString()),
)
require.NoError(t, err)
nodes[nid] = cm.AsGPA()
nodes[nid] = gpa.NewOwnHandler(nid, cm)
}
tc := gpa.NewTestContext(nodes)
tc.PrintAllStatusStrings("Started", t.Logf)
Expand Down
2 changes: 1 addition & 1 deletion packages/chain/chainmanager/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const (
msgTypeBlockProduced
)

func (cmi *chainMgrImpl) UnmarshalMessage(data []byte) (gpa.Message, error) {
func (cmi *ChainMgr) UnmarshalMessage(data []byte) (gpa.Message, error) {
return gpa.UnmarshalMessage(data, gpa.Mapper{
msgTypeCommitteeLog: func() gpa.Message { return new(msgCommitteeLog) },
msgTypeBlockProduced: func() gpa.Message {
Expand Down
43 changes: 16 additions & 27 deletions packages/chain/committeelog/cmt_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ import (
"github.com/iotaledger/wasp/v2/packages/util/byzquorum"
)

// CommitteeLog is the public interface for this algorithm.
type CommitteeLog interface {
AsGPA() gpa.GPA
}

type State struct {
LogIndex LogIndex
}
Expand All @@ -58,7 +53,7 @@ var ErrCommitteeLogStateNotFound = errors.New("errCmtLogStateNotFound")
type Output = map[LogIndex]*isc.StateAnchor

// Protocol implementation.
type committeeLogImpl struct {
type CommitteeLog struct {
chainID isc.ChainID // Chain, for which this log is maintained by this committee.
committeeAddr *cryptolib.Address // Address of the committee running this chain.
consensusStateRegistry ConsensusStateRegistry // Persistent storage.
Expand All @@ -72,7 +67,7 @@ type committeeLogImpl struct {
log log.Logger
}

var _ gpa.GPA = &committeeLogImpl{}
var _ gpa.GPA = &CommitteeLog{}

// New constructs a new node instance for this protocol.
//
Expand All @@ -91,7 +86,7 @@ func New(
pipeliningLimit int,
cclMetrics *metrics.ChainCommitteeLogMetrics,
log log.Logger,
) (CommitteeLog, error) {
) (*CommitteeLog, error) {
//
// Load the last LogIndex we were working on.
var prevLI LogIndex
Expand Down Expand Up @@ -119,7 +114,7 @@ func New(

//
// Create it.
cl := &committeeLogImpl{
cl := &CommitteeLog{
chainID: chainID,
committeeAddr: committeeAddr,
consensusStateRegistry: consensusStateRegistry,
Expand Down Expand Up @@ -149,17 +144,11 @@ func New(
log.LogDebugf("VarLocalView: Output received, %v", ao)
return cl.varConsInsts.LatestL1Anchor(ao, cl.varLogIndex.ConsensusStarted)
}, log.NewChildLogger("VLV"))
cl.asGPA = gpa.NewOwnHandler(me, cl)
return cl, nil
}

// Implements the CommitteeLog interface.
func (cl *committeeLogImpl) AsGPA() gpa.GPA {
return cl.asGPA
}

// Implements the gpa.GPA interface.
func (cl *committeeLogImpl) Input(input gpa.Input) gpa.OutMessages {
func (cl *CommitteeLog) Input(input gpa.Input) gpa.OutMessages {
switch input.(type) {
case *inputCanPropose:
break // Don't log, its periodic.
Expand Down Expand Up @@ -187,7 +176,7 @@ func (cl *committeeLogImpl) Input(input gpa.Input) gpa.OutMessages {
}

// Implements the gpa.GPA interface.
func (cl *committeeLogImpl) Message(msg gpa.Message) gpa.OutMessages {
func (cl *CommitteeLog) Message(msg gpa.Message) gpa.OutMessages {
msgNLI, ok := msg.(*MsgNextLogIndex)
if !ok {
cl.log.LogWarnf("dropping unexpected message %T: %+v", msg, msg)
Expand All @@ -197,32 +186,32 @@ func (cl *committeeLogImpl) Message(msg gpa.Message) gpa.OutMessages {
}

// The latest anchor object's version confirmed at the L1.
func (cl *committeeLogImpl) handleInputAnchorConfirmed(input *inputAnchorConfirmed) gpa.OutMessages {
func (cl *CommitteeLog) handleInputAnchorConfirmed(input *inputAnchorConfirmed) gpa.OutMessages {
cl.suspended = false
return cl.varLocalView.AnchorConfirmed(input.anchor)
}

// Consensus completed with a decision to SKIP/⊥.
func (cl *committeeLogImpl) handleInputConsensusOutputSkip(input *inputConsensusOutputSkip) gpa.OutMessages {
func (cl *CommitteeLog) handleInputConsensusOutputSkip(input *inputConsensusOutputSkip) gpa.OutMessages {
return cl.varConsInsts.ConsOutputSkip(input.logIndex, cl.varLogIndex.ConsensusStarted)
}

// Consensus has decided, produced a TX and it is now confirmed by L1.
func (cl *committeeLogImpl) handleInputConsensusOutputConfirmed(input *inputConsensusOutputConfirmed) gpa.OutMessages {
func (cl *CommitteeLog) handleInputConsensusOutputConfirmed(input *inputConsensusOutputConfirmed) gpa.OutMessages {
return cl.varConsInsts.ConsOutputDone(input.logIndex, input.nextAnchor, cl.varLogIndex.ConsensusStarted)
}

// Consensus has decided, produced a TX but it was rejected by L1.
func (cl *committeeLogImpl) handleInputConsensusOutputRejected(input *inputConsensusOutputRejected) gpa.OutMessages {
func (cl *CommitteeLog) handleInputConsensusOutputRejected(input *inputConsensusOutputRejected) gpa.OutMessages {
return cl.varConsInsts.ConsOutputSkip(input.logIndex, cl.varLogIndex.ConsensusStarted) // This will cause proposal of our latest L1 Anchor.
}

// Consensus tries to decide for too long. Maybe quorum assumption has been violated.
func (cl *committeeLogImpl) handleInputConsensusTimeout(input *inputConsensusTimeout) gpa.OutMessages {
func (cl *CommitteeLog) handleInputConsensusTimeout(input *inputConsensusTimeout) gpa.OutMessages {
return cl.varConsInsts.ConsTimeout(input.logIndex, cl.varLogIndex.ConsensusStarted)
}

func (cl *committeeLogImpl) handleInputCanPropose() gpa.OutMessages {
func (cl *CommitteeLog) handleInputCanPropose() gpa.OutMessages {
msgs := gpa.NoMessages()
msgs.AddAll(cl.varConsInsts.Tick(cl.varLogIndex.ConsensusStarted))

Expand All @@ -238,18 +227,18 @@ func (cl *committeeLogImpl) handleInputCanPropose() gpa.OutMessages {
return msgs
}

func (cl *committeeLogImpl) handleInputSuspend() {
func (cl *CommitteeLog) handleInputSuspend() {
cl.suspended = true
}

// > ON Reception of ⟨NextLI, •⟩ message:
// > ...
func (cl *committeeLogImpl) handleMsgNextLogIndex(msg *MsgNextLogIndex) gpa.OutMessages {
func (cl *CommitteeLog) handleMsgNextLogIndex(msg *MsgNextLogIndex) gpa.OutMessages {
return cl.varLogIndex.MsgNextLogIndexReceived(msg)
}

// Implements the gpa.GPA interface.
func (cl *committeeLogImpl) Output() gpa.Output {
func (cl *CommitteeLog) Output() gpa.Output {
out := cl.output
if out == nil || cl.suspended {
return nil // Untyped nil.
Expand All @@ -258,7 +247,7 @@ func (cl *committeeLogImpl) Output() gpa.Output {
}

// Implements the gpa.GPA interface.
func (cl *committeeLogImpl) StatusString() string {
func (cl *CommitteeLog) StatusString() string {
return fmt.Sprintf(
"{committeeLogImpl, %v, %v, %v}",
cl.varConsInsts.StatusString(),
Expand Down
Loading
Loading