diff --git a/apps/api/src/api/services/phases/handlers/pendulum-to-moonbeam-xcm-handler.ts b/apps/api/src/api/services/phases/handlers/pendulum-to-moonbeam-xcm-handler.ts index 9d1832268..ea0a7fc0b 100644 --- a/apps/api/src/api/services/phases/handlers/pendulum-to-moonbeam-xcm-handler.ts +++ b/apps/api/src/api/services/phases/handlers/pendulum-to-moonbeam-xcm-handler.ts @@ -85,16 +85,57 @@ export class PendulumToMoonbeamXCMPhaseHandler extends BasePhaseHandler { return balance.gte(expectedOutputAmountRaw); }; + const waitForMoonbeamArrival = async (timeoutMs: number = 120000): Promise => { + const startTime = Date.now(); + const pollIntervalMs = 5000; + + while (Date.now() - startTime < timeoutMs) { + if (await didTokensArriveOnMoonbeam()) { + return true; + } + await new Promise(resolve => setTimeout(resolve, pollIntervalMs)); + } + return false; + }; + try { - // We have to check if the input token already arrived on Moonbeam and if it left Pendulum. - // If we'd only check if it arrived on Moonbeam, we might miss transferring them if the target account already has some tokens. - if ((await didTokensLeavePendulum()) && (await didTokensArriveOnMoonbeam())) { + // Check if we already have a stored XCM hash (XCM was submitted in a previous attempt) + if (state.state.pendulumToMoonbeamXcmHash) { + logger.info( + `PendulumToMoonbeamPhaseHandler: XCM already submitted (hash: ${state.state.pendulumToMoonbeamXcmHash}) for ramp ${state.id}. Waiting for arrival on Moonbeam...` + ); + + if (await didTokensArriveOnMoonbeam()) { + logger.info(`PendulumToMoonbeamPhaseHandler: Tokens already arrived on Moonbeam for ramp ${state.id}.`); + return this.transitionToNextPhase(state, this.nextPhaseSelector(state)); + } + + const arrived = await waitForMoonbeamArrival(); + if (!arrived) { + throw this.createRecoverableError("Timeout waiting for tokens to arrive on Moonbeam after XCM was already submitted"); + } + return this.transitionToNextPhase(state, this.nextPhaseSelector(state)); + } + + // Check if tokens already left Pendulum (XCM was submitted but hash wasn't stored due to crash) + if (await didTokensLeavePendulum()) { logger.info( - `PendulumToMoonbeamPhaseHandler: Input token already arrived on Moonbeam, skipping XCM transfer for ramp ${state.id}.` + `PendulumToMoonbeamPhaseHandler: Tokens already left Pendulum for ramp ${state.id}. XCM likely submitted but hash not stored. Waiting for arrival on Moonbeam...` ); + + if (await didTokensArriveOnMoonbeam()) { + logger.info(`PendulumToMoonbeamPhaseHandler: Tokens already arrived on Moonbeam for ramp ${state.id}.`); + return this.transitionToNextPhase(state, this.nextPhaseSelector(state)); + } + + const arrived = await waitForMoonbeamArrival(); + if (!arrived) { + throw this.createRecoverableError("Timeout waiting for tokens to arrive on Moonbeam after tokens left Pendulum"); + } return this.transitionToNextPhase(state, this.nextPhaseSelector(state)); } + // No previous XCM submission detected, proceed with transfer const { txData: pendulumToMoonbeamTransaction } = this.getPresignedTransaction(state, "pendulumToMoonbeamXcm"); if (typeof pendulumToMoonbeamTransaction !== "string") { @@ -111,19 +152,24 @@ export class PendulumToMoonbeamXCMPhaseHandler extends BasePhaseHandler { logger.info( `PendulumToMoonbeamPhaseHandler: XCM transfer submitted with hash ${hash} for ramp ${state.id}. Waiting for the token to arrive on Moonbeam...` ); - await didTokensArriveOnMoonbeam(); - - // XCM is payed by the ephemeral, in GLMR, with a fixed value of MOONBEAM_XCM_FEE_GLMR - const subsidyAmount = nativeToDecimal(MOONBEAM_XCM_FEE_GLMR, 18).toNumber(); - const hashToStore = hash ?? "0x"; - await this.createSubsidy(state, subsidyAmount, SubsidyToken.GLMR, substrateEphemeralAddress, hashToStore); + // Store the hash immediately after submission to minimize crash window state.state = { ...state.state, pendulumToMoonbeamXcmHash: hash }; await state.update({ state: state.state }); + const arrived = await waitForMoonbeamArrival(); + if (!arrived) { + throw this.createRecoverableError("Timeout waiting for tokens to arrive on Moonbeam after XCM submission"); + } + + // XCM is payed by the ephemeral, in GLMR, with a fixed value of MOONBEAM_XCM_FEE_GLMR + const subsidyAmount = nativeToDecimal(MOONBEAM_XCM_FEE_GLMR, 18).toNumber(); + const hashToStore = hash ?? "0x"; + await this.createSubsidy(state, subsidyAmount, SubsidyToken.GLMR, substrateEphemeralAddress, hashToStore); + return this.transitionToNextPhase(state, this.nextPhaseSelector(state)); } catch (e) { console.error("Error in PendulumToMoonbeamPhase:", e); diff --git a/apps/api/src/api/services/phases/phase-processor.ts b/apps/api/src/api/services/phases/phase-processor.ts index b825e16b3..82e6cb266 100644 --- a/apps/api/src/api/services/phases/phase-processor.ts +++ b/apps/api/src/api/services/phases/phase-processor.ts @@ -1,5 +1,6 @@ import httpStatus from "http-status"; import logger from "../../../config/logger"; +import { runWithRampContext } from "../../../config/ramp-context"; import RampState from "../../../models/rampState.model"; import { APIError } from "../../errors/api-error"; import { PhaseError, RecoverablePhaseError } from "../../errors/phase-error"; @@ -30,41 +31,43 @@ export class PhaseProcessor { * @param rampId The ID of the ramping process */ public async processRamp(rampId: string): Promise { - const state = await RampState.findByPk(rampId); - if (!state) { - throw new APIError({ - message: `Ramp with ID ${rampId} not found`, - status: httpStatus.NOT_FOUND - }); - } + return runWithRampContext(rampId, async () => { + const state = await RampState.findByPk(rampId); + if (!state) { + throw new APIError({ + message: `Ramp with ID ${rampId} not found`, + status: httpStatus.NOT_FOUND + }); + } - // Try to acquire the lock - let lockAcquired = await this.acquireLock(state); - if (!lockAcquired) { - if (this.isLockExpired(state)) { - logger.info(`Lock for ramp ${rampId} has expired. Ignoring previous lock and continue processing...`); - // Force release the expired lock and try to acquire it again - await this.releaseLock(state); - lockAcquired = await this.acquireLock(state); - if (!lockAcquired) { - logger.warn(`Failed to acquire lock for ramp ${rampId} even after clearing expired lock`); + // Try to acquire the lock + let lockAcquired = await this.acquireLock(state); + if (!lockAcquired) { + if (this.isLockExpired(state)) { + logger.info(`Lock for ramp ${rampId} has expired. Ignoring previous lock and continue processing...`); + // Force release the expired lock and try to acquire it again + await this.releaseLock(state); + lockAcquired = await this.acquireLock(state); + if (!lockAcquired) { + logger.warn(`Failed to acquire lock for ramp ${rampId} even after clearing expired lock`); + return; + } + } else { + logger.info(`Skipping processing for ramp ${rampId} as it's already being processed`); return; } - } else { - logger.info(`Skipping processing for ramp ${rampId} as it's already being processed`); - return; } - } - try { - await this.processPhase(state); - // We just return, since the error management should be handled in the processPhase method. - // We do not want to crash the whole process if one ramp fails. - } catch (error) { - logger.error(`Error processing ramp ${rampId}: ${error}`); - } finally { - await this.releaseLock(state); - } + try { + await this.processPhase(state); + // We just return, since the error management should be handled in the processPhase method. + // We do not want to crash the whole process if one ramp fails. + } catch (error) { + logger.error(`Error processing ramp ${rampId}: ${error}`); + } finally { + await this.releaseLock(state); + } + }); } /** diff --git a/apps/api/src/api/workers/cleanup.worker.ts b/apps/api/src/api/workers/cleanup.worker.ts index 3e3e3622d..5dc2d4692 100644 --- a/apps/api/src/api/workers/cleanup.worker.ts +++ b/apps/api/src/api/workers/cleanup.worker.ts @@ -1,6 +1,7 @@ import { CronJob } from "cron"; import { Op } from "sequelize"; import logger from "../../config/logger"; +import { runWithRampContext } from "../../config/ramp-context"; import RampState from "../../models/rampState.model"; import { postProcessHandlers } from "../services/phases/post-process"; import { BaseRampService } from "../services/ramp/base.service"; @@ -170,14 +171,16 @@ class CleanupWorker { logger.info(`Found ${states.length} completed RampStates that need post-processing`); const processPromises = states.map(async state => { - try { - await this.processCleanup(state); - return { stateId: state.id, status: "fulfilled" }; - } catch (error) { - logger.error(`Error processing cleanup for state ${state.id}:`, error); - // Don't update the state here, processCleanup handles its own updates - return { reason: error, stateId: state.id, status: "rejected" }; - } + return runWithRampContext(state.id, async () => { + try { + await this.processCleanup(state); + return { stateId: state.id, status: "fulfilled" }; + } catch (error) { + logger.error(`Error processing cleanup:`, error); + // Don't update the state here, processCleanup handles its own updates + return { reason: error, stateId: state.id, status: "rejected" }; + } + }); }); // Use allSettled to allow individual state processing to fail without stopping others diff --git a/apps/api/src/api/workers/ramp-recovery.worker.ts b/apps/api/src/api/workers/ramp-recovery.worker.ts index d2cc7eeb1..61b048d1e 100644 --- a/apps/api/src/api/workers/ramp-recovery.worker.ts +++ b/apps/api/src/api/workers/ramp-recovery.worker.ts @@ -75,8 +75,8 @@ class RampRecoveryWorker { // Process each stale state concurrently const recoveryPromises = staleStates.map(async state => { try { - logger.info(`Attempting recovery for ramp state ${state.id} in phase ${state.currentPhase}`); - // Process the state + logger.info(`Attempting recovery in phase ${state.currentPhase} for ramp ${state.id}`); + // Process the state (processRamp already wraps execution with runWithRampContext) await phaseProcessor.processRamp(state.id); logger.info(`Successfully processed ramp state ${state.id}`); return { stateId: state.id, status: "fulfilled" }; @@ -101,7 +101,7 @@ class RampRecoveryWorker { const updateError = updateE as Error; logger.error(`Failed to update ramp state ${state.id} with error log:`, updateError); // Log the original error as well if the update fails - logger.error(`Original recovery error for state ${state.id}:`, error); + logger.error(`Original recovery error for ${state.id}:`, error); } // Return a rejected status for Promise.allSettled return { reason: error, stateId: state.id, status: "rejected" }; diff --git a/apps/api/src/config/logger.ts b/apps/api/src/config/logger.ts index 3bfdfe6d7..635265fec 100644 --- a/apps/api/src/config/logger.ts +++ b/apps/api/src/config/logger.ts @@ -1,9 +1,13 @@ import { StreamOptions } from "morgan"; import winston, { format } from "winston"; +import { getRampId } from "./ramp-context"; -const customFormat = winston.format.printf( - ({ timestamp, level, message, label = "" }) => `[${timestamp}] ${level}\t ${label} ${message}` -); +const customFormat = winston.format.printf(({ timestamp, level, message, label = "" }) => { + const rampId = getRampId(); + const rampPrefix = rampId ? `[${rampId}] ` : ""; + const timestampPrefix = timestamp ? `[${timestamp}]` : ""; + return `${timestampPrefix} ${level}${label ? ` ${label}` : ""} ${rampPrefix}${message}`; +}); const logger = winston.createLogger({ level: "info", @@ -18,7 +22,7 @@ const logger = winston.createLogger({ format: format.combine(format.timestamp({ format: "MMM D, YYYY HH:mm:ss" }), format.prettyPrint(), customFormat) }), new winston.transports.Console({ - format: format.combine(format.colorize(), winston.format.simple()) + format: format.combine(format.colorize(), format.prettyPrint(), customFormat) }) ] }); diff --git a/apps/api/src/config/ramp-context.ts b/apps/api/src/config/ramp-context.ts new file mode 100644 index 000000000..6cea0c8d6 --- /dev/null +++ b/apps/api/src/config/ramp-context.ts @@ -0,0 +1,48 @@ +import { AsyncLocalStorage } from "async_hooks"; + +/** + * Context that is available during ramp processing. + * This can be extended with additional fields as needed. + */ +interface RampProcessingContext { + rampId: string; +} + +/** + * AsyncLocalStorage instance for storing ramp processing context. + * This allows us to automatically propagate the rampId through async call chains + * without explicitly passing it as a parameter. + */ +const rampContextStorage = new AsyncLocalStorage(); + +/** + * Run a function within a ramp context. + * All async operations within the callback will have access to the rampId. + * + * @param rampId The ID of the ramp being processed + * @param fn The async function to run within the context + * @returns The result of the async function + */ +export function runWithRampContext(rampId: string, fn: () => Promise): Promise { + return rampContextStorage.run({ rampId }, fn); +} + +/** + * Get the current ramp ID from the AsyncLocalStorage context. + * Returns undefined if not running within a ramp context. + * + * @returns The current ramp ID or undefined + */ +export function getRampId(): string | undefined { + return rampContextStorage.getStore()?.rampId; +} + +/** + * Get the full ramp context from AsyncLocalStorage. + * Returns undefined if not running within a ramp context. + * + * @returns The current ramp context or undefined + */ +export function getRampContext(): RampProcessingContext | undefined { + return rampContextStorage.getStore(); +}