Skip to content

Commit 9d219dc

Browse files
authored
Merge pull request #875 from hchen2020/master
twilio stream
2 parents b3d1bba + bbc2325 commit 9d219dc

35 files changed

+1110
-24
lines changed

src/Infrastructure/BotSharp.Abstraction/Conversations/Enums/MessageTypeName.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@ public static class MessageTypeName
44
{
55
public const string Plain = "plain";
66
public const string Notification = "notification";
7+
public const string FunctionCall = "function";
8+
public const string Audio = "audio";
79
}

src/Infrastructure/BotSharp.Abstraction/Infrastructures/Enums/StateConst.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,6 @@ public class StateConst
1010
public const string AGENT_REDIRECTION_REASON = "agent_redirection_reason";
1111

1212
public const string LANGUAGE = "language";
13+
14+
public const string SUB_CONVERSATION_ID = "sub_conversation_id";
1315
}

src/Infrastructure/BotSharp.Abstraction/MLTasks/IRealTimeCompletion.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,25 @@ namespace BotSharp.Abstraction.MLTasks;
55
/// <summary>
/// Contract for a realtime completion provider: connection lifecycle, audio input,
/// and session/conversation events exchanged with the model.
/// </summary>
public interface IRealTimeCompletion
{
    /// <summary>Provider identifier (the realtime hub picks the implementation whose value is "openai").</summary>
    string Provider { get; }

    /// <summary>Model name; presumably the value last passed to <see cref="SetModelName"/> (confirm in implementations).</summary>
    string Model { get; }

    /// <summary>Sets the model name to use.</summary>
    /// <param name="model">Model name.</param>
    void SetModelName(string model);

    /// <summary>
    /// Connects to the model and registers the callbacks for model-side events.
    /// </summary>
    /// <param name="conn">Connection the callbacks relate to.</param>
    /// <param name="onModelReady">Callback for the model-ready event.</param>
    /// <param name="onModelAudioDeltaReceived">Callback receiving an audio delta as a string.</param>
    /// <param name="onModelAudioResponseDone">Callback for the end of an audio response.</param>
    /// <param name="onAudioTranscriptDone">Callback receiving a completed transcript.</param>
    /// <param name="onModelResponseDone">Callback receiving a completed model response as a string.</param>
    /// <param name="onUserInterrupted">Callback for a user interruption.</param>
    Task Connect(
        RealtimeHubConnection conn,
        Action onModelReady,
        Action<string> onModelAudioDeltaReceived,
        Action onModelAudioResponseDone,
        Action<string> onAudioTranscriptDone,
        Action<string> onModelResponseDone,
        Action onUserInterrupted);

    /// <summary>
    /// Appends audio data to the model's input.
    /// The name is misspelled ("Appen") but is kept because implementers and callers depend on it.
    /// </summary>
    /// <param name="message">Audio payload; the hub passes the connection's <c>Data</c> string.</param>
    Task AppenAudioBuffer(string message);

    /// <summary>Sends an event object to the model.</summary>
    /// <param name="message">Event payload; serialization format is implementation-defined.</param>
    Task SendEventToModel(object message);

    /// <summary>Disconnects from the model.</summary>
    Task Disconnect();

    /// <summary>Creates a realtime session for an agent and its conversation.</summary>
    Task<RealtimeSession> CreateSession(Agent agent, List<RoleDialogModel> conversations);

    /// <summary>Builds the initial session-update payload; the hub forwards the result to <see cref="SendEventToModel"/>.</summary>
    Task<string> UpdateInitialSession(RealtimeHubConnection conn);

    /// <summary>Builds a conversation-item payload for a dialog message; the hub forwards the result to <see cref="SendEventToModel"/>.</summary>
    Task<string> InsertConversationItem(RoleDialogModel message);

    /// <summary>Asks the model to produce a response.</summary>
    /// <param name="instructions">Optional instructions for this response.</param>
    Task TriggerModelInference(string? instructions = null);

    /// <summary>
    /// Converts a completed model response into dialog messages.
    /// The name is misspelled ("Responsed") but is kept because implementers and callers depend on it.
    /// </summary>
    Task<List<RoleDialogModel>> OnResponsedDone(RealtimeHubConnection conn, string response);
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using BotSharp.Abstraction.Realtime.Models;
2+
using System.Net.WebSockets;
3+
4+
namespace BotSharp.Abstraction.Realtime;
5+
6+
/// <summary>
/// Realtime hub. Manages the WebSocket connections among the user, the agent, and the model.
/// </summary>
public interface IRealtimeHub
{
    /// <summary>
    /// Listens on the user's WebSocket and handles each received message.
    /// </summary>
    /// <param name="userWebSocket">WebSocket connected to the user.</param>
    /// <param name="onUserMessageReceived">
    /// Maps the raw text of a received message to a <see cref="RealtimeHubConnection"/>.
    /// </param>
    Task Listen(
        WebSocket userWebSocket,
        Func<string, RealtimeHubConnection> onUserMessageReceived);
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
namespace BotSharp.Abstraction.Realtime.Models;
2+
3+
/// <summary>
/// Describes one user message/connection handled by the realtime hub, together with
/// the callbacks that translate model events into payloads for the user.
/// </summary>
public class RealtimeHubConnection
{
    /// <summary>
    /// Event name. The hub reacts to "user_connected", "user_data_received"
    /// and "user_disconnected".
    /// </summary>
    public string Event { get; set; } = null!;

    /// <summary>Stream identifier (presumably the telephony/media stream id; confirm with the producer).</summary>
    public string StreamId { get; set; } = null!;

    /// <summary>Id of the conversation this connection belongs to.</summary>
    public string ConversationId { get; set; } = null!;

    /// <summary>Payload of the event; forwarded as audio input on "user_data_received". Defaults to empty.</summary>
    public string Data { get; set; } = string.Empty;

    /// <summary>Model name; assigned by the hub for every received message.</summary>
    public string Model { get; set; } = null!;

    /// <summary>Turns a model audio delta into the object sent back to the user.</summary>
    public Func<string, object> OnModelMessageReceived { get; set; } = null!;

    /// <summary>Produces the object sent to the user when the model's audio response is done.</summary>
    public Func<object> OnModelAudioResponseDone { get; set; } = null!;

    /// <summary>Produces the object sent to the user when the user interrupts the model.</summary>
    public Func<object> OnModelUserInterrupted { get; set; } = null!;
}

src/Infrastructure/BotSharp.Core/BotSharpCoreExtensions.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
using BotSharp.Abstraction.Templating;
1616
using BotSharp.Core.Templating;
1717
using BotSharp.Abstraction.Infrastructures.Enums;
18+
using BotSharp.Abstraction.Realtime;
19+
using BotSharp.Core.Realtime;
1820

1921
namespace BotSharp.Core;
2022

@@ -171,5 +173,7 @@ public static void RegisterPlugins(IServiceCollection services, IConfiguration c
171173
});
172174

173175
services.AddSingleton(loader);
176+
177+
services.AddScoped<IRealtimeHub, RealtimeHub>();
174178
}
175179
}

src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public async Task<Conversation> NewConversation(Conversation sess)
107107
record.Id = sess.Id.IfNullOrEmptyAs(Guid.NewGuid().ToString());
108108
record.UserId = sess.UserId.IfNullOrEmptyAs(foundUserId);
109109
record.Tags = sess.Tags;
110-
record.Title = "New Conversation";
110+
record.Title = string.IsNullOrEmpty(record.Title) ? "New Conversation" : record.Title;
111111

112112
db.CreateNewConversation(record);
113113

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
using BotSharp.Abstraction.Realtime;
2+
using System.Net.WebSockets;
3+
using System;
4+
using BotSharp.Abstraction.Realtime.Models;
5+
using BotSharp.Abstraction.MLTasks;
6+
using BotSharp.Abstraction.Agents.Models;
7+
8+
namespace BotSharp.Core.Realtime;
9+
10+
/// <summary>
/// Relays traffic between a user's WebSocket and a realtime completion model:
/// user audio goes to the model, model events are translated and sent back to the user.
/// </summary>
public class RealtimeHub : IRealtimeHub
{
    private readonly IServiceProvider _services;
    private readonly ILogger _logger;

    public RealtimeHub(IServiceProvider services, ILogger<RealtimeHub> logger)
    {
        _services = services;
        _logger = logger;
    }

    /// <summary>
    /// Reads messages from the user's WebSocket until a close frame arrives and
    /// dispatches each one according to the event name returned by
    /// <paramref name="onUserMessageReceived"/>.
    /// </summary>
    /// <param name="userWebSocket">WebSocket connected to the user.</param>
    /// <param name="onUserMessageReceived">Parses the raw message text into a connection/event object.</param>
    public async Task Listen(WebSocket userWebSocket,
        Func<string, RealtimeHubConnection> onUserMessageReceived)
    {
        var llmProviderService = _services.GetRequiredService<ILlmProviderService>();
        var model = llmProviderService.GetProviderModel("openai", "gpt-4",
            realTime: true).Name;

        var completer = _services.GetServices<IRealTimeCompletion>().First(x => x.Provider == "openai");
        completer.SetModelName(model);

        var buffer = new byte[1024 * 4];
        // Accumulates one complete WebSocket message; a message may span several receives.
        using var payload = new System.IO.MemoryStream();
        WebSocketReceiveResult result;

        do
        {
            payload.SetLength(0);

            // Keep receiving until the frame flagged EndOfMessage so that messages larger
            // than the buffer (or fragmented by the peer) are decoded as a whole.
            do
            {
                result = await userWebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
                payload.Write(buffer, 0, result.Count);
            } while (!result.EndOfMessage && !result.CloseStatus.HasValue);

            string receivedText = Encoding.UTF8.GetString(payload.GetBuffer(), 0, (int)payload.Length);
            _logger.LogDebug("Received from user: {ReceivedText}", receivedText);
            if (string.IsNullOrEmpty(receivedText))
            {
                // Empty payload (e.g. a close frame): skip dispatch; the loop condition decides whether to stop.
                continue;
            }

            var conn = onUserMessageReceived(receivedText);
            conn.Model = model;

            switch (conn.Event)
            {
                case "user_connected":
                    await ConnectToModel(completer, userWebSocket, conn);
                    break;
                case "user_data_received":
                    await completer.AppenAudioBuffer(conn.Data);
                    break;
                case "user_disconnected":
                    await completer.Disconnect();
                    break;
            }
        } while (!result.CloseStatus.HasValue);

        await userWebSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
    }

    /// <summary>
    /// Loads the conversation context and connects the completer to the model,
    /// wiring each model event to the user's WebSocket or to the conversation pipeline.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the completer takes <see cref="Action"/> callbacks, so the async lambdas
    /// below compile to async void; an exception thrown inside them cannot be observed by
    /// the caller of <c>Connect</c>.
    /// </remarks>
    private async Task ConnectToModel(IRealTimeCompletion completer, WebSocket userWebSocket, RealtimeHubConnection conn)
    {
        var hookProvider = _services.GetRequiredService<ConversationHookProvider>();
        var storage = _services.GetRequiredService<IConversationStorage>();
        var convService = _services.GetRequiredService<IConversationService>();
        convService.SetConversationId(conn.ConversationId, []);
        var conversation = await convService.GetConversation(conn.ConversationId);
        var agentService = _services.GetRequiredService<IAgentService>();
        var agent = await agentService.LoadAgent(conversation.AgentId);
        var routing = _services.GetRequiredService<IRoutingService>();
        var dialogs = convService.GetDialogHistory();
        routing.Context.SetDialogs(dialogs);

        await completer.Connect(conn,
            onModelReady: async () =>
            {
                // Control initial session
                var data = await completer.UpdateInitialSession(conn);
                await completer.SendEventToModel(data);

                // Add dialog history
                foreach (var item in dialogs)
                {
                    var dialogItem = await completer.InsertConversationItem(item);
                    // Send the item just built (previously the session-update payload was re-sent here).
                    await completer.SendEventToModel(dialogItem);
                }

                var lastDialog = dialogs.LastOrDefault();
                if (lastDialog?.Role == AgentRole.Assistant)
                {
                    await completer.TriggerModelInference($"Rephase your last response:\r\n{lastDialog?.Content}");
                }
                else
                {
                    await completer.TriggerModelInference("Reply based on the conversation context.");
                }
            },
            onModelAudioDeltaReceived: async audioDeltaData =>
            {
                var data = conn.OnModelMessageReceived(audioDeltaData);
                await SendEventToUser(userWebSocket, data);
            },
            onModelAudioResponseDone: async () =>
            {
                var data = conn.OnModelAudioResponseDone();
                await SendEventToUser(userWebSocket, data);
            },
            onAudioTranscriptDone: async transcript =>
            {
                var message = new RoleDialogModel(AgentRole.Assistant, transcript);

                // append transcript to conversation
                storage.Append(conn.ConversationId, message);

                foreach (var hook in hookProvider.HooksOrderByPriority)
                {
                    hook.SetAgent(agent)
                        .SetConversation(conversation);

                    if (!string.IsNullOrEmpty(transcript))
                    {
                        await hook.OnMessageReceived(message);
                    }
                }
            },
            onModelResponseDone: async response =>
            {
                var messages = await completer.OnResponsedDone(conn, response);
                foreach (var message in messages)
                {
                    // Invoke function
                    if (message.FunctionName != null)
                    {
                        await routing.InvokeFunction(message.FunctionName, message);
                        var data = await completer.InsertConversationItem(message);
                        await completer.SendEventToModel(data);
                        await completer.TriggerModelInference("Reply based on the function's output.");
                    }
                }
            },
            onUserInterrupted: async () =>
            {
                var data = conn.OnModelUserInterrupted();
                await SendEventToUser(userWebSocket, data);
            });
    }

    /// <summary>
    /// Serializes <paramref name="message"/> to JSON and sends it to the user as a single text message.
    /// </summary>
    private async Task SendEventToUser(WebSocket webSocket, object message)
    {
        var data = JsonSerializer.Serialize(message);
        var buffer = Encoding.UTF8.GetBytes(data);
        await webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);
    }
}

src/Infrastructure/BotSharp.Core/Routing/RoutingService.InvokeFunction.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public async Task<bool> InvokeFunction(string name, RoleDialogModel message)
4949
}
5050

5151
// Set result to original message
52-
message.Role = clonedMessage.Role;
52+
message.Role = AgentRole.Function;
5353
message.PostbackFunctionName = clonedMessage.PostbackFunctionName;
5454
message.CurrentAgentId = clonedMessage.CurrentAgentId;
5555
message.Content = clonedMessage.Content;

src/Infrastructure/BotSharp.OpenAPI/BotSharp.OpenAPI.csproj

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<TargetFramework>$(TargetFramework)</TargetFramework>
@@ -47,6 +47,8 @@
4747
</ItemGroup>
4848

4949
<ItemGroup>
50+
<ProjectReference Include="..\..\Plugins\BotSharp.Plugin.OpenAI\BotSharp.Plugin.OpenAI.csproj" />
51+
<ProjectReference Include="..\..\Plugins\BotSharp.Plugin.Twilio\BotSharp.Plugin.Twilio.csproj" />
5052
<ProjectReference Include="..\BotSharp.Core\BotSharp.Core.csproj" />
5153
</ItemGroup>
5254

0 commit comments

Comments
 (0)