Skip to content

Commit 7c0b073

Browse files
authored
feat: speed up messages replication by avoiding re-encoding JSON (#1602)
* feat: speed up messages replication by avoiding re-encoding JSON We can use the already JSON encoded payload and pass a Jason.Fragment instead of decoding to then re-encode when the message gets sent to a websocket * fix: include payload id if not defined on realtime.send * chore: increase timeout for socket monitor message
1 parent ced9ca3 commit 7c0b073

File tree

6 files changed

+77
-35
lines changed

6 files changed

+77
-35
lines changed

lib/realtime/tenants/migrations.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ defmodule Realtime.Tenants.Migrations do
7575
SubscriptionIndexBridgingDisabled,
7676
RunSubscriptionIndexBridgingDisabled,
7777
BroadcastSendErrorLogging,
78-
CreateMessagesReplayIndex
78+
CreateMessagesReplayIndex,
79+
BroadcastSendIncludePayloadId
7980
}
8081

8182
@migrations [
@@ -142,7 +143,8 @@ defmodule Realtime.Tenants.Migrations do
142143
{20_250_506_224_012, SubscriptionIndexBridgingDisabled},
143144
{20_250_523_164_012, RunSubscriptionIndexBridgingDisabled},
144145
{20_250_714_121_412, BroadcastSendErrorLogging},
145-
{20_250_905_041_441, CreateMessagesReplayIndex}
146+
{20_250_905_041_441, CreateMessagesReplayIndex},
147+
{20_251_103_001_201, BroadcastSendIncludePayloadId}
146148
]
147149

148150
defstruct [:tenant_external_id, :settings]

lib/realtime/tenants/replication_connection.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
159159

160160
@impl true
161161
def init(%__MODULE__{tenant_id: tenant_id, monitored_pid: monitored_pid} = state) do
162+
Process.flag(:fullsweep_after, 20)
162163
Logger.metadata(external_id: tenant_id, project: tenant_id)
163164
Process.monitor(monitored_pid)
164165

@@ -315,7 +316,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
315316
topic: topic,
316317
event: event,
317318
private: private,
318-
payload: Map.put_new(payload, "id", id)
319+
payload: Jason.Fragment.new(payload)
319320
},
320321
:ok <- BatchBroadcast.broadcast(nil, tenant, %{messages: [broadcast_message]}, true) do
321322
inserted_at = NaiveDateTime.from_iso8601!(inserted_at)
@@ -381,7 +382,6 @@ defmodule Realtime.Tenants.ReplicationConnection do
381382
|> Enum.zip(columns)
382383
|> Map.new(fn
383384
{nil, %{name: name}} -> {name, nil}
384-
{value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
385385
{value, %{name: name, type: "bool"}} -> {name, value == "t"}
386386
{value, %{name: name}} -> {name, value}
387387
end)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
defmodule Realtime.Tenants.Migrations.BroadcastSendIncludePayloadId do
2+
@moduledoc false
3+
use Ecto.Migration
4+
5+
# Include ID in the payload if not defined
6+
def change do
7+
execute("""
8+
CREATE OR REPLACE FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean DEFAULT true ) RETURNS void
9+
AS $$
10+
DECLARE
11+
generated_id uuid;
12+
final_payload jsonb;
13+
BEGIN
14+
BEGIN
15+
-- Generate a new UUID for the id
16+
generated_id := gen_random_uuid();
17+
18+
-- Check if payload has an 'id' key, if not, add the generated UUID
19+
IF payload ? 'id' THEN
20+
final_payload := payload;
21+
ELSE
22+
final_payload := jsonb_set(payload, '{id}', to_jsonb(generated_id));
23+
END IF;
24+
25+
-- Set the topic configuration
26+
EXECUTE format('SET LOCAL realtime.topic TO %L', topic);
27+
28+
-- Attempt to insert the message
29+
INSERT INTO realtime.messages (id, payload, event, topic, private, extension)
30+
VALUES (generated_id, final_payload, event, topic, private, 'broadcast');
31+
EXCEPTION
32+
WHEN OTHERS THEN
33+
-- Capture and notify the error
34+
RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;
35+
END;
36+
END;
37+
$$
38+
LANGUAGE plpgsql;
39+
""")
40+
end
41+
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.60.0",
7+
version: "2.61.0",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2589,7 +2589,7 @@ defmodule Realtime.Integration.RtChannelTest do
25892589
Realtime.Tenants.Cache.invalidate_tenant_cache(external_id)
25902590
end
25912591

2592-
defp assert_process_down(pid, timeout \\ 300) do
2592+
defp assert_process_down(pid, timeout \\ 1000) do
25932593
ref = Process.monitor(pid)
25942594
assert_receive {:DOWN, ^ref, :process, ^pid, _reason}, timeout
25952595
end

test/realtime/tenants/replication_connection_test.exs

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
2222
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
2323
name = "supabase_realtime_messages_replication_slot_test"
2424
Postgrex.query(db_conn, "SELECT pg_drop_replication_slot($1)", [name])
25-
Process.exit(db_conn, :normal)
2625

27-
%{tenant: tenant}
26+
%{tenant: tenant, db_conn: db_conn}
2827
end
2928

3029
describe "temporary process" do
@@ -70,7 +69,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
7069
assert {:error, _} = ReplicationConnection.start(tenant, self())
7170
end
7271

73-
test "starts a handler for the tenant and broadcasts", %{tenant: tenant} do
72+
test "starts a handler for the tenant and broadcasts", %{tenant: tenant, db_conn: db_conn} do
7473
start_link_supervised!(
7574
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
7675
restart: :transient
@@ -100,7 +99,6 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
10099
"event" => "INSERT",
101100
"meta" => %{"id" => row.id},
102101
"payload" => %{
103-
"id" => row.id,
104102
"value" => value
105103
},
106104
"type" => "broadcast"
@@ -122,7 +120,6 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
122120
})
123121
end
124122

125-
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
126123
{:ok, _} = Realtime.Repo.insert_all_entries(db_conn, messages, Message)
127124

128125
messages_received =
@@ -140,9 +137,8 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
140137
"event" => "broadcast",
141138
"payload" => %{
142139
"event" => "INSERT",
143-
"meta" => %{"id" => id},
140+
"meta" => %{"id" => _id},
144141
"payload" => %{
145-
"id" => id,
146142
"value" => ^value
147143
}
148144
},
@@ -231,7 +227,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
231227
assert logs =~ "UnableToBroadcastChanges: %{messages: [%{payload: [\"Payload size exceeds tenant limit\"]}]}"
232228
end
233229

234-
test "payload without id", %{tenant: tenant} do
230+
test "payload without id", %{tenant: tenant, db_conn: db_conn} do
235231
start_link_supervised!(
236232
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
237233
restart: :transient
@@ -241,15 +237,16 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
241237
tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, false)
242238
subscribe(tenant_topic, topic)
243239

244-
fixture =
245-
message_fixture(tenant, %{
246-
"topic" => topic,
247-
"private" => true,
248-
"event" => "INSERT",
249-
"payload" => %{"value" => "something"}
250-
})
240+
value = "something"
241+
event = "INSERT"
242+
243+
Postgrex.query!(
244+
db_conn,
245+
"SELECT realtime.send (json_build_object ('value', $1 :: text)::jsonb, $2 :: text, $3 :: text, TRUE::bool);",
246+
[value, event, topic]
247+
)
251248

252-
fixture_id = fixture.id
249+
{:ok, [%{id: id}]} = Repo.all(db_conn, from(m in Message), Message)
253250

254251
assert_receive {:socket_push, :text, data}, 500
255252
message = data |> IO.iodata_to_binary() |> Jason.decode!()
@@ -258,7 +255,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
258255
"event" => "broadcast",
259256
"payload" => %{
260257
"event" => "INSERT",
261-
"meta" => %{"id" => ^fixture_id},
258+
"meta" => %{"id" => ^id},
262259
"payload" => payload,
263260
"type" => "broadcast"
264261
},
@@ -268,11 +265,11 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
268265

269266
assert payload == %{
270267
"value" => "something",
271-
"id" => fixture_id
268+
"id" => id
272269
}
273270
end
274271

275-
test "payload including id", %{tenant: tenant} do
272+
test "payload including id", %{tenant: tenant, db_conn: db_conn} do
276273
start_link_supervised!(
277274
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
278275
restart: :transient
@@ -282,25 +279,27 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
282279
tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, false)
283280
subscribe(tenant_topic, topic)
284281

285-
payload = %{"value" => "something", "id" => "123456"}
282+
id = "123456"
283+
value = "something"
284+
event = "INSERT"
286285

287-
%{id: fixture_id} =
288-
message_fixture(tenant, %{
289-
"topic" => topic,
290-
"private" => true,
291-
"event" => "INSERT",
292-
"payload" => payload
293-
})
286+
Postgrex.query!(
287+
db_conn,
288+
"SELECT realtime.send (json_build_object ('value', $1 :: text, 'id', $2 :: text)::jsonb, $3 :: text, $4 :: text, TRUE::bool);",
289+
[value, id, event, topic]
290+
)
291+
292+
{:ok, [%{id: message_id}]} = Repo.all(db_conn, from(m in Message), Message)
294293

295294
assert_receive {:socket_push, :text, data}, 500
296295
message = data |> IO.iodata_to_binary() |> Jason.decode!()
297296

298297
assert %{
299298
"event" => "broadcast",
300299
"payload" => %{
301-
"meta" => %{"id" => ^fixture_id},
300+
"meta" => %{"id" => ^message_id},
302301
"event" => "INSERT",
303-
"payload" => ^payload,
302+
"payload" => %{"value" => "something", "id" => ^id},
304303
"type" => "broadcast"
305304
},
306305
"ref" => nil,

0 commit comments

Comments
 (0)