Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@
"editor.tabSize": 2,
"editor.insertSpaces": true,
"editor.formatOnSave": true,
"eslint.format.enable": false
}
"eslint.format.enable": false,
"typescript.tsdk": "node_modules/typescript/lib"
}
24 changes: 18 additions & 6 deletions apps/test-app-runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,33 @@
"scripts": {
"test": "jest --passWithNoTests"
},
"files": [
"lib",
"static"
],
"dependencies": {
"@aws-sdk/client-s3": "^3.204.0",
"@aws-sdk/client-dynamodb": "^3.204.0",
"@aws-sdk/client-sqs": "^3.204.0",
"@aws-sdk/client-lambda": "^3.204.0",
"@aws-sdk/lib-dynamodb": "^3.204.0",
"@aws-sdk/client-s3": "^3.208.0",
"@aws-sdk/client-dynamodb": "^3.208.0",
"@aws-sdk/client-sqs": "^3.208.0",
"@aws-sdk/client-lambda": "^3.208.0",
"@aws-sdk/client-apigatewaymanagementapi": "^3.208.0",
"@aws-sdk/lib-dynamodb": "^3.208.0",
"@eventual/aws-runtime": "workspace:^",
"@eventual/core": "workspace:^",
"ulid": "^2.3.0"
"ulid": "^2.3.0",
"websocket": "^1.0.34",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-use-websocket": "^4.2.0"
},
"devDependencies": {
"@eventual/compiler": "workspace:^",
"@types/jest": "^29",
"@types/node": "^16",
"@types/aws-lambda": "^8.10.108",
"@types/websocket": "1.0.5",
"@types/react": "18.0.25",
"@types/react-dom": "^18.0.9",
"jest": "^29",
"ts-jest": "^29",
"ts-node": "^10.9.1",
Expand Down
2 changes: 1 addition & 1 deletion apps/test-app-runtime/src/open-account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ interface Owner {
lastName: string;
}

interface OpenAccountRequest {
export interface OpenAccountRequest {
accountId: string;
address: PostalAddress;
email: string;
Expand Down
33 changes: 33 additions & 0 deletions apps/test-app-runtime/src/test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Handler } from "aws-lambda";
import { InvokeCommand, LambdaClient } from "@aws-sdk/client-lambda";
import { StartWorkflowRequest } from "@eventual/aws-runtime";
import type { OpenAccountRequest } from "./open-account.js";

const lambda = new LambdaClient({});
const workflowStartFunction = process.env.WORKFLOW_STARTER;

export const handler: Handler<{ count: number }> = async ({ count }) => {
for (let i = 0; i < count; i++) {
const bankRequest: OpenAccountRequest = {
accountId: String(i),
address: { address1: "", postalCode: "", address2: "" },
bankDetails: {
accountNumber: String(i),
accountType: "something",
personalOwner: { firstName: "sam", lastName: "sussman" },
routingNumber: "",
},
email: "",
};
const request: StartWorkflowRequest = {
input: bankRequest,
};

await lambda.send(
new InvokeCommand({
FunctionName: workflowStartFunction,
Payload: Buffer.from(JSON.stringify(request)),
})
);
}
};
49 changes: 49 additions & 0 deletions apps/test-app-runtime/src/tester/init-function.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { Handler } from "aws-lambda";
import { DynamoDBClient, QueryCommand } from "@aws-sdk/client-dynamodb";
import {
ApiGatewayManagementApiClient,
PostToConnectionCommand,
} from "@aws-sdk/client-apigatewaymanagementapi";
import { InitMessage, ProgressState } from "./messages.js";

const tableName = process.env.TABLE_NAME ?? "";
const dynamo = new DynamoDBClient({});

export interface InitRequest {
connectionId: string;
url: string;
}

export const handler: Handler<InitRequest, void> = async (event) => {
const inProgressesResults = await dynamo.send(
new QueryCommand({
TableName: tableName,
KeyConditionExpression: "pk=:pk and begins_with(sk,:sk)",
FilterExpression: "done=:false",
ExpressionAttributeValues: {
":false": { BOOL: false },
":sk": { S: `P#` },
":pk": { S: "Progress" },
},
})
);

const apig = new ApiGatewayManagementApiClient({
endpoint: event.url,
});

const message: InitMessage = {
action: "init",
progresses:
inProgressesResults.Items?.map(
(p) => JSON.parse(p.state?.S ?? "{}") as ProgressState
) ?? [],
};

await apig.send(
new PostToConnectionCommand({
ConnectionId: event.connectionId,
Data: Buffer.from(JSON.stringify(message)),
})
);
};
26 changes: 26 additions & 0 deletions apps/test-app-runtime/src/tester/messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
export interface ReportProgressMessage {
action: "progressUpdate";
progress: ProgressState;
}

export function isReportProgressMessage(
message: any
): message is ReportProgressMessage {
return "action" in message && message.action === "progressUpdate";
}

export interface ProgressState {
value: number;
goal: number;
id: string;
done: boolean;
}

export interface InitMessage {
action: "init";
progresses: ProgressState[];
}

export function isInitMessage(message: any): message is InitMessage {
return "action" in message && message.action === "init";
}
64 changes: 64 additions & 0 deletions apps/test-app-runtime/src/tester/website/index.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { useCallback, useEffect, useState } from "react";
import ReactDOM from "react-dom/client";
import useWebSocket from "react-use-websocket";
import {
isInitMessage,
isReportProgressMessage,
ProgressState,
} from "../messages.js";

// [WEBSOCKETURL] will be replaced at deployment time.
const websocketUrl = "[WEBSOCKETURL]";

const App = () => {
// TODO, make dynamic
const { lastJsonMessage, readyState, sendJsonMessage } =
useWebSocket.default(websocketUrl);

const startWorkflow = useCallback(() => sendJsonMessage({}), []);
const [progresses, setProgresses] = useState<
Record<string, ProgressState> | undefined
>();

useEffect(() => {
if (lastJsonMessage !== null) {
setProgresses((s) => {
if (isInitMessage(lastJsonMessage)) {
return Object.fromEntries(
lastJsonMessage.progresses.map((p) => [p.id, p])
);
} else if (isReportProgressMessage(lastJsonMessage)) {
return {
...s,
[lastJsonMessage.progress.id]: lastJsonMessage.progress,
};
}
return s;
});
}
}, [lastJsonMessage]);

useEffect(() => {
console.log(readyState);
}, [readyState]);

return (
<div>
<button onClick={startWorkflow}>click me</button>
{progresses ? (
<div>
{Object.values(progresses)
.reverse()
.map((s) => (
<div>
{s.id}:{s.value}/{s.goal}
</div>
))}
</div>
) : null}
</div>
);
};

const root = ReactDOM.createRoot(document.querySelector("#container")!);
root.render(<App />);
104 changes: 104 additions & 0 deletions apps/test-app-runtime/src/tester/websocket-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { APIGatewayProxyWebsocketHandlerV2 } from "aws-lambda";
import {
DeleteItemCommand,
DynamoDBClient,
PutItemCommand,
} from "@aws-sdk/client-dynamodb";
import { ExecutionHistoryClient, WorkflowClient } from "@eventual/aws-runtime";
import { SQSClient } from "@aws-sdk/client-sqs";
import {
InvocationType,
InvokeCommand,
LambdaClient,
} from "@aws-sdk/client-lambda";
import { InitRequest } from "./init-function.js";

const tableName = process.env.TABLE_NAME ?? "";
const workflowTable = process.env.WORKFLOW_TABLE ?? "";
const dynamo = new DynamoDBClient({});
const sqs = new SQSClient({});
const workflowQueueUrl = process.env.WORKFLOW_QUEUE_URL ?? "";
const lambda = new LambdaClient({});
const initFunctionName = process.env.INIT_FUNCTION_NAME;

const workflowClient = new WorkflowClient({
dynamo,
executionHistory: new ExecutionHistoryClient({
dynamo,
tableName: workflowTable,
}),
sqs,
tableName: workflowTable,
workflowQueueUrl,
});

export const handler: APIGatewayProxyWebsocketHandlerV2 = async (event) => {
console.log(JSON.stringify(event, null, 2));

const {
requestContext: { connectionId, routeKey },
} = event;

if (routeKey === "$connect") {
await dynamo.send(
new PutItemCommand({
Item: {
pk: { S: "Connection" },
sk: { S: `C#${connectionId}` },
connectionId: { S: connectionId },
},
TableName: tableName,
})
);

const initRequest: InitRequest = {
connectionId,
url: `https://${event.requestContext.domainName}/${event.requestContext.stage}`,
};

await lambda.send(
new InvokeCommand({
FunctionName: initFunctionName,
InvocationType: InvocationType.Event,
Payload: Buffer.from(JSON.stringify(initRequest)),
})
);

return {
statusCode: 200,
};
} else if (routeKey === "$disconnect") {
await dynamo.send(
new DeleteItemCommand({
Key: {
pk: { S: "Connection" },
sk: { S: `C#${connectionId}` },
},
TableName: tableName,
})
);

return {
statusCode: 200,
};
}

if (routeKey === "$default") {
console.log(event.body);

if (!event.body) {
throw new Error("No body!");
}

const request: Request = JSON.parse(event.body);

const started = await workflowClient.startWorkflow(request);

return {
body: JSON.stringify({ id: started }),
statusCode: 200,
};
}

throw new Error("Unknown route key: " + routeKey);
};
Loading