Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,57 @@ export class PendulumToMoonbeamXCMPhaseHandler extends BasePhaseHandler {
return balance.gte(expectedOutputAmountRaw);
};

const waitForMoonbeamArrival = async (timeoutMs: number = 120000): Promise<boolean> => {
const startTime = Date.now();
const pollIntervalMs = 5000;

while (Date.now() - startTime < timeoutMs) {
if (await didTokensArriveOnMoonbeam()) {
return true;
}
await new Promise(resolve => setTimeout(resolve, pollIntervalMs));
}
return false;
};

try {
// We have to check if the input token already arrived on Moonbeam and if it left Pendulum.
// If we'd only check if it arrived on Moonbeam, we might miss transferring them if the target account already has some tokens.
if ((await didTokensLeavePendulum()) && (await didTokensArriveOnMoonbeam())) {
// Check if we already have a stored XCM hash (XCM was submitted in a previous attempt)
if (state.state.pendulumToMoonbeamXcmHash) {
logger.info(
`PendulumToMoonbeamPhaseHandler: XCM already submitted (hash: ${state.state.pendulumToMoonbeamXcmHash}) for ramp ${state.id}. Waiting for arrival on Moonbeam...`
);

if (await didTokensArriveOnMoonbeam()) {
logger.info(`PendulumToMoonbeamPhaseHandler: Tokens already arrived on Moonbeam for ramp ${state.id}.`);
return this.transitionToNextPhase(state, this.nextPhaseSelector(state));
}

const arrived = await waitForMoonbeamArrival();
if (!arrived) {
throw this.createRecoverableError("Timeout waiting for tokens to arrive on Moonbeam after XCM was already submitted");
}
return this.transitionToNextPhase(state, this.nextPhaseSelector(state));
}

// Check if tokens already left Pendulum (XCM was submitted but hash wasn't stored due to crash)
if (await didTokensLeavePendulum()) {
logger.info(
`PendulumToMoonbeamPhaseHandler: Input token already arrived on Moonbeam, skipping XCM transfer for ramp ${state.id}.`
`PendulumToMoonbeamPhaseHandler: Tokens already left Pendulum for ramp ${state.id}. XCM likely submitted but hash not stored. Waiting for arrival on Moonbeam...`
);

if (await didTokensArriveOnMoonbeam()) {
logger.info(`PendulumToMoonbeamPhaseHandler: Tokens already arrived on Moonbeam for ramp ${state.id}.`);
return this.transitionToNextPhase(state, this.nextPhaseSelector(state));
}

const arrived = await waitForMoonbeamArrival();
if (!arrived) {
throw this.createRecoverableError("Timeout waiting for tokens to arrive on Moonbeam after tokens left Pendulum");
}
return this.transitionToNextPhase(state, this.nextPhaseSelector(state));
}

// No previous XCM submission detected, proceed with transfer
const { txData: pendulumToMoonbeamTransaction } = this.getPresignedTransaction(state, "pendulumToMoonbeamXcm");

if (typeof pendulumToMoonbeamTransaction !== "string") {
Expand All @@ -111,19 +152,24 @@ export class PendulumToMoonbeamXCMPhaseHandler extends BasePhaseHandler {
logger.info(
`PendulumToMoonbeamPhaseHandler: XCM transfer submitted with hash ${hash} for ramp ${state.id}. Waiting for the token to arrive on Moonbeam...`
);
await didTokensArriveOnMoonbeam();

// XCM is payed by the ephemeral, in GLMR, with a fixed value of MOONBEAM_XCM_FEE_GLMR
const subsidyAmount = nativeToDecimal(MOONBEAM_XCM_FEE_GLMR, 18).toNumber();
const hashToStore = hash ?? "0x";
await this.createSubsidy(state, subsidyAmount, SubsidyToken.GLMR, substrateEphemeralAddress, hashToStore);

// Store the hash immediately after submission to minimize crash window
state.state = {
...state.state,
pendulumToMoonbeamXcmHash: hash
};
await state.update({ state: state.state });

const arrived = await waitForMoonbeamArrival();
if (!arrived) {
throw this.createRecoverableError("Timeout waiting for tokens to arrive on Moonbeam after XCM submission");
}

// XCM is payed by the ephemeral, in GLMR, with a fixed value of MOONBEAM_XCM_FEE_GLMR
const subsidyAmount = nativeToDecimal(MOONBEAM_XCM_FEE_GLMR, 18).toNumber();
const hashToStore = hash ?? "0x";
await this.createSubsidy(state, subsidyAmount, SubsidyToken.GLMR, substrateEphemeralAddress, hashToStore);

return this.transitionToNextPhase(state, this.nextPhaseSelector(state));
} catch (e) {
console.error("Error in PendulumToMoonbeamPhase:", e);
Expand Down
63 changes: 33 additions & 30 deletions apps/api/src/api/services/phases/phase-processor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import httpStatus from "http-status";
import logger from "../../../config/logger";
import { runWithRampContext } from "../../../config/ramp-context";
import RampState from "../../../models/rampState.model";
import { APIError } from "../../errors/api-error";
import { PhaseError, RecoverablePhaseError } from "../../errors/phase-error";
Expand Down Expand Up @@ -30,41 +31,43 @@ export class PhaseProcessor {
* @param rampId The ID of the ramping process
*/
public async processRamp(rampId: string): Promise<void> {
const state = await RampState.findByPk(rampId);
if (!state) {
throw new APIError({
message: `Ramp with ID ${rampId} not found`,
status: httpStatus.NOT_FOUND
});
}
return runWithRampContext(rampId, async () => {
const state = await RampState.findByPk(rampId);
if (!state) {
throw new APIError({
message: `Ramp with ID ${rampId} not found`,
status: httpStatus.NOT_FOUND
});
}

// Try to acquire the lock
let lockAcquired = await this.acquireLock(state);
if (!lockAcquired) {
if (this.isLockExpired(state)) {
logger.info(`Lock for ramp ${rampId} has expired. Ignoring previous lock and continue processing...`);
// Force release the expired lock and try to acquire it again
await this.releaseLock(state);
lockAcquired = await this.acquireLock(state);
if (!lockAcquired) {
logger.warn(`Failed to acquire lock for ramp ${rampId} even after clearing expired lock`);
// Try to acquire the lock
let lockAcquired = await this.acquireLock(state);
if (!lockAcquired) {
if (this.isLockExpired(state)) {
logger.info(`Lock for ramp ${rampId} has expired. Ignoring previous lock and continue processing...`);
// Force release the expired lock and try to acquire it again
await this.releaseLock(state);
lockAcquired = await this.acquireLock(state);
if (!lockAcquired) {
logger.warn(`Failed to acquire lock for ramp ${rampId} even after clearing expired lock`);
return;
}
} else {
logger.info(`Skipping processing for ramp ${rampId} as it's already being processed`);
return;
}
} else {
logger.info(`Skipping processing for ramp ${rampId} as it's already being processed`);
return;
}
}

try {
await this.processPhase(state);
// We just return, since the error management should be handled in the processPhase method.
// We do not want to crash the whole process if one ramp fails.
} catch (error) {
logger.error(`Error processing ramp ${rampId}: ${error}`);
} finally {
await this.releaseLock(state);
}
try {
await this.processPhase(state);
// We just return, since the error management should be handled in the processPhase method.
// We do not want to crash the whole process if one ramp fails.
} catch (error) {
logger.error(`Error processing ramp ${rampId}: ${error}`);
} finally {
await this.releaseLock(state);
}
});
}

/**
Expand Down
19 changes: 11 additions & 8 deletions apps/api/src/api/workers/cleanup.worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { CronJob } from "cron";
import { Op } from "sequelize";
import logger from "../../config/logger";
import { runWithRampContext } from "../../config/ramp-context";
import RampState from "../../models/rampState.model";
import { postProcessHandlers } from "../services/phases/post-process";
import { BaseRampService } from "../services/ramp/base.service";
Expand Down Expand Up @@ -170,14 +171,16 @@ class CleanupWorker {
logger.info(`Found ${states.length} completed RampStates that need post-processing`);

const processPromises = states.map(async state => {
try {
await this.processCleanup(state);
return { stateId: state.id, status: "fulfilled" };
} catch (error) {
logger.error(`Error processing cleanup for state ${state.id}:`, error);
// Don't update the state here, processCleanup handles its own updates
return { reason: error, stateId: state.id, status: "rejected" };
}
return runWithRampContext(state.id, async () => {
try {
await this.processCleanup(state);
return { stateId: state.id, status: "fulfilled" };
} catch (error) {
logger.error(`Error processing cleanup:`, error);
// Don't update the state here, processCleanup handles its own updates
return { reason: error, stateId: state.id, status: "rejected" };
}
});
});

// Use allSettled to allow individual state processing to fail without stopping others
Expand Down
6 changes: 3 additions & 3 deletions apps/api/src/api/workers/ramp-recovery.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ class RampRecoveryWorker {
// Process each stale state concurrently
const recoveryPromises = staleStates.map(async state => {
try {
logger.info(`Attempting recovery for ramp state ${state.id} in phase ${state.currentPhase}`);
// Process the state
logger.info(`Attempting recovery in phase ${state.currentPhase} for ramp ${state.id}`);
// Process the state (processRamp already wraps execution with runWithRampContext)
await phaseProcessor.processRamp(state.id);
logger.info(`Successfully processed ramp state ${state.id}`);
return { stateId: state.id, status: "fulfilled" };
Expand All @@ -101,7 +101,7 @@ class RampRecoveryWorker {
const updateError = updateE as Error;
logger.error(`Failed to update ramp state ${state.id} with error log:`, updateError);
// Log the original error as well if the update fails
logger.error(`Original recovery error for state ${state.id}:`, error);
logger.error(`Original recovery error for ${state.id}:`, error);
}
// Return a rejected status for Promise.allSettled
return { reason: error, stateId: state.id, status: "rejected" };
Expand Down
12 changes: 8 additions & 4 deletions apps/api/src/config/logger.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import { StreamOptions } from "morgan";
import winston, { format } from "winston";
import { getRampId } from "./ramp-context";

const customFormat = winston.format.printf(
({ timestamp, level, message, label = "" }) => `[${timestamp}] ${level}\t ${label} ${message}`
);
const customFormat = winston.format.printf(({ timestamp, level, message, label = "" }) => {
const rampId = getRampId();
const rampPrefix = rampId ? `[${rampId}] ` : "";
const timestampPrefix = timestamp ? `[${timestamp}]` : "";
return `${timestampPrefix} ${level}${label ? ` ${label}` : ""} ${rampPrefix}${message}`;
});

const logger = winston.createLogger({
level: "info",
Expand All @@ -18,7 +22,7 @@ const logger = winston.createLogger({
format: format.combine(format.timestamp({ format: "MMM D, YYYY HH:mm:ss" }), format.prettyPrint(), customFormat)
}),
new winston.transports.Console({
format: format.combine(format.colorize(), winston.format.simple())
format: format.combine(format.colorize(), format.prettyPrint(), customFormat)
})
]
});
Expand Down
48 changes: 48 additions & 0 deletions apps/api/src/config/ramp-context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { AsyncLocalStorage } from "async_hooks";

/**
* Context that is available during ramp processing.
* This can be extended with additional fields as needed.
*/
interface RampProcessingContext {
rampId: string;
}

/**
* AsyncLocalStorage instance for storing ramp processing context.
* This allows us to automatically propagate the rampId through async call chains
* without explicitly passing it as a parameter.
*/
const rampContextStorage = new AsyncLocalStorage<RampProcessingContext>();

/**
* Run a function within a ramp context.
* All async operations within the callback will have access to the rampId.
*
* @param rampId The ID of the ramp being processed
* @param fn The async function to run within the context
* @returns The result of the async function
*/
export function runWithRampContext<T>(rampId: string, fn: () => Promise<T>): Promise<T> {
return rampContextStorage.run({ rampId }, fn);
}

/**
* Get the current ramp ID from the AsyncLocalStorage context.
* Returns undefined if not running within a ramp context.
*
* @returns The current ramp ID or undefined
*/
export function getRampId(): string | undefined {
return rampContextStorage.getStore()?.rampId;
}

/**
* Get the full ramp context from AsyncLocalStorage.
* Returns undefined if not running within a ramp context.
*
* @returns The current ramp context or undefined
*/
export function getRampContext(): RampProcessingContext | undefined {
return rampContextStorage.getStore();
}