Skip to content

Commit ae956e0

Browse files
committed
feat: v2 serializer
1 parent ced9ca3 commit ae956e0

File tree

10 files changed

+1168
-41
lines changed

10 files changed

+1168
-41
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
name: Integration Tests
2+
on:
3+
pull_request:
4+
paths:
5+
- "lib/**"
6+
- "test/**"
7+
- "config/**"
8+
- "priv/**"
9+
- "assets/**"
10+
- "rel/**"
11+
- "mix.exs"
12+
- "Dockerfile"
13+
- "run.sh"
14+
- "docker-compose.test.yml"
15+
16+
push:
17+
branches:
18+
- main
19+
20+
concurrency:
21+
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
22+
cancel-in-progress: true
23+
24+
jobs:
25+
tests:
26+
name: Tests
27+
runs-on: blacksmith-8vcpu-ubuntu-2404
28+
29+
steps:
30+
- uses: actions/checkout@v2
31+
- name: Run integration test
32+
run: docker compose -f docker-compose.tests.yml up --abort-on-container-exit --exit-code-from test-runner
33+

docker-compose.tests.yml

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
services:
2+
# Supabase Realtime service
3+
test_db:
4+
image: supabase/postgres:14.1.0.105
5+
container_name: test-realtime-db
6+
ports:
7+
- "5432:5432"
8+
volumes:
9+
- ./dev/postgres:/docker-entrypoint-initdb.d/
10+
command: postgres -c config_file=/etc/postgresql/postgresql.conf
11+
environment:
12+
POSTGRES_HOST: /var/run/postgresql
13+
POSTGRES_PASSWORD: postgres
14+
healthcheck:
15+
test: ["CMD-SHELL", "pg_isready -U postgres"]
16+
interval: 10s
17+
timeout: 5s
18+
retries: 5
19+
test_tenant_db:
20+
image: supabase/postgres:14.1.0.105
21+
container_name: test-tenant-db
22+
ports:
23+
- "5433:5432"
24+
command: postgres -c config_file=/etc/postgresql/postgresql.conf
25+
environment:
26+
POSTGRES_HOST: /var/run/postgresql
27+
POSTGRES_PASSWORD: postgres
28+
test_realtime:
29+
depends_on:
30+
- test_db
31+
build: .
32+
container_name: test-realtime-server
33+
ports:
34+
- "4000:4000"
35+
extra_hosts:
36+
- "host.docker.internal:host-gateway"
37+
environment:
38+
PORT: 4000
39+
DB_HOST: host.docker.internal
40+
DB_PORT: 5432
41+
DB_USER: postgres
42+
DB_PASSWORD: postgres
43+
DB_NAME: postgres
44+
DB_ENC_KEY: 1234567890123456
45+
DB_AFTER_CONNECT_QUERY: 'SET search_path TO _realtime'
46+
API_JWT_SECRET: super-secret-jwt-token-with-at-least-32-characters-long
47+
SECRET_KEY_BASE: UpNVntn3cDxHJpq99YMc1T1AQgQpc8kfYTuRgBiYa15BLrx8etQoXz3gZv1/u2oq
48+
ERL_AFLAGS: -proto_dist inet_tcp
49+
RLIMIT_NOFILE: 1000000
50+
DNS_NODES: "''"
51+
APP_NAME: realtime
52+
RUN_JANITOR: true
53+
JANITOR_INTERVAL: 60000
54+
LOG_LEVEL: "info"
55+
SEED_SELF_HOST: true
56+
networks:
57+
test-network:
58+
aliases:
59+
- realtime-dev.local
60+
- realtime-dev.localhost
61+
healthcheck:
62+
test: ["CMD", "curl", "-f", "http://localhost:4000/"]
63+
interval: 10s
64+
timeout: 5s
65+
retries: 5
66+
start_period: 30s
67+
68+
# Deno test runner
69+
test-runner:
70+
image: denoland/deno:alpine-2.5.6
71+
container_name: deno-test-runner
72+
depends_on:
73+
test_realtime:
74+
condition: service_healthy
75+
test_db:
76+
condition: service_healthy
77+
volumes:
78+
- ./test/integration/tests.ts:/app/tests.ts:ro
79+
working_dir: /app
80+
command: >
81+
sh -c "
82+
echo 'Running tests...' &&
83+
deno test tests.ts --allow-import --no-check --allow-read --allow-net --trace-leaks --allow-env=WS_NO_BUFFER_UTIL
84+
"
85+
networks:
86+
- test-network
87+
extra_hosts:
88+
- "realtime-dev.localhost:host-gateway"
89+
90+
networks:
91+
test-network:
92+
driver: bridge

lib/realtime_web/channels/realtime_channel/broadcast_handler.ex

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
101101
end
102102

103103
defp send_message(tenant_id, self_broadcast, tenant_topic, payload) do
104-
broadcast = %Phoenix.Socket.Broadcast{topic: tenant_topic, event: @event_type, payload: payload}
104+
broadcast = build_broadcast(tenant_topic, payload)
105105

106106
if self_broadcast do
107107
TenantBroadcaster.pubsub_broadcast(
@@ -123,6 +123,20 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
123123
end
124124
end
125125

126+
# Message payload was built by V2 Serializer which was originally UserBroadcast
127+
defp build_broadcast(topic, {user_event, user_payload_encoding, user_payload}) do
128+
%RealtimeWeb.Socket.UserBroadcast{
129+
topic: topic,
130+
user_event: user_event,
131+
user_payload_encoding: user_payload_encoding,
132+
user_payload: user_payload
133+
}
134+
end
135+
136+
defp build_broadcast(topic, payload) do
137+
%Phoenix.Socket.Broadcast{topic: topic, event: @event_type, payload: payload}
138+
end
139+
126140
defp increment_rate_counter(%{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{write: false}}}} = socket) do
127141
socket
128142
end

lib/realtime_web/channels/realtime_channel/message_dispatcher.ex

Lines changed: 75 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,60 +4,74 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
44
"""
55

66
require Logger
7+
alias Phoenix.Socket.Broadcast
8+
alias RealtimeWeb.Socket.UserBroadcast
79

810
def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new()) do
911
{:rc_fastlane, fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids}
1012
end
1113

14+
@presence_diff "presence_diff"
15+
1216
@doc """
1317
This dispatch function caches encoded messages if fastlane is used
1418
It also sends an :update_rate_counter to the subscriber and it can conditionally log
15-
"""
16-
@spec dispatch(list, pid, Phoenix.Socket.Broadcast.t()) :: :ok
17-
def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{event: event} = msg) do
18-
# fastlane_pid is the actual socket transport pid
19-
# This reduce caches the serialization and bypasses the channel process going straight to the
20-
# transport process
21-
22-
message_id = message_id(msg.payload)
2319
20+
fastlane_pid is the actual socket transport pid
21+
"""
22+
@spec dispatch(list, pid, Broadcast.t() | UserBroadcast.t()) :: :ok
23+
def dispatch(subscribers, from, %Broadcast{event: @presence_diff} = msg) do
2424
{_cache, count} =
2525
Enum.reduce(subscribers, {%{}, 0}, fn
2626
{pid, _}, {cache, count} when pid == from ->
2727
{cache, count}
2828

29-
{pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, replayed_message_ids}},
29+
{_pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, _replayed_message_ids}},
3030
{cache, count} ->
31-
if already_replayed?(message_id, replayed_message_ids) do
32-
# skip already replayed message
33-
{cache, count}
34-
else
35-
if event != "presence_diff", do: send(pid, :update_rate_counter)
31+
maybe_log(log_level, join_topic, msg, tenant_id)
3632

37-
maybe_log(log_level, join_topic, msg, tenant_id)
38-
39-
cache = do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
40-
{cache, count + 1}
41-
end
33+
cache = do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
34+
{cache, count + 1}
4235

4336
{pid, _}, {cache, count} ->
4437
send(pid, msg)
4538
{cache, count}
4639
end)
4740

4841
tenant_id = tenant_id(subscribers)
49-
increment_presence_counter(tenant_id, event, count)
42+
increment_presence_counter(tenant_id, msg.event, count)
5043

5144
:ok
5245
end
5346

54-
defp increment_presence_counter(tenant_id, "presence_diff", count) when is_binary(tenant_id) do
55-
tenant_id
56-
|> Realtime.Tenants.presence_events_per_second_key()
57-
|> Realtime.GenCounter.add(count)
58-
end
47+
def dispatch(subscribers, from, msg) do
48+
message_id = message_id(msg)
5949

60-
defp increment_presence_counter(_tenant_id, _event, _count), do: :ok
50+
_ =
51+
Enum.reduce(subscribers, %{}, fn
52+
{pid, _}, cache when pid == from ->
53+
cache
54+
55+
{pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, replayed_message_ids}},
56+
cache ->
57+
if already_replayed?(message_id, replayed_message_ids) do
58+
# skip already replayed message
59+
cache
60+
else
61+
send(pid, :update_rate_counter)
62+
63+
maybe_log(log_level, join_topic, msg, tenant_id)
64+
65+
do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
66+
end
67+
68+
{pid, _}, cache ->
69+
send(pid, msg)
70+
cache
71+
end)
72+
73+
:ok
74+
end
6175

6276
defp maybe_log(:info, join_topic, msg, tenant_id) do
6377
log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}"
@@ -66,30 +80,53 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
6680

6781
defp maybe_log(_level, _join_topic, _msg, _tenant_id), do: :ok
6882

69-
defp message_id(%{"meta" => %{"id" => id}}), do: id
70-
defp message_id(_), do: nil
71-
72-
defp already_replayed?(nil, _replayed_message_ids), do: false
73-
defp already_replayed?(message_id, replayed_message_ids), do: MapSet.member?(replayed_message_ids, message_id)
74-
7583
defp do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) do
7684
case cache do
77-
%{^serializer => encoded_msg} ->
85+
%{^serializer => {:ok, encoded_msg}} ->
7886
send(fastlane_pid, encoded_msg)
7987
cache
8088

89+
%{^serializer => {:error, reason}} ->
90+
# FIXME: log once when we try to serialize!
91+
raise reason
92+
cache
93+
8194
%{} ->
8295
# Use the original topic that was joined without the external_id
8396
msg = %{msg | topic: join_topic}
84-
encoded_msg = serializer.fastlane!(msg)
85-
send(fastlane_pid, encoded_msg)
86-
Map.put(cache, serializer, encoded_msg)
97+
98+
result =
99+
with {:ok, encoded_msg} <- fastlane!(serializer, msg) do
100+
send(fastlane_pid, encoded_msg)
101+
{:ok, encoded_msg}
102+
end
103+
104+
Map.put(cache, serializer, result)
87105
end
88106
end
89107

90-
defp tenant_id([{_pid, {:rc_fastlane, _, _, _, _, tenant_id, _}} | _]) do
91-
tenant_id
108+
defp fastlane!(Phoenix.Socket.V1.JSONSerializer = serializer, %UserBroadcast{} = msg) do
109+
with {:ok, msg} <- UserBroadcast.convert_to_json_broadcast(msg) do
110+
{:ok, serializer.fastlane!(msg)}
111+
end
92112
end
93113

114+
defp fastlane!(serializer, msg), do: {:ok, serializer.fastlane!(msg)}
115+
116+
defp tenant_id([{_pid, {:rc_fastlane, _, _, _, _, tenant_id, _}} | _]), do: tenant_id
94117
defp tenant_id(_), do: nil
118+
119+
defp increment_presence_counter(tenant_id, "presence_diff", count) when is_binary(tenant_id) do
120+
tenant_id
121+
|> Realtime.Tenants.presence_events_per_second_key()
122+
|> Realtime.GenCounter.add(count)
123+
end
124+
125+
defp increment_presence_counter(_tenant_id, _event, _count), do: :ok
126+
127+
defp message_id(%Broadcast{payload: %{"meta" => %{"id" => id}}}), do: id
128+
defp message_id(_), do: nil
129+
130+
defp already_replayed?(nil, _replayed_message_ids), do: false
131+
defp already_replayed?(message_id, replayed_message_ids), do: MapSet.member?(replayed_message_ids, message_id)
95132
end

lib/realtime_web/endpoint.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ defmodule RealtimeWeb.Endpoint do
2525
# the expense of potentially higher memory being used.
2626
active_n: 100,
2727
# Skip validating UTF8 for faster frame processing.
28-
# Currently all text frames as handled only with JSON which already requires UTF-8
28+
# Currently all text frames are handled only with JSON which already requires UTF-8
2929
validate_utf8: false,
3030
serializer: [
3131
{Phoenix.Socket.V1.JSONSerializer, "~> 1.0.0"},
32-
{Phoenix.Socket.V2.JSONSerializer, "~> 2.0.0"}
32+
{RealtimeWeb.Socket.V2Serializer, "~> 2.0.0"}
3333
]
3434
],
3535
longpoll: [

lib/realtime_web/plugs/auth_tenant.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ defmodule RealtimeWeb.AuthTenant do
4242
[] ->
4343
nil
4444

45+
[""] ->
46+
nil
47+
4548
[value | _] ->
4649
[bearer, token] = value |> String.split(" ")
4750
bearer = String.downcase(bearer)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
defmodule RealtimeWeb.Socket.UserBroadcast do
2+
@moduledoc """
3+
Defines a message sent from pubsub to channels and vice-versa.
4+
5+
The message format requires the following keys:
6+
7+
* `:topic` - The string topic or topic:subtopic pair namespace, for example "messages", "messages:123"
8+
* `:user_event`- The string user event name, for example "my-event"
9+
* `:user_payload_encoding`- :json or :binary
10+
* `:user_payload` - The actual message payload
11+
12+
Optionally metadata which is a map to be JSON encoded
13+
"""
14+
15+
alias Phoenix.Socket.Broadcast
16+
17+
@type t :: %__MODULE__{}
18+
defstruct topic: nil, user_event: nil, user_payload: nil, user_payload_encoding: nil, metadata: nil
19+
20+
@spec convert_to_json_broadcast(t) :: {:ok, %Broadcast{}} | {:error, String.t()}
21+
def convert_to_json_broadcast(user_broadcast = %__MODULE__{user_payload_encoding: :json}) do
22+
payload = %{
23+
"event" => user_broadcast.user_event,
24+
"payload" => Jason.Fragment.new(user_broadcast.user_payload),
25+
"type" => "broadcast"
26+
}
27+
28+
payload =
29+
if user_broadcast.metadata do
30+
Map.put(payload, "meta", user_broadcast.metadata)
31+
else
32+
payload
33+
end
34+
35+
{:ok, %Broadcast{event: "broadcast", payload: payload, topic: user_broadcast.topic}}
36+
end
37+
38+
def convert_to_json_broadcast(%__MODULE__{}), do: {:error, "User payload encoding is not JSON"}
39+
end

0 commit comments

Comments
 (0)