Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@
/**
* The well-known path for the agent card
*/
export const AGENT_CARD_PATH = ".well-known/agent-card.json";
export const AGENT_CARD_PATH = ".well-known/agent-card.json";

/**
* Default value of page size if undefined
*/
export const DEFAULT_PAGE_SIZE = 50;
7 changes: 7 additions & 0 deletions src/server/request_handler/a2a_request_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
GetTaskPushNotificationConfigParams,
ListTaskPushNotificationConfigParams,
DeleteTaskPushNotificationConfigParams,
ListTasksParams,
ListTasksResult,
} from "../../types.js";

export interface A2ARequestHandler {
Expand Down Expand Up @@ -61,4 +63,9 @@ export interface A2ARequestHandler {
void,
undefined
>;

listTasks(
params: ListTasksParams
): Promise<ListTasksResult>;

}
34 changes: 33 additions & 1 deletion src/server/request_handler/default_request_handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { v4 as uuidv4 } from 'uuid'; // For generating unique IDs

import { Message, AgentCard, PushNotificationConfig, Task, MessageSendParams, TaskState, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, TaskQueryParams, TaskIdParams, TaskPushNotificationConfig, DeleteTaskPushNotificationConfigParams, GetTaskPushNotificationConfigParams, ListTaskPushNotificationConfigParams } from "../../types.js";
import { Message, AgentCard, PushNotificationConfig, Task, MessageSendParams, TaskState, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, TaskQueryParams, TaskIdParams, TaskPushNotificationConfig, DeleteTaskPushNotificationConfigParams, GetTaskPushNotificationConfigParams, ListTaskPushNotificationConfigParams, ListTasksParams, ListTasksResult } from "../../types.js";
import { AgentExecutor } from "../agent_execution/agent_executor.js";
import { RequestContext } from "../agent_execution/request_context.js";
import { A2AError } from "../error.js";
Expand All @@ -13,6 +13,8 @@ import { A2ARequestHandler } from "./a2a_request_handler.js";
import { InMemoryPushNotificationStore, PushNotificationStore } from '../push_notification/push_notification_store.js';
import { PushNotificationSender } from '../push_notification/push_notification_sender.js';
import { DefaultPushNotificationSender } from '../push_notification/default_push_notification_sender.js';
import { DEFAULT_PAGE_SIZE } from '../../constants.js';
import { isValidUnixTimestampMs } from '../utils.js';

const terminalStates: TaskState[] = ["completed", "failed", "canceled", "rejected"];

Expand Down Expand Up @@ -338,6 +340,15 @@ export class DefaultRequestHandler implements A2ARequestHandler {
return task;
}

async listTasks(
params: ListTasksParams
): Promise<ListTasksResult> {
if (!this.paramsTasksListAreValid(params)) {
throw A2AError.invalidParams(`Invalid method parameters.`);
}
return await this.taskStore.list(params);
}

async cancelTask(params: TaskIdParams): Promise<Task> {
const task = await this.taskStore.load(params.id);
if (!task) {
Expand Down Expand Up @@ -566,4 +577,25 @@ export class DefaultRequestHandler implements A2ARequestHandler {
// Send push notification in the background.
this.pushNotificationSender?.send(task);
}

// Check if the params for the TasksList function are valid
private paramsTasksListAreValid(params: ListTasksParams): boolean {
if(params.pageSize !== undefined && (params.pageSize > 100 || params.pageSize < 1)) {
return false;
}
if(params.pageToken !== undefined && Buffer.from(params.pageToken, 'base64').toString('base64') !== params.pageToken){
return false;
}
if(params.historyLength !== undefined && params.historyLength<0){
return false;
}
if(params.lastUpdatedAfter !== undefined && !isValidUnixTimestampMs(params.lastUpdatedAfter)){
return false;
}
const terminalStates: string[] = ["completed", "failed", "canceled", "rejected"];
if(params.status !== undefined && !terminalStates.includes(params.status)){
return false;
}
return true;
}
}
77 changes: 75 additions & 2 deletions src/server/store.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import fs from "fs/promises";
import fs, { lutimes } from "fs/promises";
import path from "path";
import {Task} from "../types.js";
import {ListTasksParams, ListTasksResult, Task} from "../types.js";
import { A2AError } from "./error.js";
import {
getCurrentTimestamp,
isArtifactUpdate,
isTaskStatusUpdate,
} from "./utils.js";
import { DEFAULT_PAGE_SIZE } from "../constants.js";

/**
* Simplified interface for task storage providers.
Expand All @@ -27,6 +28,15 @@ export interface TaskStore {
* @returns A promise resolving to an object containing the Task, or undefined if not found.
*/
load(taskId: string): Promise<Task | undefined>;

/**
* Retrieves a paginated and filtered list of tasks from the store.
*
* @param params An object containing criteria for filtering, sorting, and pagination.
* @returns A promise resolving to a `ListTasksResult` object, which includes the filtered and paginated tasks,
* the total number of tasks matching the criteria, the actual page size, and a token for the next page (if available).
*/
list(params: ListTasksParams): Promise<ListTasksResult>;
}

// ========================
Expand All @@ -47,4 +57,67 @@ export class InMemoryTaskStore implements TaskStore {
// Store copies to prevent internal mutation if caller reuses objects
this.store.set(task.id, {...task});
}

async list(params: ListTasksParams): Promise<ListTasksResult> {
// Returns the list of saved tasks
const lastUpdatedAfterDate = params.lastUpdatedAfter ? new Date(params.lastUpdatedAfter) : undefined;
const pageTokenDate = params.pageToken ? new Date(Buffer.from(params.pageToken, 'base64').toString('utf-8')) : undefined;
const filteredTasks = Array.from(this.store.values())
// Apply filters
.filter(task => !params.contextId || task.contextId === params.contextId)
.filter(task => !params.status || task.status.state === params.status)
.filter(task => {
if (!params.lastUpdatedAfter) return true;
if (!task.status.timestamp) return false; // Tasks without timestamp don't match 'lastUpdatedAfter'
return new Date(task.status.timestamp) > lastUpdatedAfterDate;
})
.filter(task => {
if (!params.pageToken) return true;
if (!task.status.timestamp) return false; // Tasks without timestamp don't match 'pageToken'
// pageToken is a timestamp, so we want tasks older than the pageToken
return new Date(task.status.timestamp) < pageTokenDate;
})

// Sort by timestamp in descending order (most recently updated tasks first)
filteredTasks.sort((t1, t2) => {
const ts1 = t1.status.timestamp ? new Date(t1.status.timestamp).getTime() : 0;
const ts2 = t2.status.timestamp ? new Date(t2.status.timestamp).getTime() : 0;
return ts2 - ts1;
});

// Apply pagination
let paginatedTasks = filteredTasks.slice(0, params.pageSize ?? DEFAULT_PAGE_SIZE);
let nextPageToken = '';
if (filteredTasks.length > paginatedTasks.length) {
const lastTaskOnPage = paginatedTasks[paginatedTasks.length - 1];
if (lastTaskOnPage && lastTaskOnPage.status.timestamp) {
nextPageToken = lastTaskOnPage.status.timestamp;
}
}

paginatedTasks = paginatedTasks.map(task => {
// Make a copy of the tasks to avoid modification of original stored object
const newTask = {
...task,
history: task.history ? [...task.history] : undefined,
artifacts: task.artifacts ? [...task.artifacts] : undefined,
};

const historyLength = params.historyLength ?? 0;
newTask.history = historyLength > 0 ? newTask.history?.slice(-historyLength) : [];

if (!params.includeArtifacts && newTask.artifacts){
newTask.artifacts = [];
}
return newTask;
})

return {
tasks: paginatedTasks,
totalSize: filteredTasks.length,
pageSize: paginatedTasks.length,
nextPageToken: Buffer.from(nextPageToken).toString('base64'), // Convert to base64
};
}

}
5 changes: 4 additions & 1 deletion src/server/transports/jsonrpc_transport_handler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JSONRPCErrorResponse, MessageSendParams, TaskQueryParams, TaskIdParams, TaskPushNotificationConfig, A2ARequest, JSONRPCResponse, DeleteTaskPushNotificationConfigParams, ListTaskPushNotificationConfigParams } from "../../types.js";
import { JSONRPCErrorResponse, MessageSendParams, TaskQueryParams, TaskIdParams, TaskPushNotificationConfig, A2ARequest, JSONRPCResponse, DeleteTaskPushNotificationConfigParams, ListTaskPushNotificationConfigParams, ListTasksParams, TaskState } from "../../types.js";
import { A2AError } from "../error.js";
import { A2ARequestHandler } from "../request_handler/a2a_request_handler.js";

Expand Down Expand Up @@ -101,6 +101,9 @@ export class JsonRpcTransportHandler {
case 'tasks/get':
result = await this.requestHandler.getTask(rpcRequest.params);
break;
case 'tasks/list':
result = await this.requestHandler.listTasks(rpcRequest.params);
break;
case 'tasks/cancel':
result = await this.requestHandler.cancelTask(rpcRequest.params);
break;
Expand Down
16 changes: 16 additions & 0 deletions src/server/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,19 @@ export function isArtifactUpdate(
// Check if it has 'parts'
return isObject(update) && "parts" in update;
}

/**
* Checks if a given number is a valid Unix timestamp in milliseconds.
* A valid timestamp is a positive integer representing milliseconds since the Unix epoch.
* @param timestamp The number to validate.
* @returns True if the number is a valid Unix timestamp in milliseconds, false otherwise.
*/
export function isValidUnixTimestampMs(timestamp: number): boolean {
if (typeof timestamp !== 'number' || !Number.isInteger(timestamp)) {
return false;
}
if (timestamp <= 0) {
return false;
}
return true;
}
Loading