Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
741ba6d
removes the enabled flag on the protocols table, adds a new migration…
aristidesstaffieri Feb 19, 2026
2b3dd2a
updates diagram for checkpoint population flow to better reflect the …
aristidesstaffieri Feb 19, 2026
904fbc1
updates live ingestion classification diagram to better reflect the d…
aristidesstaffieri Feb 19, 2026
6452345
removes incorrect details about live ingestions relationship to proto…
aristidesstaffieri Feb 19, 2026
648e9d3
Updates the migration design to be aware of the history retention win…
aristidesstaffieri Feb 19, 2026
29d1aee
updates live ingestion state production diagram to better reflect the…
aristidesstaffieri Feb 19, 2026
43ee022
removes migration status checks from current state queries, exposes m…
aristidesstaffieri Feb 19, 2026
c7d35ce
Extract checkpoint population into dedicated services, add known_wasm…
aristidesstaffieri Feb 20, 2026
6fd6fc5
renames known_wasms to protocol_wasms
aristidesstaffieri Feb 26, 2026
f5d6cb0
Add unit tests for tokenProcessor.ProcessContractCode
aristidesstaffieri Feb 27, 2026
905b425
services/wasm_ingestion: remove ProtocolValidator execution from Wasm…
Copilot Feb 27, 2026
eb533a4
Simplify ProcessContractCode to pass only WASM hashes, refactor Token…
aristidesstaffieri Mar 9, 2026
3b3d83a
Add protocol_contracts table and populate during checkpoint
aristidesstaffieri Mar 9, 2026
b2d2b61
Populate protocol_wasms and protocol_contracts during live ingestion …
aristidesstaffieri Mar 9, 2026
27a95ce
Fix FK violation when persisting protocol contracts with evicted WASMs
aristidesstaffieri Mar 9, 2026
e005c50
renames known_wasms to protocol_wasms for missing references in desig…
aristidesstaffieri Mar 10, 2026
1a504f5
Change protocol_wasms and protocol_contracts columns from TEXT to BYTEA
aristidesstaffieri Mar 10, 2026
499b106
Remove redundant protocol_id column from protocol_contracts table
aristidesstaffieri Mar 10, 2026
4edfbae
Use HashBytea type for protocol wasm/contract bytea fields
aristidesstaffieri Mar 11, 2026
0e38de7
replaces remaining known_wasms references in diagrams with protocol_w…
aristidesstaffieri Mar 11, 2026
00ba7de
Rename ProtocolContract to ProtocolContracts to match table name conv…
aristidesstaffieri Mar 12, 2026
5d82367
runs fmt and tidy to abide by lint rules
aristidesstaffieri Mar 12, 2026
5ef546c
Consolidate WasmIngestionService and checkpoint token logic into Chec…
aristidesstaffieri Mar 13, 2026
aa7fc0f
Fix checkpoint test mock expectations for consolidated CheckpointService
aristidesstaffieri Mar 13, 2026
3b91e05
Extract checkpoint population into dedicated services, add known_wasm…
aristidesstaffieri Feb 20, 2026
78d7ae1
Remove dead known_wasms model, tests, and migration
aristidesstaffieri Mar 11, 2026
31ca62f
Rename Protocol and ProtocolWasm models to plural form matching table…
aristidesstaffieri Mar 13, 2026
c2aa73e
Add live protocol state production pipeline with dual CAS gating
aristidesstaffieri Mar 17, 2026
e131c27
test: consolidate data migration integration coverage
aristidesstaffieri Mar 17, 2026
f09fff4
Remove non-transactional reads from PersistLedgerData transaction
aristidesstaffieri Mar 18, 2026
656449c
Fix TOCTOU gap, missing metrics, and lock-during-IO in live ingestion
aristidesstaffieri Mar 18, 2026
cdf7709
Fix query storm from partial protocol contract cache refresh failure
aristidesstaffieri Mar 18, 2026
08716a9
Add protocol history migration service and asymmetric CAS integration…
aristidesstaffieri Mar 19, 2026
df962a2
Extract shared helpers between history migration and live ingestion
aristidesstaffieri Mar 20, 2026
f3717d4
Fix hardcoded cursor names in protocol history migration
aristidesstaffieri Mar 20, 2026
e1f7350
tweaks style and formatting to abide by the linter
aristidesstaffieri Mar 23, 2026
c47776e
Extract generic utilities from ingest helpers and eliminate helper file
aristidesstaffieri Mar 23, 2026
90c6dfd
Extract checkpoint population into dedicated services, add known_wasm…
aristidesstaffieri Feb 20, 2026
0718c70
Add live protocol state production pipeline with dual CAS gating
aristidesstaffieri Mar 17, 2026
baab1ad
test: consolidate data migration integration coverage
aristidesstaffieri Mar 17, 2026
09d81b6
Fix query storm from partial protocol contract cache refresh failure
aristidesstaffieri Mar 18, 2026
aa71d5d
Add protocol-migrate current-state command and extract shared migrati…
aristidesstaffieri Mar 25, 2026
4d5f3e3
Validate RetryWithBackoff inputs to prevent wrapped-nil errors and bu…
aristidesstaffieri Mar 30, 2026
afc2286
Batch cursor reads in protocolProcessorsEligibleForProduction
aristidesstaffieri Mar 30, 2026
c195db3
Add BatchGetByProtocolIDs to ProtocolContractsModelInterface
aristidesstaffieri Apr 14, 2026
988ed8a
Remove stale known_wasms and wasm_ingestion files re-introduced by re…
aristidesstaffieri Apr 14, 2026
aec5901
removes extra blank line
aristidesstaffieri Apr 16, 2026
1e3482d
merge stale block together to remove redundancy
aristidesstaffieri Apr 16, 2026
96841cc
Hard-code latest ledger cursor name; deprecate configuration flag
aristidesstaffieri Apr 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (c *ingestCmd) Command() *cobra.Command {
var sentryDSN string
var stellarEnvironment string
var ledgerBackendType string
var deprecatedLatestLedgerCursorName string
cfgOpts := config.ConfigOptions{
utils.DatabaseURLOption(&cfg.DatabaseURL),
utils.LogLevelOption(&cfg.LogLevel),
Expand All @@ -45,11 +46,11 @@ func (c *ingestCmd) Command() *cobra.Command {
},
{
Name: "latest-ledger-cursor-name",
Usage: "Name of last synced ledger cursor, used to keep track of the last ledger ingested by the service. When starting up, ingestion will resume from the ledger number stored in this record. It should be an unique name per container as different containers would overwrite the cursor value of its peers when using the same cursor name.",
Usage: "DEPRECATED: ignored. The latest ledger cursor name is now hard-coded and no longer configurable.",
OptType: types.String,
ConfigKey: &cfg.LatestLedgerCursorName,
FlagDefault: "latest_ingest_ledger",
Required: true,
ConfigKey: &deprecatedLatestLedgerCursorName,
FlagDefault: "",
Required: false,
},
{
Name: "oldest-ledger-cursor-name",
Expand Down Expand Up @@ -192,6 +193,10 @@ func (c *ingestCmd) Command() *cobra.Command {
return fmt.Errorf("setting values of config options: %w", err)
}

if deprecatedLatestLedgerCursorName != "" {
log.Warnf("--latest-ledger-cursor-name (LATEST_LEDGER_CURSOR_NAME) is deprecated and ignored; the cursor name is now hard-coded.")
}

// Convert ledger backend type string to typed value
switch ledgerBackendType {
case string(ingest.LedgerBackendTypeRPC):
Expand Down
164 changes: 127 additions & 37 deletions cmd/protocol_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
_ "github.com/lib/pq"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/stellar/go-stellar-sdk/ingest/ledgerbackend"
"github.com/stellar/go-stellar-sdk/support/config"
"github.com/stellar/go-stellar-sdk/support/log"

Expand Down Expand Up @@ -36,25 +37,33 @@ func (c *protocolMigrateCmd) Command() *cobra.Command {
}

cmd.AddCommand(c.historyCommand())
cmd.AddCommand(c.currentStateCommand())

return cmd
}

// historyCmdOpts holds the resolved flag values for `protocol-migrate history`.
type historyCmdOpts struct {
databaseURL string
rpcURL string
networkPassphrase string
protocolIDs []string
logLevel string
oldestLedgerCursorName string
ledgerBackendType string
datastoreConfigPath string
getLedgersLimit int
// migrationCommandOpts captures the shared flags for migration subcommands.
type migrationCommandOpts struct {
databaseURL string
rpcURL string
networkPassphrase string
protocolIDs []string
logLevel string
ledgerBackendType string
datastoreConfigPath string
getLedgersLimit int
}

func (c *protocolMigrateCmd) historyCommand() *cobra.Command {
var opts historyCmdOpts
// buildMigrationCommand creates a cobra.Command with shared migration flags and validation.
// addFlags adds strategy-specific flags. extraValidate runs strategy-specific validation.
// runE receives the shared opts and executes the strategy-specific logic.
func buildMigrationCommand(
use, short, long string,
addFlags func(cmd *cobra.Command, opts *migrationCommandOpts),
extraValidate func() error,
runE func(opts *migrationCommandOpts) error,
) *cobra.Command {
var opts migrationCommandOpts

cfgOpts := config.ConfigOptions{
utils.DatabaseURLOption(&opts.databaseURL),
Expand Down Expand Up @@ -95,9 +104,9 @@ func (c *protocolMigrateCmd) historyCommand() *cobra.Command {
}

cmd := &cobra.Command{
Use: "history",
Short: "Backfill protocol history state from oldest to latest ingested ledger",
Long: "Processes historical ledgers from oldest_ingest_ledger to the tip, producing protocol state changes and converging with live ingestion via CAS-gated cursors.",
Use: use,
Short: short,
Long: long,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if err := cfgOpts.RequireE(); err != nil {
return fmt.Errorf("requiring values of config options: %w", err)
Expand Down Expand Up @@ -131,10 +140,14 @@ func (c *protocolMigrateCmd) historyCommand() *cobra.Command {
default:
return fmt.Errorf("invalid --ledger-backend-type %q, must be 'rpc' or 'datastore'", opts.ledgerBackendType)
}

if extraValidate != nil {
return extraValidate()
}
return nil
},
RunE: func(_ *cobra.Command, _ []string) error {
return c.RunHistory(opts)
return runE(&opts)
},
}

Expand All @@ -144,12 +157,32 @@ func (c *protocolMigrateCmd) historyCommand() *cobra.Command {

cmd.Flags().StringSliceVar(&opts.protocolIDs, "protocol-id", nil, "Protocol ID(s) to migrate (required, repeatable)")
cmd.Flags().StringVar(&opts.logLevel, "log-level", "", `Log level: "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL", "PANIC"`)
cmd.Flags().StringVar(&opts.oldestLedgerCursorName, "oldest-ledger-cursor-name", data.OldestLedgerCursorName, "Name of the oldest ledger cursor in the ingest store. Must match the value used by the ingest service.")
var deprecatedLatestLedgerCursorName string
cmd.Flags().StringVar(&deprecatedLatestLedgerCursorName, "latest-ledger-cursor-name", "", "DEPRECATED: ignored. The latest ledger cursor name is now hard-coded.")
if err := cmd.Flags().MarkDeprecated("latest-ledger-cursor-name", "ignored; the cursor name is now hard-coded"); err != nil {
log.Fatalf("marking latest-ledger-cursor-name deprecated: %s", err.Error())
}

if addFlags != nil {
addFlags(cmd, &opts)
}

return cmd
}

func (c *protocolMigrateCmd) RunHistory(opts historyCmdOpts) error {
// runMigration handles the shared setup (processors, DB, models, ledger backend) and
// delegates to createAndRun for strategy-specific service creation and execution.
func runMigration(
label string,
opts *migrationCommandOpts,
createAndRun func(
ctx context.Context,
dbPool db.ConnectionPool,
ledgerBackend ledgerbackend.LedgerBackend,
models *data.Models,
processors []services.ProtocolProcessor,
) error,
) error {
ctx := context.Background()

// Build processors from protocol IDs using the dynamic registry
Expand All @@ -171,7 +204,7 @@ func (c *protocolMigrateCmd) RunHistory(opts historyCmdOpts) error {
if err != nil {
return fmt.Errorf("opening database connection: %w", err)
}
defer internalutils.DeferredClose(ctx, dbPool, "closing dbPool in protocol migrate history")
defer internalutils.DeferredClose(ctx, dbPool, fmt.Sprintf("closing dbPool in protocol migrate %s", label))

// Create models
sqlxDB, err := dbPool.SqlxDB(ctx)
Expand Down Expand Up @@ -205,23 +238,80 @@ func (c *protocolMigrateCmd) RunHistory(opts historyCmdOpts) error {
}
}()

service, err := services.NewProtocolMigrateHistoryService(services.ProtocolMigrateHistoryConfig{
DB: dbPool,
LedgerBackend: ledgerBackend,
ProtocolsModel: models.Protocols,
ProtocolContractsModel: models.ProtocolContracts,
IngestStore: models.IngestStore,
NetworkPassphrase: opts.networkPassphrase,
Processors: processors,
OldestLedgerCursorName: opts.oldestLedgerCursorName,
})
if err != nil {
return fmt.Errorf("creating protocol migrate history service: %w", err)
}
return createAndRun(ctx, dbPool, ledgerBackend, models, processors)
}

if err := service.Run(ctx, opts.protocolIDs); err != nil {
return fmt.Errorf("running protocol migrate history: %w", err)
}
func (c *protocolMigrateCmd) historyCommand() *cobra.Command {
var oldestLedgerCursorName string

return buildMigrationCommand(
"history",
"Backfill protocol history state from oldest to latest ingested ledger",
"Processes historical ledgers from oldest_ingest_ledger to the tip, producing protocol state changes and converging with live ingestion via CAS-gated cursors.",
func(cmd *cobra.Command, opts *migrationCommandOpts) {
cmd.Flags().StringVar(&oldestLedgerCursorName, "oldest-ledger-cursor-name", data.OldestLedgerCursorName, "Name of the oldest ledger cursor in the ingest store. Must match the value used by the ingest service.")
Comment thread
aristidesstaffieri marked this conversation as resolved.
},
nil,
func(opts *migrationCommandOpts) error {
return runMigration("history", opts, func(ctx context.Context, dbPool db.ConnectionPool, ledgerBackend ledgerbackend.LedgerBackend, models *data.Models, processors []services.ProtocolProcessor) error {
service, err := services.NewProtocolMigrateHistoryService(services.ProtocolMigrateHistoryConfig{
DB: dbPool,
LedgerBackend: ledgerBackend,
ProtocolsModel: models.Protocols,
ProtocolContractsModel: models.ProtocolContracts,
IngestStore: models.IngestStore,
NetworkPassphrase: opts.networkPassphrase,
Processors: processors,
OldestLedgerCursorName: oldestLedgerCursorName,
})
if err != nil {
return fmt.Errorf("creating protocol migrate history service: %w", err)
}
if err := service.Run(ctx, opts.protocolIDs); err != nil {
return fmt.Errorf("running protocol migrate history: %w", err)
}
return nil
})
},
)
}

func (c *protocolMigrateCmd) currentStateCommand() *cobra.Command {
var startLedger uint32

return nil
return buildMigrationCommand(
"current-state",
"Build protocol current state from a start ledger forward",
"Processes ledgers from --start-ledger to the tip, building protocol current state and converging with live ingestion via CAS-gated cursors.",
func(cmd *cobra.Command, opts *migrationCommandOpts) {
cmd.Flags().Uint32Var(&startLedger, "start-ledger", 0, "Ledger sequence to begin current-state migration from (required)")
},
func() error {
if startLedger == 0 {
return fmt.Errorf("--start-ledger is required and must be > 0")
}
return nil
},
func(opts *migrationCommandOpts) error {
return runMigration("current-state", opts, func(ctx context.Context, dbPool db.ConnectionPool, ledgerBackend ledgerbackend.LedgerBackend, models *data.Models, processors []services.ProtocolProcessor) error {
service, err := services.NewProtocolMigrateCurrentStateService(services.ProtocolMigrateCurrentStateConfig{
DB: dbPool,
LedgerBackend: ledgerBackend,
ProtocolsModel: models.Protocols,
ProtocolContractsModel: models.ProtocolContracts,
IngestStore: models.IngestStore,
NetworkPassphrase: opts.networkPassphrase,
Processors: processors,
StartLedger: startLedger,
})
if err != nil {
return fmt.Errorf("creating protocol migrate current-state service: %w", err)
}
if err := service.Run(ctx, opts.protocolIDs); err != nil {
return fmt.Errorf("running protocol migrate current-state: %w", err)
}
return nil
})
},
)
}
39 changes: 39 additions & 0 deletions internal/data/ingest_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,45 @@ func (m *IngestStoreModel) Get(ctx context.Context, cursorName string) (uint32,
return lastSyncedLedger, nil
}

func (m *IngestStoreModel) GetMany(ctx context.Context, keys []string) (map[string]uint32, error) {
if len(keys) == 0 {
return nil, nil
}

const query = `SELECT key, value FROM ingest_store WHERE key = ANY($1)`

start := time.Now()
rows, err := m.DB.PgxPool().Query(ctx, query, keys)
if err != nil {
m.MetricsService.IncDBQueryError("GetMany", "ingest_store", utils.GetDBErrorType(err))
return nil, fmt.Errorf("getting values for keys: %w", err)
}
defer rows.Close()

result := make(map[string]uint32, len(keys))
for rows.Next() {
var key, value string
if err := rows.Scan(&key, &value); err != nil {
m.MetricsService.IncDBQueryError("GetMany", "ingest_store", utils.GetDBErrorType(err))
return nil, fmt.Errorf("scanning ingest_store row: %w", err)
}
parsed, err := strconv.ParseUint(value, 10, 32)
if err != nil {
m.MetricsService.IncDBQueryError("GetMany", "ingest_store", utils.GetDBErrorType(err))
return nil, fmt.Errorf("parsing value for key %s: %w", key, err)
}
result[key] = uint32(parsed)
}
if err := rows.Err(); err != nil {
m.MetricsService.IncDBQueryError("GetMany", "ingest_store", utils.GetDBErrorType(err))
return nil, fmt.Errorf("iterating ingest_store rows: %w", err)
}

m.MetricsService.ObserveDBQueryDuration("GetMany", "ingest_store", time.Since(start).Seconds())
m.MetricsService.IncDBQuery("GetMany", "ingest_store")
return result, nil
}

func (m *IngestStoreModel) Update(ctx context.Context, dbTx pgx.Tx, cursorName string, ledger uint32) error {
const query = `
INSERT INTO ingest_store (key, value) VALUES ($1, $2)
Expand Down
Loading
Loading