diff --git a/lib/realtime/adapters/postgres/decoder.ex b/lib/realtime/adapters/postgres/decoder.ex index e5ea161e3..086776265 100644 --- a/lib/realtime/adapters/postgres/decoder.ex +++ b/lib/realtime/adapters/postgres/decoder.ex @@ -162,7 +162,6 @@ defmodule Realtime.Adapters.Postgres.Decoder do """ def decode_message(message) when is_binary(message) do - # Logger.debug("Message before conversion " <> message) decode_message_impl(message) end @@ -218,19 +217,13 @@ defmodule Realtime.Adapters.Postgres.Decoder do defp decode_message_impl(<<"I", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>) do {<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns) - %Insert{ - relation_id: relation_id, - tuple_data: decoded_tuple_data - } + %Insert{relation_id: relation_id, tuple_data: decoded_tuple_data} end defp decode_message_impl(<<"U", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>) do {<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns) - %Update{ - relation_id: relation_id, - tuple_data: decoded_tuple_data - } + %Update{relation_id: relation_id, tuple_data: decoded_tuple_data} end defp decode_message_impl( @@ -242,10 +235,7 @@ defmodule Realtime.Adapters.Postgres.Decoder do {<<>>, decoded_tuple_data} = decode_tuple_data(new_tuple_binary, new_number_of_columns) - base_update_msg = %Update{ - relation_id: relation_id, - tuple_data: decoded_tuple_data - } + base_update_msg = %Update{relation_id: relation_id, tuple_data: decoded_tuple_data} case key_or_old do "K" -> Map.put(base_update_msg, :changed_key_tuple_data, old_decoded_tuple_data) @@ -259,9 +249,7 @@ defmodule Realtime.Adapters.Postgres.Decoder do when key_or_old == "K" or key_or_old == "O" do {<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns) - base_delete_msg = %Delete{ - relation_id: relation_id - } + base_delete_msg = %Delete{relation_id: relation_id} case key_or_old do "K" -> Map.put(base_delete_msg, :changed_key_tuple_data, decoded_tuple_data) @@ -313,19 +301,40 @@ defmodule Realtime.Adapters.Postgres.Decoder do defp decode_tuple_data(<<"u", rest::binary>>, columns_remaining, accumulator), do: decode_tuple_data(rest, columns_remaining - 1, [:unchanged_toast | accumulator]) + @start_date "2000-01-01T00:00:00Z" defp decode_tuple_data( - <<"t", column_length::integer-32, rest::binary>>, + <<"b", column_length::integer-32, rest::binary>>, columns_remaining, accumulator - ), - do: - decode_tuple_data( - :erlang.binary_part(rest, {byte_size(rest), -(byte_size(rest) - column_length)}), - columns_remaining - 1, - [ - :erlang.binary_part(rest, {0, column_length}) | accumulator - ] - ) + ) do + data = :erlang.binary_part(rest, {0, column_length}) + remainder = :erlang.binary_part(rest, {byte_size(rest), -(byte_size(rest) - column_length)}) + + data = + case data do + <<1>> -> + true + + <<0>> -> + false + + <<1, binary::binary-size(column_length - 1)>> -> + binary + + <> -> + @start_date + |> NaiveDateTime.from_iso8601!() + |> NaiveDateTime.add(microseconds, :microsecond) + + <> -> + UUID.binary_to_string!(uuid_binary) + + data when is_binary(data) -> + data + end + + decode_tuple_data(remainder, columns_remaining - 1, [data | accumulator]) + end defp decode_columns(binary, accumulator \\ []) defp decode_columns(<<>>, accumulator), do: Enum.reverse(accumulator) @@ -345,7 +354,6 @@ defmodule Realtime.Adapters.Postgres.Decoder do name: name, flags: decoded_flags, type: OidDatabase.name_for_type_id(data_type_id), - # type: data_type_id, type_modifier: type_modifier } | accumulator diff --git a/lib/realtime/api/message.ex b/lib/realtime/api/message.ex index 18bbc9a87..1c7bb5b63 100644 --- a/lib/realtime/api/message.ex +++ b/lib/realtime/api/message.ex @@ -9,7 +9,7 @@ defmodule Realtime.Api.Message do @schema_prefix "realtime" @type t :: %__MODULE__{} - + @timestamps_opts [type: :naive_datetime_usec] schema "messages" do field(:topic, :string) field(:extension, Ecto.Enum, values: [:broadcast, :presence]) @@ -37,7 +37,7 @@ defmodule Realtime.Api.Message do end defp put_timestamp(changeset, field) do - changeset |> put_change(field, NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)) + put_change(changeset, field, NaiveDateTime.utc_now(:microsecond)) end defp maybe_put_timestamp(changeset, field) do diff --git a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex index b7b953ca9..39b11927c 100644 --- a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex +++ b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex @@ -242,10 +242,10 @@ defmodule Realtime.PromEx.Plugins.Tenant do [:realtime, :tenants, :broadcast_from_database, :latency_inserted_at], event_name: [:realtime, :tenants, :broadcast_from_database], measurement: :latency_inserted_at, - unit: :second, + unit: {:microsecond, :millisecond}, description: "Latency of database inserted_at until reaches server to be broadcasted", tags: [:tenant], - reporter_options: [buckets: [1, 2, 5]] + reporter_options: [buckets: [10, 250, 5000]] ), distribution( [:realtime, :tenants, :replay], diff --git a/lib/realtime/tenants/replication_connection.ex b/lib/realtime/tenants/replication_connection.ex index 43ce5d1d4..cc2860a84 100644 --- a/lib/realtime/tenants/replication_connection.ex +++ b/lib/realtime/tenants/replication_connection.ex @@ -57,7 +57,7 @@ defmodule Realtime.Tenants.ReplicationConnection do publication_name: nil, replication_slot_name: nil, output_plugin: "pgoutput", - proto_version: 1, + proto_version: 3, relations: %{}, buffer: [], monitored_pid: nil, @@ -237,7 +237,7 @@ defmodule Realtime.Tenants.ReplicationConnection do ) query = - "START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (proto_version '#{proto_version}', publication_names '#{publication_name}')" + "START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (proto_version '#{proto_version}', publication_names '#{publication_name}', binary 'true')" {:stream, query, [], %{state | step: :streaming}} end @@ -319,8 +319,7 @@ defmodule Realtime.Tenants.ReplicationConnection do payload: Jason.Fragment.new(payload) }, :ok <- BatchBroadcast.broadcast(nil, tenant, %{messages: [broadcast_message]}, true) do - inserted_at = NaiveDateTime.from_iso8601!(inserted_at) - latency_inserted_at = NaiveDateTime.utc_now() |> NaiveDateTime.diff(inserted_at) + latency_inserted_at = NaiveDateTime.utc_now(:microsecond) |> NaiveDateTime.diff(inserted_at, :microsecond) Telemetry.execute( [:realtime, :tenants, :broadcast_from_database], @@ -382,7 +381,7 @@ defmodule Realtime.Tenants.ReplicationConnection do |> Enum.zip(columns) |> Map.new(fn {nil, %{name: name}} -> {name, nil} - {value, %{name: name, type: "bool"}} -> {name, value == "t"} + {value, %{name: name, type: "bool"}} -> {name, value} {value, %{name: name}} -> {name, value} end) end diff --git a/mix.exs b/mix.exs index ef593400a..b227db108 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.63.0", + version: "2.63.1", elixir: "~> 1.18", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs b/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs index 03f6e3469..7094a92d2 100644 --- a/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs +++ b/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs @@ -58,7 +58,7 @@ defmodule Realtime.PromEx.Plugins.TenantTest do [:realtime, :tenants, :broadcast_from_database], %{ latency_committed_at: 10, - latency_inserted_at: 1 + latency_inserted_at: 10 }, %{tenant: external_id} ) @@ -274,7 +274,7 @@ defmodule Realtime.PromEx.Plugins.TenantTest do assert metric_value(pattern) == metric_value + 1 bucket_pattern = - ~r/realtime_tenants_broadcast_from_database_latency_inserted_at_bucket{tenant="#{context.tenant.external_id}",le="5"}\s(?\d+)/ + ~r/realtime_tenants_broadcast_from_database_latency_inserted_at_bucket{tenant="#{context.tenant.external_id}",le="10"}\s(?\d+)/ assert metric_value(bucket_pattern) > 0 end diff --git a/test/realtime/postgres_decoder_test.exs b/test/realtime/postgres_decoder_test.exs index 9516e5e9a..8f3eb89ff 100644 --- a/test/realtime/postgres_decoder_test.exs +++ b/test/realtime/postgres_decoder_test.exs @@ -115,7 +115,7 @@ defmodule Realtime.PostgresDecoderTest do describe "data message (TupleData) decoder" do test "decodes insert messages" do assert Decoder.decode_message( - <<73, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48>> + <<73, 0, 0, 96, 0, 78, 0, 2, 98, 0, 0, 0, 3, 98, 97, 122, 98, 0, 0, 0, 3, 53, 54, 48>> ) == %Insert{ relation_id: 24_576, tuple_data: {"baz", "560"} @@ -123,14 +123,14 @@ defmodule Realtime.PostgresDecoderTest do end test "decodes insert messages with null values" do - assert Decoder.decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 110, 116, 0, 0, 0, 3, 53, 54, 48>>) == %Insert{ + assert Decoder.decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 110, 98, 0, 0, 0, 3, 53, 54, 48>>) == %Insert{ relation_id: 24_576, tuple_data: {nil, "560"} } end test "decodes insert messages with unchanged toasted values" do - assert Decoder.decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 117, 116, 0, 0, 0, 3, 53, 54, 48>>) == %Insert{ + assert Decoder.decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 117, 98, 0, 0, 0, 3, 53, 54, 48>>) == %Insert{ relation_id: 24_576, tuple_data: {:unchanged_toast, "560"} } @@ -138,7 +138,7 @@ defmodule Realtime.PostgresDecoderTest do test "decodes update messages with default replica identity setting" do assert Decoder.decode_message( - <<85, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 116, 0, 0, 0, 3, 53, 54, + <<85, 0, 0, 96, 0, 78, 0, 2, 98, 0, 0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 98, 0, 0, 0, 3, 53, 54, 48>> ) == %Update{ relation_id: 24_576, @@ -150,8 +150,8 @@ defmodule Realtime.PostgresDecoderTest do test "decodes update messages with FULL replica identity setting" do assert Decoder.decode_message( - <<85, 0, 0, 96, 0, 79, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48, 78, 0, 2, 116, 0, - 0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 116, 0, 0, 0, 3, 53, 54, 48>> + <<85, 0, 0, 96, 0, 79, 0, 2, 98, 0, 0, 0, 3, 98, 97, 122, 98, 0, 0, 0, 3, 53, 54, 48, 78, 0, 2, 98, 0, 0, + 0, 7, 101, 120, 97, 109, 112, 108, 101, 98, 0, 0, 0, 3, 53, 54, 48>> ) == %Update{ relation_id: 24_576, changed_key_tuple_data: nil, @@ -162,8 +162,8 @@ defmodule Realtime.PostgresDecoderTest do test "decodes update messages with USING INDEX replica identity setting" do assert Decoder.decode_message( - <<85, 0, 0, 96, 0, 75, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 110, 78, 0, 2, 116, 0, 0, 0, 7, 101, 120, 97, - 109, 112, 108, 101, 116, 0, 0, 0, 3, 53, 54, 48>> + <<85, 0, 0, 96, 0, 75, 0, 2, 98, 0, 0, 0, 3, 98, 97, 122, 110, 78, 0, 2, 98, 0, 0, 0, 7, 101, 120, 97, + 109, 112, 108, 101, 98, 0, 0, 0, 3, 53, 54, 48>> ) == %Update{ relation_id: 24_576, changed_key_tuple_data: {"baz", nil}, @@ -174,7 +174,7 @@ defmodule Realtime.PostgresDecoderTest do test "decodes DELETE messages with USING INDEX replica identity setting" do assert Decoder.decode_message( - <<68, 0, 0, 96, 0, 75, 0, 2, 116, 0, 0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 110>> + <<68, 0, 0, 96, 0, 75, 0, 2, 98, 0, 0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 110>> ) == %Delete{ relation_id: 24_576, changed_key_tuple_data: {"example", nil} @@ -183,7 +183,7 @@ defmodule Realtime.PostgresDecoderTest do test "decodes DELETE messages with FULL replica identity setting" do assert Decoder.decode_message( - <<68, 0, 0, 96, 0, 79, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48>> + <<68, 0, 0, 96, 0, 79, 0, 2, 98, 0, 0, 0, 3, 98, 97, 122, 98, 0, 0, 0, 3, 53, 54, 48>> ) == %Delete{ relation_id: 24_576, old_tuple_data: {"baz", "560"} diff --git a/test/realtime/tenants/replication_connection_test.exs b/test/realtime/tenants/replication_connection_test.exs index b87dc5f23..99dbe998a 100644 --- a/test/realtime/tenants/replication_connection_test.exs +++ b/test/realtime/tenants/replication_connection_test.exs @@ -151,6 +151,88 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do end end + test "starts a handler for the tenant and broadcasts to public channel", %{tenant: tenant, db_conn: db_conn} do + start_link_supervised!( + {ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}}, + restart: :transient + ) + + topic = random_string() + tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, true) + subscribe(tenant_topic, topic) + + total_messages = 5 + # Works with one insert per transaction + for _ <- 1..total_messages do + value = random_string() + + row = + message_fixture(tenant, %{ + "topic" => topic, + "private" => false, + "event" => "INSERT", + "payload" => %{"value" => value} + }) + + assert_receive {:socket_push, :text, data} + message = data |> IO.iodata_to_binary() |> Jason.decode!() + + payload = %{ + "event" => "INSERT", + "meta" => %{"id" => row.id}, + "payload" => %{ + "value" => value + }, + "type" => "broadcast" + } + + assert message == %{"event" => "broadcast", "payload" => payload, "ref" => nil, "topic" => topic} + end + + Process.sleep(500) + # Works with batch inserts + messages = + for _ <- 1..total_messages do + Message.changeset(%Message{}, %{ + "topic" => topic, + "private" => false, + "event" => "INSERT", + "extension" => "broadcast", + "payload" => %{"value" => random_string()} + }) + end + + {:ok, _} = Realtime.Repo.insert_all_entries(db_conn, messages, Message) + + messages_received = + for _ <- 1..total_messages, into: [] do + assert_receive {:socket_push, :text, data} + data |> IO.iodata_to_binary() |> Jason.decode!() + end + + for row <- messages do + assert Enum.count(messages_received, fn message_received -> + value = row |> Map.from_struct() |> get_in([:changes, :payload, "value"]) + + match?( + %{ + "event" => "broadcast", + "payload" => %{ + "event" => "INSERT", + "meta" => %{"id" => _id}, + "payload" => %{ + "value" => ^value + } + }, + "ref" => nil, + "topic" => ^topic + }, + message_received + ) + end) == 1 + end + end + test "monitored pid stopping brings down ReplicationConnection ", %{tenant: tenant} do monitored_pid = spawn(fn -> @@ -435,7 +517,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do "payload" => %{"value" => random_string()} }) - assert_receive {:socket_push, :text, data} + assert_receive {:socket_push, :text, data}, 500 message = data |> IO.iodata_to_binary() |> Jason.decode!() assert %{"event" => "broadcast", "payload" => _, "ref" => nil, "topic" => ^topic} = message @@ -444,8 +526,8 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do %{latency_committed_at: latency_committed_at, latency_inserted_at: latency_inserted_at}, %{tenant: ^external_id}} - assert latency_committed_at - assert latency_inserted_at + assert latency_committed_at > 0 + assert latency_inserted_at > 0 end end diff --git a/test/support/tenant_connection.ex b/test/support/tenant_connection.ex index ce5956b49..6c70cc5f6 100644 --- a/test/support/tenant_connection.ex +++ b/test/support/tenant_connection.ex @@ -9,12 +9,12 @@ defmodule TenantConnection do alias RealtimeWeb.Endpoint def create_message(attrs, conn, opts \\ [mode: :savepoint]) do - channel = Message.changeset(%Message{}, attrs) + message = Message.changeset(%Message{}, attrs) {:ok, result} = Database.transaction(conn, fn transaction_conn -> - with {:ok, %Message{} = channel} <- Repo.insert(transaction_conn, channel, Message, opts) do - channel + with {:ok, %Message{} = message} <- Repo.insert(transaction_conn, message, Message, opts) do + message end end)