Skip to content

Commit dd2e734

Browse files
committed
fix: egress measurement
1 parent 7c0b073 commit dd2e734

File tree

9 files changed

+251
-52
lines changed

9 files changed

+251
-52
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
194194
| BROADCAST_POOL_SIZE | number | Number of processes to relay Phoenix.PubSub messages across the cluster |
195195
| POSTGRES_CDC_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Postgres CDC extension. Defaults to 5. |
196196
| USERS_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Users extension. Defaults to 5. |
197+
| NO_CHANNEL_TIMEOUT_IN_MS | number | Time in ms to check if a socket has no channels open and if so, disconnect it. Defaults to 10 minutes. |
198+
| EGRESS_TELEMETRY_INTERVAL_IN_MS | number | Time in ms to send telemetry for egress. Defaults to 15 seconds. |
197199

198200
The OpenTelemetry variables mentioned above are not an exhaustive list of all [supported environment variables](https://opentelemetry.io/docs/languages/sdk-configuration/).
199201

config/runtime.exs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_
7373
users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)
7474
postgres_cdc_scope_shards = Env.get_integer("POSTGRES_CDC_SCOPE_SHARDS", 5)
7575
regional_broadcasting = Env.get_boolean("REGIONAL_BROADCASTING", false)
76+
egress_telemetry_interval_in_ms = Env.get_integer("EGRESS_TELEMETRY_INTERVAL_IN_MS", :timer.seconds(15))
7677
no_channel_timeout_in_ms = Env.get_integer("NO_CHANNEL_TIMEOUT_IN_MS", :timer.minutes(10))
7778

7879
if !(db_version in [nil, "ipv6", "ipv4"]),
@@ -122,13 +123,14 @@ config :realtime,
122123
tenant_cache_expiration: tenant_cache_expiration,
123124
rpc_timeout: rpc_timeout,
124125
max_gen_rpc_clients: max_gen_rpc_clients,
125-
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
126126
platform: platform,
127127
pubsub_adapter: pubsub_adapter,
128128
broadcast_pool_size: broadcast_pool_size,
129129
users_scope_shards: users_scope_shards,
130130
postgres_cdc_scope_shards: postgres_cdc_scope_shards,
131-
regional_broadcasting: regional_broadcasting
131+
regional_broadcasting: regional_broadcasting,
132+
egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms,
133+
no_channel_timeout_in_ms: no_channel_timeout_in_ms
132134

133135
if config_env() != :test && run_janitor? do
134136
config :realtime,

lib/realtime/application.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ defmodule Realtime.Application do
5555
migration_partition_slots = Application.get_env(:realtime, :migration_partition_slots)
5656
connect_partition_slots = Application.get_env(:realtime, :connect_partition_slots)
5757
no_channel_timeout_in_ms = Application.get_env(:realtime, :no_channel_timeout_in_ms)
58+
egress_telemetry_interval_in_ms = Application.get_env(:realtime, :egress_telemetry_interval_in_ms)
5859

5960
children =
6061
[
@@ -94,7 +95,9 @@ defmodule Realtime.Application do
9495
strategy: :one_for_one,
9596
name: Connect.DynamicSupervisor,
9697
partitions: connect_partition_slots},
97-
{RealtimeWeb.RealtimeChannel.Tracker, check_interval_in_ms: no_channel_timeout_in_ms},
98+
{RealtimeWeb.RealtimeChannel.Tracker,
99+
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
100+
egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms},
98101
RealtimeWeb.Endpoint,
99102
RealtimeWeb.Presence
100103
] ++ extensions_supervisors() ++ janitor_tasks()

lib/realtime/monitoring/prom_ex/plugins/tenant.ex

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,26 @@ defmodule Realtime.PromEx.Plugins.Tenant do
2323
channel_events(),
2424
replication_metrics(),
2525
subscription_metrics(),
26-
payload_size_metrics()
26+
payload_size_metrics(),
27+
output_bytes_metrics()
2728
]
2829
end
2930

31+
defp output_bytes_metrics do
32+
Event.build(
33+
:realtime_connections_output_bytes_metrics,
34+
[
35+
last_value(
36+
[:realtime, :connections, :output_bytes],
37+
event_name: [:realtime, :connections, :output_bytes],
38+
description: "The total count of output bytes for a tenant.",
39+
measurement: :output_bytes,
40+
tags: [:tenant]
41+
)
42+
]
43+
)
44+
end
45+
3046
defp payload_size_metrics do
3147
Event.build(
3248
:realtime_tenant_payload_size_metrics,

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ defmodule RealtimeWeb.RealtimeChannel do
4545

4646
Process.flag(:max_heap_size, max_heap_size())
4747
Process.flag(:fullsweep_after, @fullsweep_after)
48-
Tracker.track(socket.transport_pid)
48+
Tracker.track(socket.transport_pid, tenant_id)
4949
Logger.metadata(external_id: tenant_id, project: tenant_id)
5050
Logger.put_process_level(self(), log_level)
5151

@@ -499,10 +499,10 @@ defmodule RealtimeWeb.RealtimeChannel do
499499
end
500500

501501
@impl true
502-
def terminate(reason, %{transport_pid: transport_pid}) do
502+
def terminate(reason, %{transport_pid: transport_pid, assigns: %{tenant: tenant_id}}) do
503503
Logger.debug("Channel terminated with reason: #{reason}")
504504
:telemetry.execute([:prom_ex, :plugin, :realtime, :disconnected], %{})
505-
Tracker.untrack(transport_pid)
505+
Tracker.untrack(transport_pid, tenant_id)
506506
:ok
507507
end
508508

lib/realtime_web/channels/realtime_channel/tracker.ex

Lines changed: 97 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,41 @@ defmodule RealtimeWeb.RealtimeChannel.Tracker do
88
"""
99
use GenServer
1010
require Logger
11+
alias Realtime.Helpers
12+
13+
defstruct [
14+
:no_channel_timeout_in_ms,
15+
:no_channel_timeout_ref,
16+
:egress_telemetry_interval_in_ms,
17+
:egress_telemetry_ref
18+
]
1119

1220
@table :channel_tracker
13-
@zero_count_match [{{:"$1", :"$2"}, [{:"=<", :"$2", 0}], [:"$1"]}]
14-
@zero_count_delete [{{:"$1", :"$2"}, [{:"=<", :"$2", 0}], [true]}]
21+
@zero_count_match [{{{:"$1", :_}, :"$2"}, [{:"=<", :"$2", 0}], [:"$1"]}]
22+
@zero_count_delete [{{{:"$1", :_}, :"$2"}, [{:"=<", :"$2", 0}], [true]}]
23+
@egress_telemetry_match [{{{:"$1", :"$2"}, :"$3"}, [{:>, :"$3", 0}], [[:"$1", :"$2"]]}]
24+
1525
@doc """
1626
Tracks a transport pid.
1727
"""
18-
@spec track(pid()) :: integer()
19-
def track(pid), do: :ets.update_counter(@table, pid, 1, {pid, 0})
28+
@spec track(pid(), binary()) :: integer()
29+
def track(pid, tenant_external_id),
30+
do: :ets.update_counter(@table, {pid, tenant_external_id}, 1, {{pid, tenant_external_id}, 0})
2031

2132
@doc """
2233
Un-tracks a transport pid.
2334
"""
24-
@spec untrack(pid()) :: integer()
25-
def untrack(pid), do: :ets.update_counter(@table, pid, -1, {pid, 0})
35+
@spec untrack(pid(), binary()) :: integer()
36+
def untrack(pid, tenant_external_id),
37+
do: :ets.update_counter(@table, {pid, tenant_external_id}, -1, {{pid, tenant_external_id}, 0})
2638

2739
@doc """
2840
Returns the number of channels open for a transport pid.
2941
"""
30-
@spec count(pid()) :: integer()
31-
def count(pid) do
32-
case :ets.lookup(@table, pid) do
33-
[{^pid, count}] -> count
42+
@spec count(pid(), binary()) :: integer()
43+
def count(pid, tenant_external_id) do
44+
case :ets.lookup(@table, {pid, tenant_external_id}) do
45+
[{{^pid, ^tenant_external_id}, count}] -> count
3446
[] -> 0
3547
end
3648
end
@@ -57,17 +69,54 @@ defmodule RealtimeWeb.RealtimeChannel.Tracker do
5769

5870
@impl true
5971
def init(opts) do
60-
check_interval_in_ms = Keyword.fetch!(opts, :check_interval_in_ms)
61-
Process.send_after(self(), :check_channels, check_interval_in_ms)
62-
{:ok, %{check_interval_in_ms: check_interval_in_ms}}
72+
no_channel_timeout_in_ms = Keyword.fetch!(opts, :no_channel_timeout_in_ms)
73+
no_channel_timeout_ref = Process.send_after(self(), :check_channels, no_channel_timeout_in_ms)
74+
egress_telemetry_interval_in_ms = Keyword.fetch!(opts, :egress_telemetry_interval_in_ms)
75+
egress_telemetry_ref = Process.send_after(self(), :send_egress_telemetry, egress_telemetry_interval_in_ms)
76+
77+
{:ok,
78+
%{
79+
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
80+
no_channel_timeout_ref: no_channel_timeout_ref,
81+
egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms,
82+
egress_telemetry_ref: egress_telemetry_ref
83+
}}
6384
end
6485

6586
@impl true
6687
def handle_info(:check_channels, state) do
88+
%{no_channel_timeout_ref: no_channel_timeout_ref, no_channel_timeout_in_ms: no_channel_timeout_in_ms} = state
6789
chunked_killing()
6890
:ets.select_delete(@table, @zero_count_delete)
69-
Process.send_after(self(), :check_channels, state.check_interval_in_ms)
70-
{:noreply, state}
91+
Helpers.cancel_timer(no_channel_timeout_ref)
92+
no_channel_timeout_ref = Process.send_after(self(), :check_channels, no_channel_timeout_in_ms)
93+
{:noreply, %{state | no_channel_timeout_ref: no_channel_timeout_ref}}
94+
end
95+
96+
def handle_info(:send_egress_telemetry, state) do
97+
%{egress_telemetry_ref: egress_telemetry_ref, egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms} =
98+
state
99+
100+
Port.list()
101+
|> Enum.flat_map(fn port ->
102+
case Port.info(port, :links) do
103+
{:links, pids} -> Enum.map(pids, fn pid -> {pid, port} end)
104+
_ -> []
105+
end
106+
end)
107+
|> Enum.group_by(fn {pid, _} -> pid end, fn {_, port} -> port end)
108+
|> collect_egress_telemetry()
109+
|> Enum.each(fn {tenant_external_id, output_bytes} ->
110+
if output_bytes > 0 do
111+
:telemetry.execute([:realtime, :connections, :output_bytes], %{output_bytes: output_bytes}, %{
112+
tenant_external_id: tenant_external_id
113+
})
114+
end
115+
end)
116+
117+
Helpers.cancel_timer(egress_telemetry_ref)
118+
egress_telemetry_ref = Process.send_after(self(), :send_egress_telemetry, egress_telemetry_interval_in_ms)
119+
{:noreply, %{state | egress_telemetry_ref: egress_telemetry_ref}}
71120
end
72121

73122
defp chunked_killing(cont \\ nil) do
@@ -84,5 +133,38 @@ defmodule RealtimeWeb.RealtimeChannel.Tracker do
84133
end
85134
end
86135

136+
defp collect_egress_telemetry(pid_and_port_list, cont \\ nil, acc \\ %{}) do
137+
result = if cont, do: :ets.select(cont), else: :ets.select(@table, @egress_telemetry_match, 1000)
138+
139+
case result do
140+
:"$end_of_table" ->
141+
acc
142+
143+
{pids, cont} ->
144+
acc =
145+
Enum.reduce(pids, acc, fn [pid, tenant_external_id], acc ->
146+
ports = Map.get(pid_and_port_list, pid, [])
147+
148+
output_bytes =
149+
Enum.sum_by(ports, fn port ->
150+
case :inet.getstat(port, [:send_oct]) do
151+
{:ok, stats} -> stats[:send_oct]
152+
_ -> 0
153+
end
154+
end)
155+
156+
{_, acc} =
157+
Map.get_and_update(acc, tenant_external_id, fn
158+
nil -> {output_bytes, output_bytes}
159+
output_bytes -> {output_bytes, output_bytes + output_bytes}
160+
end)
161+
162+
acc
163+
end)
164+
165+
collect_egress_telemetry(pid_and_port_list, cont, acc)
166+
end
167+
end
168+
87169
def table_name, do: @table
88170
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.61.0",
7+
version: "2.61.2",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/monitoring/prom_ex/plugins/tenant_test.exs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
6363
%{tenant: external_id}
6464
)
6565
end
66+
67+
def fake_output_bytes(external_id) do
68+
Realtime.Telemetry.execute(
69+
[:realtime, :connections, :output_bytes],
70+
%{output_bytes: 100},
71+
%{tenant: external_id}
72+
)
73+
end
6674
end
6775
end)
6876

@@ -316,6 +324,16 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
316324

317325
assert metric_value(bucket_pattern) > 0
318326
end
327+
328+
test "egress bytes metric exists after check", context do
329+
external_id = context.tenant.external_id
330+
pattern = ~r/realtime_connections_output_bytes{tenant="#{external_id}"}\s(?<number>\d+)/
331+
332+
FakeUserCounter.fake_output_bytes(external_id)
333+
334+
Process.sleep(200)
335+
assert metric_value(pattern) == 100
336+
end
319337
end
320338

321339
defp metric_value(pattern) do

0 commit comments

Comments
 (0)