diff --git a/packages/cli/src/modules/chat-hub/chat-hub.constants.ts b/packages/cli/src/modules/chat-hub/chat-hub.constants.ts index 22f28ad7d3f01..d27e581b6f324 100644 --- a/packages/cli/src/modules/chat-hub/chat-hub.constants.ts +++ b/packages/cli/src/modules/chat-hub/chat-hub.constants.ts @@ -1,3 +1,6 @@ +import type { ChatHubLLMProvider } from '@n8n/api-types'; +import type { INodeTypeNameVersion } from 'n8n-workflow'; + export const CONVERSATION_TITLE_GENERATION_PROMPT = `Generate a concise, descriptive title for this conversation based on the user's message. Requirements: @@ -7,3 +10,37 @@ Requirements: - Only output the title, nothing else - Use the same language as the user's message `; + +export const PROVIDER_NODE_TYPE_MAP: Record = { + openai: { + name: '@n8n/n8n-nodes-langchain.lmChatOpenAi', + version: 1.2, + }, + anthropic: { + name: '@n8n/n8n-nodes-langchain.lmChatAnthropic', + version: 1.3, + }, + google: { + name: '@n8n/n8n-nodes-langchain.lmChatGoogleGemini', + version: 1.2, + }, +}; + +export const NODE_NAMES = { + CHAT_TRIGGER: 'When chat message received', + REPLY_AGENT: 'AI Agent', + TITLE_GENERATOR_AGENT: 'Title Generator Agent', + CHAT_MODEL: 'Chat Model', + MEMORY: 'Memory', + RESTORE_CHAT_MEMORY: 'Restore Chat Memory', + CLEAR_CHAT_MEMORY: 'Clear Chat Memory', +} as const; + +/* eslint-disable @typescript-eslint/naming-convention */ +export const JSONL_STREAM_HEADERS = { + 'Content-Type': 'application/json-lines; charset=utf-8', + 'Transfer-Encoding': 'chunked', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', +}; +/* eslint-enable @typescript-eslint/naming-convention */ diff --git a/packages/cli/src/modules/chat-hub/chat-hub.service.ts b/packages/cli/src/modules/chat-hub/chat-hub.service.ts index 0399eaab2d59c..10e65b0557490 100644 --- a/packages/cli/src/modules/chat-hub/chat-hub.service.ts +++ b/packages/cli/src/modules/chat-hub/chat-hub.service.ts @@ -36,14 +36,14 @@ import { type IConnections, type INode, type INodeCredentials, - type INodeTypeNameVersion, - type ITaskData, type IWorkflowBase, type IWorkflowExecuteAdditionalData, type IRun, jsonParse, StructuredChunk, RESPOND_TO_CHAT_NODE_TYPE, + IExecuteData, + IRunExecutionData, } from 'n8n-workflow'; import { v4 as uuidv4 } from 'uuid'; @@ -61,7 +61,12 @@ import { WorkflowService } from '@/workflows/workflow.service'; import { ChatHubAgentService } from './chat-hub-agent.service'; import { ChatHubCredentialsService } from './chat-hub-credentials.service'; import type { ChatHubMessage } from './chat-hub-message.entity'; -import { CONVERSATION_TITLE_GENERATION_PROMPT } from './chat-hub.constants'; +import { + CONVERSATION_TITLE_GENERATION_PROMPT, + JSONL_STREAM_HEADERS, + NODE_NAMES, + PROVIDER_NODE_TYPE_MAP, +} from './chat-hub.constants'; import type { HumanMessagePayload, RegenerateMessagePayload, @@ -74,40 +79,6 @@ import { ChatHubSessionRepository } from './chat-session.repository'; import { getMaxContextWindowTokens } from './context-limits'; import { interceptResponseWrites, createStructuredChunkAggregator } from './stream-capturer'; -const providerNodeTypeMapping: Record = { - openai: { - name: '@n8n/n8n-nodes-langchain.lmChatOpenAi', - version: 1.2, - }, - anthropic: { - name: '@n8n/n8n-nodes-langchain.lmChatAnthropic', - version: 1.3, - }, - google: { - name: '@n8n/n8n-nodes-langchain.lmChatGoogleGemini', - version: 1.2, - }, -}; - -const NODE_NAMES = { - CHAT_TRIGGER: 'When chat message received', - REPLY_AGENT: 'AI Agent', - TITLE_GENERATOR_AGENT: 'Title Generator Agent', - CHAT_MODEL: 'Chat Model', - MEMORY: 'Memory', - RESTORE_CHAT_MEMORY: 'Restore Chat Memory', - CLEAR_CHAT_MEMORY: 'Clear Chat Memory', -} as const; - -/* eslint-disable @typescript-eslint/naming-convention */ -const JSONL_STREAM_HEADERS = { - 'Content-Type': 'application/json-lines; charset=utf-8', - 'Transfer-Encoding': 'chunked', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', -}; -/* eslint-enable @typescript-eslint/naming-convention */ - @Service() export class ChatHubService { constructor( @@ -219,7 +190,7 @@ export class ChatHubService { 'searchModels', 'parameters.model', additionalData, - providerNodeTypeMapping.openai, + PROVIDER_NODE_TYPE_MAP.openai, {}, credentials, ); @@ -244,7 +215,7 @@ export class ChatHubService { 'searchModels', 'parameters.model', additionalData, - providerNodeTypeMapping.anthropic, + PROVIDER_NODE_TYPE_MAP.anthropic, {}, credentials, ); @@ -307,7 +278,7 @@ export class ChatHubService { }, }, additionalData, - providerNodeTypeMapping.google, + PROVIDER_NODE_TYPE_MAP.google, {}, credentials, ); @@ -372,6 +343,7 @@ export class ChatHubService { } private async createChatWorkflow( + user: User, sessionId: ChatSessionId, projectId: string, history: ChatHubMessage[], @@ -381,12 +353,10 @@ export class ChatHubService { generateConversationTitle: boolean, trx?: EntityManager, systemMessage?: string, - ): Promise<{ - workflowData: IWorkflowBase; - triggerToStartFrom: { name: string; data: ITaskData }; - }> { + ) { return await withTransaction(this.workflowRepository.manager, trx, async (em) => { - const { nodes, connections, triggerToStartFrom } = this.prepareChatWorkflow({ + const { nodes, connections, executionData } = this.prepareChatWorkflow({ + user, sessionId, history, humanMessage, @@ -413,14 +383,16 @@ export class ChatHubService { }), ); + const workflowData = { + ...workflow, + nodes, + connections, + versionId: uuidv4(), + }; + return { - workflowData: { - ...workflow, - nodes, - connections, - versionId: uuidv4(), - }, - triggerToStartFrom, + workflowData, + executionData, }; }); } @@ -484,54 +456,64 @@ export class ChatHubService { const selectedModel = this.getModelWithCredentials(model, credentials); - const workflow = await this.messageRepository.manager.transaction(async (trx) => { - const session = await this.getChatSession(user, sessionId, selectedModel, true, trx); - await this.ensurePreviousMessage(previousMessageId, sessionId, trx); - const messages = Object.fromEntries((session.messages ?? []).map((m) => [m.id, m])); - const history = this.buildMessageHistory(messages, previousMessageId); - - await this.saveHumanMessage(payload, user, previousMessageId, selectedModel, undefined, trx); - - // generate title on receiving the first human message only - const generateTitle = previousMessageId === null; + const { executionData, workflowData } = await this.messageRepository.manager.transaction( + async (trx) => { + const session = await this.getChatSession(user, sessionId, selectedModel, true, trx); + await this.ensurePreviousMessage(previousMessageId, sessionId, trx); + const messages = Object.fromEntries((session.messages ?? []).map((m) => [m.id, m])); + const history = this.buildMessageHistory(messages, previousMessageId); - if (provider === 'n8n') { - return await this.prepareCustomAgentWorkflow( + await this.saveHumanMessage( + payload, user, - sessionId, - payload.model.workflowId, - message, + previousMessageId, + selectedModel, + undefined, + trx, ); - } - if (provider === 'custom-agent') { - return await this.prepareChatAgentWorkflow( - payload.model.agentId, + // generate title on receiving the first human message only + const generateTitle = previousMessageId === null; + + if (provider === 'n8n') { + return await this.prepareCustomAgentWorkflow( + user, + sessionId, + payload.model.workflowId, + message, + ); + } + + if (provider === 'custom-agent') { + return await this.prepareChatAgentWorkflow( + payload.model.agentId, + user, + sessionId, + history, + message, + generateTitle, + trx, + ); + } + + return await this.prepareBaseChatWorkflow( user, sessionId, + credentials, + model, history, message, generateTitle, trx, ); - } - - return await this.prepareBaseChatWorkflow( - user, - sessionId, - credentials, - model, - history, - message, - generateTitle, - trx, - ); - }); + }, + ); await this.executeChatWorkflowWithCleanup( res, user, - workflow, + workflowData, + executionData, sessionId, messageId, selectedModel, @@ -614,10 +596,13 @@ export class ChatHubService { return; } + const { workflowData, executionData } = workflow; + await this.executeChatWorkflowWithCleanup( res, user, - workflow, + workflowData, + executionData, sessionId, messageId, selectedModel, @@ -631,77 +616,81 @@ export class ChatHubService { const selectedModel = this.getModelWithCredentials(model, credentials); - const { workflow, retryOfMessageId, previousMessageId } = - await this.messageRepository.manager.transaction(async (trx) => { - const session = await this.getChatSession(user, sessionId, undefined, false, trx); - const messageToRetry = await this.getChatMessage(session.id, retryId, [], trx); + const { + workflow: { workflowData, executionData }, + retryOfMessageId, + previousMessageId, + } = await this.messageRepository.manager.transaction(async (trx) => { + const session = await this.getChatSession(user, sessionId, undefined, false, trx); + const messageToRetry = await this.getChatMessage(session.id, retryId, [], trx); - if (messageToRetry.type !== 'ai') { - throw new BadRequestError('Can only retry AI messages'); - } + if (messageToRetry.type !== 'ai') { + throw new BadRequestError('Can only retry AI messages'); + } - const messages = Object.fromEntries((session.messages ?? []).map((m) => [m.id, m])); - const history = this.buildMessageHistory(messages, messageToRetry.previousMessageId); + const messages = Object.fromEntries((session.messages ?? []).map((m) => [m.id, m])); + const history = this.buildMessageHistory(messages, messageToRetry.previousMessageId); - const lastHumanMessage = history.filter((m) => m.type === 'human').pop(); - if (!lastHumanMessage) { - throw new BadRequestError('No human message found to base the retry on'); - } + const lastHumanMessage = history.filter((m) => m.type === 'human').pop(); + if (!lastHumanMessage) { + throw new BadRequestError('No human message found to base the retry on'); + } - // Remove any (AI) messages that came after the last human message - const lastHumanMessageIndex = history.indexOf(lastHumanMessage); - if (lastHumanMessageIndex !== -1) { - history.splice(lastHumanMessageIndex + 1); - } + // Remove any (AI) messages that came after the last human message + const lastHumanMessageIndex = history.indexOf(lastHumanMessage); + if (lastHumanMessageIndex !== -1) { + history.splice(lastHumanMessageIndex + 1); + } - // Rerun the workflow, replaying the last human message + // Rerun the workflow, replaying the last human message - // If the message being retried is itself a retry, we want to point to the original message - const retryOfMessageId = messageToRetry.retryOfMessageId ?? messageToRetry.id; - const message = lastHumanMessage ? lastHumanMessage.content : ''; + // If the message being retried is itself a retry, we want to point to the original message + const retryOfMessageId = messageToRetry.retryOfMessageId ?? messageToRetry.id; + const message = lastHumanMessage ? lastHumanMessage.content : ''; - let workflow; - if (provider === 'n8n') { - workflow = await this.prepareCustomAgentWorkflow( - user, - sessionId, - payload.model.workflowId, - message, - ); - } else if (provider === 'custom-agent') { - workflow = await this.prepareChatAgentWorkflow( - payload.model.agentId, - user, - sessionId, - history, - message, - false, - trx, - ); - } else { - workflow = await this.prepareBaseChatWorkflow( - user, - sessionId, - credentials, - model, - history, - message, - false, - trx, - ); - } + let workflow; + if (provider === 'n8n') { + workflow = await this.prepareCustomAgentWorkflow( + user, + sessionId, + payload.model.workflowId, + message, + ); + } else if (provider === 'custom-agent') { + workflow = await this.prepareChatAgentWorkflow( + payload.model.agentId, + user, + sessionId, + history, + message, + false, + trx, + ); + } else { + workflow = await this.prepareBaseChatWorkflow( + user, + sessionId, + credentials, + model, + history, + message, + false, + trx, + ); + } - return { - workflow, - previousMessageId: lastHumanMessage.id, - retryOfMessageId, - }; - }); + return { + workflow, + previousMessageId: lastHumanMessage.id, + retryOfMessageId, + }; + }); await this.executeChatWorkflowWithCleanup( res, user, - workflow, + workflowData, + executionData, sessionId, previousMessageId, selectedModel, @@ -724,6 +713,7 @@ export class ChatHubService { const credential = await this.ensureCredentials(user, model, credentials, trx); return await this.createChatWorkflow( + user, sessionId, credential.projectId, history, @@ -816,6 +806,8 @@ export class ChatHubService { throw new BadRequestError('Workflow must have exactly one chat trigger'); } + const chatTriggerNode = chatTriggers[0]; + const chatResponseNodes = workflowEntity.nodes.filter( (node) => node.type === RESPOND_TO_CHAT_NODE_TYPE, ); @@ -826,38 +818,49 @@ export class ChatHubService { ); } - return { - workflowData: { - ...workflowEntity, - // Since this mechanism executes workflows as manual one-off executions - // we need to clear any pinData the WF might have. - // TODO: Implement a separate execution mode for chats to avoid such workarounds. - pinData: {}, - }, - triggerToStartFrom: { - name: chatTriggers[0].name, + const nodeExecutionStack: IExecuteData[] = [ + { + node: chatTriggerNode, data: { - startTime: Date.now(), - executionTime: 0, - executionIndex: 0, - executionStatus: 'success', - data: { - main: [ - [ - { - json: { - sessionId, - action: 'sendMessage', - chatInput: message, - }, + main: [ + [ + { + json: { + sessionId, + action: 'sendMessage', + chatInput: message, }, - ], + }, ], - }, - source: [null], - } satisfies ITaskData, + ], + }, + source: null, + }, + ]; + + const executionData: IRunExecutionData = { + startData: {}, + resultData: { + runData: {}, + }, + executionData: { + contextData: {}, + metadata: {}, + nodeExecutionStack, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + manualData: { + userId: user.id, }, }; + + return { + workflowData: { + ...workflowEntity, + }, + executionData, + }; } private async ensurePreviousMessage( @@ -906,17 +909,13 @@ export class ChatHubService { private async executeChatWorkflow( res: Response, user: User, - workflow: { - workflowData: IWorkflowBase; - triggerToStartFrom: { name: string; data?: ITaskData }; - }, + workflowData: IWorkflowBase, + executionData: IRunExecutionData, sessionId: ChatSessionId, previousMessageId: ChatMessageId, selectedModel: ModelWithCredentials, retryOfMessageId: ChatMessageId | null = null, ) { - const { workflowData, triggerToStartFrom } = workflow; - this.logger.debug( `Starting execution of workflow "${workflowData.name}" with ID ${workflowData.id}`, ); @@ -1012,16 +1011,14 @@ export class ChatHubService { stream.writeHead(200, JSONL_STREAM_HEADERS); stream.flushHeaders(); - const execution = await this.workflowExecutionService.executeManually( - { - workflowData, - triggerToStartFrom, - }, + const execution = await this.workflowExecutionService.executeChatWorkflow( + workflowData, + executionData, user, - undefined, - true, stream, + true, ); + executionId = execution.executionId; if (!executionId) { @@ -1073,10 +1070,8 @@ export class ChatHubService { private async executeChatWorkflowWithCleanup( res: Response, user: User, - workflow: { - workflowData: IWorkflowBase; - triggerToStartFrom: { name: string; data?: ITaskData }; - }, + workflowData: IWorkflowBase, + executionData: IRunExecutionData, sessionId: ChatSessionId, previousMessageId: ChatMessageId, selectedModel: ModelWithCredentials, @@ -1087,7 +1082,8 @@ export class ChatHubService { await this.executeChatWorkflow( res, user, - workflow, + workflowData, + executionData, sessionId, previousMessageId, selectedModel, @@ -1099,12 +1095,13 @@ export class ChatHubService { // that happens after executions might fail to find the workflow. // Once/if we add the new workflow flag to keep these WFs around this wouldn't be needed. await new Promise((resolve) => setTimeout(resolve, 3000)); - await this.deleteChatWorkflow(workflow.workflowData.id); + await this.deleteChatWorkflow(workflowData.id); } } } private prepareChatWorkflow({ + user, sessionId, history, humanMessage, @@ -1113,6 +1110,7 @@ export class ChatHubService { generateConversationTitle, systemMessage, }: { + user: User; sessionId: ChatSessionId; history: ChatHubMessage[]; humanMessage: string; @@ -1121,20 +1119,18 @@ export class ChatHubService { generateConversationTitle: boolean; systemMessage?: string; }) { + const chatTriggerNode: INode = { + parameters: {}, + type: CHAT_TRIGGER_NODE_TYPE, + typeVersion: 1.3, + position: [0, 0], + id: uuidv4(), + name: NODE_NAMES.CHAT_TRIGGER, + webhookId: uuidv4(), + }; + const nodes: INode[] = [ - { - parameters: { - public: true, - mode: 'webhook', - options: { responseMode: 'streaming' }, - }, - type: CHAT_TRIGGER_NODE_TYPE, - typeVersion: 1.3, - position: [0, 0], - id: uuidv4(), - name: NODE_NAMES.CHAT_TRIGGER, - webhookId: uuidv4(), - }, + chatTriggerNode, { parameters: { promptType: 'define', @@ -1275,16 +1271,9 @@ export class ChatHubService { }, }; - const triggerToStartFrom: { - name: string; - data: ITaskData; - } = { - name: NODE_NAMES.CHAT_TRIGGER, - data: { - startTime: Date.now(), - executionTime: 0, - executionIndex: 0, - executionStatus: 'success', + const nodeExecutionStack: IExecuteData[] = [ + { + node: chatTriggerNode, data: { main: [ [ @@ -1298,11 +1287,28 @@ export class ChatHubService { ], ], }, - source: [null], + source: null, + }, + ]; + + const executionData: IRunExecutionData = { + startData: {}, + resultData: { + runData: {}, + }, + executionData: { + contextData: {}, + metadata: {}, + nodeExecutionStack, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + manualData: { + userId: user.id, }, }; - return { nodes, connections, triggerToStartFrom }; + return { nodes, connections, executionData }; } private async saveHumanMessage( @@ -1465,10 +1471,10 @@ export class ChatHubService { const common = { position: [600, 500] as [number, number], id: uuidv4(), - name: 'Chat Model', + name: NODE_NAMES.CHAT_MODEL, credentials, - type: providerNodeTypeMapping[provider].name, - typeVersion: providerNodeTypeMapping[provider].version, + type: PROVIDER_NODE_TYPE_MAP[provider].name, + typeVersion: PROVIDER_NODE_TYPE_MAP[provider].version, }; switch (provider) { diff --git a/packages/cli/src/modules/insights/insights-collection.service.ts b/packages/cli/src/modules/insights/insights-collection.service.ts index 5ee16e99d4b14..0e22b9374a911 100644 --- a/packages/cli/src/modules/insights/insights-collection.service.ts +++ b/packages/cli/src/modules/insights/insights-collection.service.ts @@ -40,6 +40,9 @@ const shouldSkipMode: Record = { internal: true, manual: true, + + // n8n Chat hub messages + chat: true, }; const MIN_RUNTIME = 0; diff --git a/packages/cli/src/services/workflow-statistics.service.ts b/packages/cli/src/services/workflow-statistics.service.ts index 0b381c9ea995f..fdfbadedf9d90 100644 --- a/packages/cli/src/services/workflow-statistics.service.ts +++ b/packages/cli/src/services/workflow-statistics.service.ts @@ -42,6 +42,9 @@ const isModeRootExecution = { internal: false, manual: false, + + // n8n Chat hub messages + chat: true, } satisfies Record; type WorkflowStatisticsEvents = { diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index f2ad946123083..d35e1dfd02472 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -180,7 +180,8 @@ export class WorkflowRunner { if ( this.executionsConfig.mode !== 'queue' || this.instanceSettings.instanceType === 'worker' || - data.executionMode === 'manual' + data.executionMode === 'manual' || + data.executionMode === 'chat' ) { const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId); postExecutePromise.catch((error) => { diff --git a/packages/cli/src/workflows/workflow-execution.service.ts b/packages/cli/src/workflows/workflow-execution.service.ts index 58369bdad5dab..0337e86c1a216 100644 --- a/packages/cli/src/workflows/workflow-execution.service.ts +++ b/packages/cli/src/workflows/workflow-execution.service.ts @@ -233,6 +233,29 @@ export class WorkflowExecutionService { }; } + async executeChatWorkflow( + workflowData: IWorkflowBase, + executionData: IRunExecutionData, + user: User, + httpResponse: Response, + streamingEnabled: boolean = true, + ) { + const data: IWorkflowExecutionDataProcess = { + executionMode: 'chat', + workflowData, + userId: user.id, + executionData, + streamingEnabled, + httpResponse, + }; + + const executionId = await this.workflowRunner.run(data, undefined, true); + + return { + executionId, + }; + } + /** Executes an error workflow */ async executeErrorWorkflow( workflowId: string, diff --git a/packages/core/src/execution-engine/node-execution-context/execute-context.ts b/packages/core/src/execution-engine/node-execution-context/execute-context.ts index 7ad823f636a61..a0af6bcbb6c39 100644 --- a/packages/core/src/execution-engine/node-execution-context/execute-context.ts +++ b/packages/core/src/execution-engine/node-execution-context/execute-context.ts @@ -142,7 +142,7 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti const streamingEnabled = this.additionalData.streamingEnabled === true; // Check current execution mode supports streaming - const executionModeSupportsStreaming = ['manual', 'webhook', 'integrated']; + const executionModeSupportsStreaming = ['manual', 'webhook', 'integrated', 'chat']; const isStreamingMode = executionModeSupportsStreaming.includes(this.mode); return hasHandlers && isStreamingMode && streamingEnabled; diff --git a/packages/frontend/editor-ui/src/features/execution/executions/components/global/GlobalExecutionsListItem.vue b/packages/frontend/editor-ui/src/features/execution/executions/components/global/GlobalExecutionsListItem.vue index bff2961c672fc..c6c1c59fea37f 100644 --- a/packages/frontend/editor-ui/src/features/execution/executions/components/global/GlobalExecutionsListItem.vue +++ b/packages/frontend/editor-ui/src/features/execution/executions/components/global/GlobalExecutionsListItem.vue @@ -263,7 +263,12 @@ async function handleActionItemClick(commandData: Command) { - + + + + + +