Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
| BROADCAST_POOL_SIZE | number | Number of processes to relay Phoenix.PubSub messages across the cluster |
| POSTGRES_CDC_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Postgres CDC extension. Defaults to 5. |
| USERS_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Users extension. Defaults to 5. |
| NO_CHANNEL_TIMEOUT_IN_MS | number | Interval in ms at which sockets are checked for open channels; a socket with no open channels is disconnected. Defaults to 600000 (10 minutes). |
| EGRESS_TELEMETRY_INTERVAL_IN_MS | number | Interval in ms between egress (output bytes) telemetry emissions. Defaults to 15000 (15 seconds). |

The OpenTelemetry variables mentioned above are not an exhaustive list of all [supported environment variables](https://opentelemetry.io/docs/languages/sdk-configuration/).

Expand Down
6 changes: 4 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_
users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)
postgres_cdc_scope_shards = Env.get_integer("POSTGRES_CDC_SCOPE_SHARDS", 5)
regional_broadcasting = Env.get_boolean("REGIONAL_BROADCASTING", false)
egress_telemetry_interval_in_ms = Env.get_integer("EGRESS_TELEMETRY_INTERVAL_IN_MS", :timer.seconds(15))
no_channel_timeout_in_ms = Env.get_integer("NO_CHANNEL_TIMEOUT_IN_MS", :timer.minutes(10))

if !(db_version in [nil, "ipv6", "ipv4"]),
Expand Down Expand Up @@ -122,13 +123,14 @@ config :realtime,
tenant_cache_expiration: tenant_cache_expiration,
rpc_timeout: rpc_timeout,
max_gen_rpc_clients: max_gen_rpc_clients,
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
platform: platform,
pubsub_adapter: pubsub_adapter,
broadcast_pool_size: broadcast_pool_size,
users_scope_shards: users_scope_shards,
postgres_cdc_scope_shards: postgres_cdc_scope_shards,
regional_broadcasting: regional_broadcasting
regional_broadcasting: regional_broadcasting,
egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms,
no_channel_timeout_in_ms: no_channel_timeout_in_ms

if config_env() != :test && run_janitor? do
config :realtime,
Expand Down
5 changes: 4 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ defmodule Realtime.Application do
migration_partition_slots = Application.get_env(:realtime, :migration_partition_slots)
connect_partition_slots = Application.get_env(:realtime, :connect_partition_slots)
no_channel_timeout_in_ms = Application.get_env(:realtime, :no_channel_timeout_in_ms)
egress_telemetry_interval_in_ms = Application.get_env(:realtime, :egress_telemetry_interval_in_ms)

children =
[
Expand Down Expand Up @@ -94,7 +95,9 @@ defmodule Realtime.Application do
strategy: :one_for_one,
name: Connect.DynamicSupervisor,
partitions: connect_partition_slots},
{RealtimeWeb.RealtimeChannel.Tracker, check_interval_in_ms: no_channel_timeout_in_ms},
{RealtimeWeb.RealtimeChannel.Tracker,
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms},
RealtimeWeb.Endpoint,
RealtimeWeb.Presence
] ++ extensions_supervisors() ++ janitor_tasks()
Expand Down
18 changes: 17 additions & 1 deletion lib/realtime/monitoring/prom_ex/plugins/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,26 @@ defmodule Realtime.PromEx.Plugins.Tenant do
channel_events(),
replication_metrics(),
subscription_metrics(),
payload_size_metrics()
payload_size_metrics(),
output_bytes_metrics()
]
end

# Builds the event group exposing a tenant's output bytes as a last-value
# metric, fed by the [:realtime, :connections, :output_bytes] telemetry event
# and tagged by :tenant.
defp output_bytes_metrics do
  event = [:realtime, :connections, :output_bytes]

  metric =
    last_value(event,
      event_name: event,
      description: "The total count of output bytes for a tenant.",
      measurement: :output_bytes,
      tags: [:tenant]
    )

  Event.build(:realtime_connections_output_bytes_metrics, [metric])
end

defp payload_size_metrics do
Event.build(
:realtime_tenant_payload_size_metrics,
Expand Down
16 changes: 14 additions & 2 deletions lib/realtime/telemetry/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ defmodule Realtime.Telemetry.Logger do
[:realtime, :rate_counter, :channel, :events],
[:realtime, :rate_counter, :channel, :joins],
[:realtime, :rate_counter, :channel, :db_events],
[:realtime, :rate_counter, :channel, :presence_events]
[:realtime, :rate_counter, :channel, :presence_events],
[:realtime, :connections, :output_bytes]
]

def start_link(args) do
Expand All @@ -29,13 +30,24 @@ defmodule Realtime.Telemetry.Logger do
Logs billing metrics for a tenant aggregated and emitted by a PromEx metric poller.
"""

# Egress events: the byte count is inlined in the log message itself and only
# the tenant is attached as metadata.
def handle_event([:realtime, :connections, :output_bytes], %{output_bytes: output_bytes}, %{tenant: tenant}, _config) do
  message = "Billing metrics: [:realtime, :connections, :output_bytes] output_bytes=" <> inspect(output_bytes)
  Logger.info([message], %{project: tenant})
  :ok
end

# Any other tenant-scoped event: the event name goes in the message and the
# measurements travel as log metadata.
def handle_event(event, measurements, %{tenant: tenant}, _config) do
  Logger.info(["Billing metrics: ", inspect(event)], %{project: tenant, measurements: measurements})
  :ok
end

# Events whose metadata carries no :tenant are ignored.
def handle_event(_event, _measurements, _metadata, _config) do
  :ok
end

Expand Down
6 changes: 3 additions & 3 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule RealtimeWeb.RealtimeChannel do

Process.flag(:max_heap_size, max_heap_size())
Process.flag(:fullsweep_after, @fullsweep_after)
Tracker.track(socket.transport_pid)
Tracker.track(socket.transport_pid, tenant_id)
Logger.metadata(external_id: tenant_id, project: tenant_id)
Logger.put_process_level(self(), log_level)

Expand Down Expand Up @@ -499,10 +499,10 @@ defmodule RealtimeWeb.RealtimeChannel do
end

@impl true
# Emits the disconnect telemetry event and decrements the tracker counter for
# this transport pid / tenant pair when the channel process terminates.
def terminate(reason, %{transport_pid: transport_pid, assigns: %{tenant: tenant_id}}) do
  # `reason` can be a tuple such as {:shutdown, :closed}; interpolating it
  # directly would raise Protocol.UndefinedError, so it is inspected instead.
  Logger.debug("Channel terminated with reason: #{inspect(reason)}")
  :telemetry.execute([:prom_ex, :plugin, :realtime, :disconnected], %{})
  Tracker.untrack(transport_pid, tenant_id)
  :ok
end

Expand Down
112 changes: 97 additions & 15 deletions lib/realtime_web/channels/realtime_channel/tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,41 @@ defmodule RealtimeWeb.RealtimeChannel.Tracker do
"""
use GenServer
require Logger
alias Realtime.Helpers

defstruct [
:no_channel_timeout_in_ms,
:no_channel_timeout_ref,
:egress_telemetry_interval_in_ms,
:egress_telemetry_ref
]

@table :channel_tracker
@zero_count_match [{{:"$1", :"$2"}, [{:"=<", :"$2", 0}], [:"$1"]}]
@zero_count_delete [{{:"$1", :"$2"}, [{:"=<", :"$2", 0}], [true]}]
@zero_count_match [{{{:"$1", :_}, :"$2"}, [{:"=<", :"$2", 0}], [:"$1"]}]
@zero_count_delete [{{{:"$1", :_}, :"$2"}, [{:"=<", :"$2", 0}], [true]}]
@egress_telemetry_match [{{{:"$1", :"$2"}, :"$3"}, [{:>, :"$3", 0}], [[:"$1", :"$2"]]}]

@doc """
Tracks a transport pid.
"""
@spec track(pid()) :: integer()
def track(pid), do: :ets.update_counter(@table, pid, 1, {pid, 0})
@spec track(pid(), binary()) :: integer()
def track(pid, tenant_external_id) do
  # Rows are keyed by {transport pid, tenant}; a missing row is created at 0
  # before being incremented.
  key = {pid, tenant_external_id}
  :ets.update_counter(@table, key, 1, {key, 0})
end

@doc """
Un-tracks a transport pid.
"""
@spec untrack(pid()) :: integer()
def untrack(pid), do: :ets.update_counter(@table, pid, -1, {pid, 0})
@spec untrack(pid(), binary()) :: integer()
def untrack(pid, tenant_external_id) do
  # Same {transport pid, tenant} key as track/2; a missing row is created at 0
  # before being decremented.
  key = {pid, tenant_external_id}
  :ets.update_counter(@table, key, -1, {key, 0})
end

@doc """
Returns the number of channels open for a transport pid.
"""
@spec count(pid(), binary()) :: integer()
def count(pid, tenant_external_id) do
  # Rows are keyed by {transport pid, tenant}; an absent row means nothing is
  # tracked for that pair.
  key = {pid, tenant_external_id}

  case :ets.lookup(@table, key) do
    [{^key, count}] -> count
    [] -> 0
  end
end
Expand All @@ -57,17 +69,54 @@ defmodule RealtimeWeb.RealtimeChannel.Tracker do

@impl true
# Schedules the two periodic jobs and stores their intervals and timer refs.
#
# Options:
#   * `:no_channel_timeout_in_ms` (required) - interval of the `:check_channels` job.
#   * `:egress_telemetry_interval_in_ms` (optional) - interval of the
#     `:send_egress_telemetry` job; defaults to 15 seconds so callers that only
#     care about the channel check can still start the tracker.
def init(opts) do
  no_channel_timeout_in_ms = Keyword.fetch!(opts, :no_channel_timeout_in_ms)
  egress_telemetry_interval_in_ms = Keyword.get(opts, :egress_telemetry_interval_in_ms, :timer.seconds(15))

  state = %__MODULE__{
    no_channel_timeout_in_ms: no_channel_timeout_in_ms,
    no_channel_timeout_ref: Process.send_after(self(), :check_channels, no_channel_timeout_in_ms),
    egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms,
    egress_telemetry_ref: Process.send_after(self(), :send_egress_telemetry, egress_telemetry_interval_in_ms)
  }

  {:ok, state}
end

@impl true
# Periodic job: runs chunked_killing/1, deletes every row whose counter is
# zero or below, then re-arms the timer.
def handle_info(:check_channels, state) do
  %{no_channel_timeout_ref: timer_ref, no_channel_timeout_in_ms: interval} = state

  chunked_killing()
  :ets.select_delete(@table, @zero_count_delete)

  # NOTE(review): presumably cancels the previous timer so only one check is
  # ever pending (e.g. if the message was sent manually) - confirm against
  # Realtime.Helpers.cancel_timer.
  Helpers.cancel_timer(timer_ref)
  timer_ref = Process.send_after(self(), :check_channels, interval)

  {:noreply, %{state | no_channel_timeout_ref: timer_ref}}
end

# Periodic job: maps every port to the processes it is linked to, sums the
# bytes sent per tenant via collect_egress_telemetry/3 and emits one
# [:realtime, :connections, :output_bytes] event per tenant with a positive
# total, then re-arms the timer.
def handle_info(:send_egress_telemetry, state) do
  %{egress_telemetry_ref: egress_telemetry_ref, egress_telemetry_interval_in_ms: egress_telemetry_interval_in_ms} =
    state

  ports_by_pid =
    Port.list()
    |> Enum.flat_map(fn port ->
      case Port.info(port, :links) do
        {:links, pids} -> Enum.map(pids, fn pid -> {pid, port} end)
        # The port may have closed between Port.list/0 and Port.info/2.
        _ -> []
      end
    end)
    |> Enum.group_by(fn {pid, _port} -> pid end, fn {_pid, port} -> port end)

  ports_by_pid
  |> collect_egress_telemetry()
  |> Enum.each(fn {tenant_external_id, output_bytes} ->
    if output_bytes > 0 do
      # The metadata key must be :tenant: the PromEx metric is tagged by
      # :tenant and the telemetry logger only handles metadata with that key.
      :telemetry.execute(
        [:realtime, :connections, :output_bytes],
        %{output_bytes: output_bytes},
        %{tenant: tenant_external_id}
      )
    end
  end)

  Helpers.cancel_timer(egress_telemetry_ref)
  egress_telemetry_ref = Process.send_after(self(), :send_egress_telemetry, egress_telemetry_interval_in_ms)
  {:noreply, %{state | egress_telemetry_ref: egress_telemetry_ref}}
end

defp chunked_killing(cont \\ nil) do
Expand All @@ -84,5 +133,38 @@ defmodule RealtimeWeb.RealtimeChannel.Tracker do
end
end

# Walks the tracker table in chunks of 1000 rows with a positive counter and
# returns a map of tenant external id => total bytes sent over the ports
# linked to that tenant's tracked pids.
#
# `pid_and_port_list` is a map of pid => list of ports linked to it; `cont` is
# the ETS continuation of the previous chunk (nil on the first call) and `acc`
# the totals accumulated so far.
defp collect_egress_telemetry(pid_and_port_list, cont \\ nil, acc \\ %{}) do
  result = if cont, do: :ets.select(cont), else: :ets.select(@table, @egress_telemetry_match, 1000)

  case result do
    :"$end_of_table" ->
      acc

    {rows, cont} ->
      acc =
        Enum.reduce(rows, acc, fn [pid, tenant_external_id], acc ->
          sent_bytes =
            pid_and_port_list
            |> Map.get(pid, [])
            |> Enum.sum_by(&port_send_octets/1)

          # Add this pid's bytes to the tenant's running total. The previous
          # callback shadowed the new value with the stored one and doubled
          # the stored total instead.
          Map.update(acc, tenant_external_id, sent_bytes, &(&1 + sent_bytes))
        end)

      collect_egress_telemetry(pid_and_port_list, cont, acc)
  end
end

# Bytes sent over `port` as reported by :inet.getstat/2; 0 when the stat
# cannot be read for that port.
defp port_send_octets(port) do
  case :inet.getstat(port, [:send_oct]) do
    {:ok, stats} -> Keyword.get(stats, :send_oct, 0)
    _ -> 0
  end
end

# Name of the ETS table holding the per-{pid, tenant} channel counters.
def table_name do
  @table
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.61.0",
version: "2.61.2",
elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
2 changes: 1 addition & 1 deletion test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2273,7 +2273,7 @@
end
end

test "tracks and untracks properly channels", %{tenant: tenant} do

Check failure on line 2276 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

test tracks and untracks properly channels (Realtime.Integration.RtChannelTest)
assert [] = Tracker.list_pids()

{socket, _} = get_connection(tenant)
Expand All @@ -2295,7 +2295,7 @@
assert_receive %Message{topic: ^topic, event: "phx_close"}, 500
end

start_supervised!({Tracker, check_interval_in_ms: 100})
start_supervised!({Tracker, no_channel_timeout_in_ms: 100})
# wait to trigger tracker
assert_process_down(socket, 1000)
assert [] = Tracker.list_pids()
Expand Down
18 changes: 18 additions & 0 deletions test/realtime/monitoring/prom_ex/plugins/tenant_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
%{tenant: external_id}
)
end

# Emits a fixed 100-byte output_bytes event for the given tenant.
def fake_output_bytes(external_id) do
  event = [:realtime, :connections, :output_bytes]
  Realtime.Telemetry.execute(event, %{output_bytes: 100}, %{tenant: external_id})
end
end
end)

Expand Down Expand Up @@ -316,6 +324,16 @@ defmodule Realtime.PromEx.Plugins.TenantTest do

assert metric_value(bucket_pattern) > 0
end

test "egress bytes metric exists after check", context do
  %{tenant: %{external_id: external_id}} = context

  FakeUserCounter.fake_output_bytes(external_id)
  pattern = ~r/realtime_connections_output_bytes{tenant="#{external_id}"}\s(?<number>\d+)/

  # Give the emitted event time to show up in the exported metrics.
  Process.sleep(200)
  assert metric_value(pattern) == 100
end
end

defp metric_value(pattern) do
Expand Down
21 changes: 18 additions & 3 deletions test/realtime/telemetry/logger_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,24 @@ defmodule Realtime.Telemetry.LoggerTest do
test "logs on telemetry event" do
  start_link_supervised!({TelemetryLogger, handler_id: "telemetry-logger-test"})

  # Events that are logged generically, with measurements as metadata.
  count_events = [
    [:realtime, :connections],
    [:realtime, :rate_counter, :channel, :events],
    [:realtime, :rate_counter, :channel, :joins],
    [:realtime, :rate_counter, :channel, :db_events],
    [:realtime, :rate_counter, :channel, :presence_events]
  ]

  log =
    capture_log(fn ->
      for event <- count_events do
        :telemetry.execute(event, %{count: 1}, %{tenant: "tenant"})
      end

      :telemetry.execute([:realtime, :connections, :output_bytes], %{output_bytes: 100}, %{tenant: "tenant"})
    end)

  assert log =~ "project=tenant"

  for event <- count_events do
    assert log =~ "Billing metrics: #{inspect(event)}"
  end

  assert log =~ "Billing metrics: [:realtime, :connections, :output_bytes] output_bytes=100"
end

test "ignores events without tenant" do
Expand Down
Loading
Loading