Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,37 @@ console.log("Agent response:", llmResponse);
if (llmResponse.status === "Done") console.log("Conversation finished.")
```

#### Stream a Conversation Response
Stream the agent's response in real-time to provide immediate feedback to users.

```javascript
const chat = store.ai.conversation(agent.identifier, "Performers/", {
parameters: {country: "France"}
});

// Register action handler
chat.handle("store-performer-details", async (req, performer) => {
const session = store.openSession();
await session.store(performer);
await session.saveChanges();
session.dispose();
return {success: true};
});

chat.setUserPrompt("Find the employee with largest profit and suggest rewards");

// Stream the "suggestedReward" property
let chunkedText = "";
const answer = await chat.stream("suggestedReward", async (chunk) => {
// Called for each streamed chunk
chunkedText += chunk;
});

console.log("chunkedText", chunkedText);

console.log("Final answer:", answer);
```

## Attachments

#### Store attachments
Expand Down
3 changes: 3 additions & 0 deletions src/Documents/Operations/AI/Agents/AiAgentToolQuery.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import type { AiAgentToolQueryOptions } from "./AiAgentToolQueryOptions.js";

export interface AiAgentToolQuery {
name: string;
description: string;
query: string;
parametersSampleObject?: string; // JSON example of parameters
parametersSchema?: string; // JSON schema for parameters
options?: AiAgentToolQueryOptions;
}
25 changes: 25 additions & 0 deletions src/Documents/Operations/AI/Agents/AiAgentToolQueryOptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Options for controlling when and how query tools are executed in AI agent conversations.
*/
export interface AiAgentToolQueryOptions {
/**
* When true, the model is allowed to execute this query on demand based on its own judgment.
* When false, the model cannot call this query (unless executed as part of initial context).
* When null/undefined, server-side defaults apply.
*/
allowModelQueries?: boolean;

/**
* When true, the query will be executed during the initial context build and its results provided to the model.
* When false, the query will not be executed for the initial context.
* When null/undefined, server-side defaults apply.
*
* Notes:
* - The query must not require model-supplied parameters (it may use agent-scope parameters).
* - If only addToInitialContext is true and allowModelQueries is false, the query runs only
* during the initial context and is not callable by the model afterward.
* - If both addToInitialContext and allowModelQueries are true, the query will run during
* the initial context and may also be invoked later by the model (e.g., to fetch fresh data).
*/
addToInitialContext?: boolean;
}
81 changes: 75 additions & 6 deletions src/Documents/Operations/AI/Agents/RunConversationOperation.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { IMaintenanceOperation, OperationResultType } from "../../OperationAbstractions.js";
import { Stream } from "node:stream";
import { Readable, Stream } from "node:stream";
import { createInterface } from "node:readline";
import type { AiAgentActionResponse } from "./AiAgentActionResponse.js";
import type { AiConversationCreationOptions } from "./AiConversationCreationOptions.js";
import type { ConversationResult } from "./ConversationResult.js";
import type { AiStreamCallback } from "../AiStreamCallback.js";
import { RavenCommand } from "../../../../Http/RavenCommand.js";
import { DocumentConventions } from "../../../Conventions/DocumentConventions.js";
import { IRaftCommand } from "../../../../Http/IRaftCommand.js";
Expand All @@ -21,27 +23,39 @@ export class RunConversationOperation<TAnswer> implements IMaintenanceOperation<
private readonly _actionResponses?: AiAgentActionResponse[];
private readonly _options?: AiConversationCreationOptions;
private readonly _changeVector?: string;
private readonly _streamPropertyPath?: string;
private readonly _streamCallback?: AiStreamCallback;

public constructor(
agentId: string,
conversationId: string,
userPrompt?: string,
actionResponses?: AiAgentActionResponse[],
options?: AiConversationCreationOptions,
changeVector?: string
changeVector?: string,
streamPropertyPath?: string,
streamCallback?: AiStreamCallback
) {
if (StringUtil.isNullOrEmpty(agentId)) {
throwError("InvalidArgumentException", "agentId cannot be null or empty.");
}
if (StringUtil.isNullOrEmpty(conversationId)) {
throwError("InvalidArgumentException", "conversationId cannot be null or empty.");
}

// Both streamPropertyPath and streamCallback must be specified together or neither
if ((streamPropertyPath != null) !== (streamCallback != null)) {
throwError("InvalidOperationException", "Both streamPropertyPath and streamCallback must be specified together or neither.");
}

this._agentId = agentId;
this._conversationId = conversationId;
this._userPrompt = userPrompt;
this._actionResponses = actionResponses;
this._options = options;
this._changeVector = changeVector;
this._streamPropertyPath = streamPropertyPath;
this._streamCallback = streamCallback;
}

public get resultType(): OperationResultType {
Expand All @@ -56,7 +70,9 @@ export class RunConversationOperation<TAnswer> implements IMaintenanceOperation<
this._actionResponses,
this._options,
this._changeVector,
conventions
conventions,
this._streamPropertyPath,
this._streamCallback
);
}
}
Expand All @@ -70,6 +86,8 @@ class RunConversationCommand<TAnswer>
private readonly _actionResponses?: AiAgentActionResponse[];
private readonly _options?: AiConversationCreationOptions;
private readonly _changeVector?: string;
private readonly _streamPropertyPath?: string;
private readonly _streamCallback?: AiStreamCallback;
private _raftId: string;

public constructor(
Expand All @@ -79,7 +97,9 @@ class RunConversationCommand<TAnswer>
actionResponses: AiAgentActionResponse[] | undefined,
options: AiConversationCreationOptions | undefined,
changeVector: string | undefined,
conventions: DocumentConventions
conventions: DocumentConventions,
streamPropertyPath?: string,
streamCallback?: AiStreamCallback
) {
super();
this._conversationId = conversationId;
Expand All @@ -88,6 +108,13 @@ class RunConversationCommand<TAnswer>
this._actionResponses = actionResponses;
this._options = options;
this._changeVector = changeVector;
this._streamPropertyPath = streamPropertyPath;
this._streamCallback = streamCallback;

// When streaming is enabled, we need to handle raw response
if (this._streamPropertyPath && this._streamCallback) {
this._responseType = "Raw";
}

if (this._conversationId && this._conversationId.endsWith("|")) {
this._raftId = RaftIdGenerator.newId();
Expand All @@ -112,12 +139,17 @@ class RunConversationCommand<TAnswer>
uriParams.append("changeVector", this._changeVector);
}

if (this._streamPropertyPath) {
uriParams.append("streaming", "true");
uriParams.append("streamPropertyPath", this._streamPropertyPath);
}

const uri = `${node.url}/databases/${node.database}/ai/agent?${uriParams}`;

const bodyObj = {
ActionResponses: this._actionResponses,
UserPrompt: this._prompt,
CreationOptions: this._options
CreationOptions: this._options ?? {}
};

const headers = this._headers().typeAppJson().build();
Expand All @@ -144,6 +176,43 @@ class RunConversationCommand<TAnswer>
this._throwInvalidResponse();
}

return this._parseResponseDefaultAsync(bodyStream)
if (this._streamPropertyPath && this._streamCallback) {
return await this._processStreamingResponse(bodyStream as Readable);
}

return await this._parseResponseDefaultAsync(bodyStream);
}

private async _processStreamingResponse(bodyStream: Readable): Promise<string> {
const rl = createInterface({
input: bodyStream,
crlfDelay: Infinity
});

for await (const line of rl) {
if (!line || line.trim().length === 0) {
continue;
}

if (line.startsWith("{")) {
const jsonStream = Readable.from([line]);
let body: string = null;
this.result = await this._defaultPipeline(_ => body = _).process(jsonStream);
return body;
}

try {
const unescaped = JSON.parse(line);
await this._streamCallback!(unescaped);
} catch (err) {
await this._streamCallback!(line);
}
}

if (!this.result) {
throwError("InvalidOperationException", "No final result received in streaming response");
}

return null;
}
}
3 changes: 2 additions & 1 deletion src/Documents/Operations/AI/Agents/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from "./GetAiAgentsOperation.js";
export * from "./AddOrUpdateAiAgentOperation.js";
export * from "./DeleteAiAgentOperation.js";
export * from "./RunConversationOperation.js";
export * from "./RunConversationOperation.js";
export * from "./AiAgentToolQueryOptions.js";
Loading