Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
elixir 1.18.4
elixir 1.18.4-otp-27
nodejs 18.13.0
erlang 27
62 changes: 35 additions & 27 deletions lib/realtime/adapters/postgres/decoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ defmodule Realtime.Adapters.Postgres.Decoder do

"""
def decode_message(message) when is_binary(message) do
# Logger.debug("Message before conversion " <> message)
decode_message_impl(message)
end

Expand Down Expand Up @@ -218,19 +217,13 @@ defmodule Realtime.Adapters.Postgres.Decoder do
defp decode_message_impl(<<"I", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>) do
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)

%Insert{
relation_id: relation_id,
tuple_data: decoded_tuple_data
}
%Insert{relation_id: relation_id, tuple_data: decoded_tuple_data}
end

defp decode_message_impl(<<"U", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>) do
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)

%Update{
relation_id: relation_id,
tuple_data: decoded_tuple_data
}
%Update{relation_id: relation_id, tuple_data: decoded_tuple_data}
end

defp decode_message_impl(
Expand All @@ -242,10 +235,7 @@ defmodule Realtime.Adapters.Postgres.Decoder do

{<<>>, decoded_tuple_data} = decode_tuple_data(new_tuple_binary, new_number_of_columns)

base_update_msg = %Update{
relation_id: relation_id,
tuple_data: decoded_tuple_data
}
base_update_msg = %Update{relation_id: relation_id, tuple_data: decoded_tuple_data}

case key_or_old do
"K" -> Map.put(base_update_msg, :changed_key_tuple_data, old_decoded_tuple_data)
Expand All @@ -259,9 +249,7 @@ defmodule Realtime.Adapters.Postgres.Decoder do
when key_or_old == "K" or key_or_old == "O" do
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)

base_delete_msg = %Delete{
relation_id: relation_id
}
base_delete_msg = %Delete{relation_id: relation_id}

case key_or_old do
"K" -> Map.put(base_delete_msg, :changed_key_tuple_data, decoded_tuple_data)
Expand Down Expand Up @@ -313,19 +301,40 @@ defmodule Realtime.Adapters.Postgres.Decoder do
defp decode_tuple_data(<<"u", rest::binary>>, columns_remaining, accumulator),
do: decode_tuple_data(rest, columns_remaining - 1, [:unchanged_toast | accumulator])

@start_date "2000-01-01T00:00:00Z"
defp decode_tuple_data(
<<"t", column_length::integer-32, rest::binary>>,
<<"b", column_length::integer-32, rest::binary>>,
columns_remaining,
accumulator
),
do:
decode_tuple_data(
:erlang.binary_part(rest, {byte_size(rest), -(byte_size(rest) - column_length)}),
columns_remaining - 1,
[
:erlang.binary_part(rest, {0, column_length}) | accumulator
]
)
) do
data = :erlang.binary_part(rest, {0, column_length})
remainder = :erlang.binary_part(rest, {byte_size(rest), -(byte_size(rest) - column_length)})

data =
case data do
<<1>> ->
true

<<0>> ->
false

<<1, binary::binary-size(column_length - 1)>> ->
binary

<<microseconds::signed-big-64>> ->
@start_date
|> NaiveDateTime.from_iso8601!()
|> NaiveDateTime.add(microseconds, :microsecond)

<<uuid_binary::binary-16>> ->
UUID.binary_to_string!(uuid_binary)

data when is_binary(data) ->
data
end

decode_tuple_data(remainder, columns_remaining - 1, [data | accumulator])
end

defp decode_columns(binary, accumulator \\ [])
defp decode_columns(<<>>, accumulator), do: Enum.reverse(accumulator)
Expand All @@ -345,7 +354,6 @@ defmodule Realtime.Adapters.Postgres.Decoder do
name: name,
flags: decoded_flags,
type: OidDatabase.name_for_type_id(data_type_id),
# type: data_type_id,
type_modifier: type_modifier
}
| accumulator
Expand Down
4 changes: 2 additions & 2 deletions lib/realtime/api/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Realtime.Api.Message do
@schema_prefix "realtime"

@type t :: %__MODULE__{}

@timestamps_opts [type: :naive_datetime_usec]
schema "messages" do
field(:topic, :string)
field(:extension, Ecto.Enum, values: [:broadcast, :presence])
Expand Down Expand Up @@ -37,7 +37,7 @@ defmodule Realtime.Api.Message do
end

defp put_timestamp(changeset, field) do
changeset |> put_change(field, NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second))
put_change(changeset, field, NaiveDateTime.utc_now(:microsecond))
end

defp maybe_put_timestamp(changeset, field) do
Expand Down
4 changes: 2 additions & 2 deletions lib/realtime/monitoring/prom_ex/plugins/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,10 @@ defmodule Realtime.PromEx.Plugins.Tenant do
[:realtime, :tenants, :broadcast_from_database, :latency_inserted_at],
event_name: [:realtime, :tenants, :broadcast_from_database],
measurement: :latency_inserted_at,
unit: :second,
unit: {:microsecond, :millisecond},
description: "Latency of database inserted_at until reaches server to be broadcasted",
tags: [:tenant],
reporter_options: [buckets: [1, 2, 5]]
reporter_options: [buckets: [10, 250, 5000]]
)
]
)
Expand Down
22 changes: 18 additions & 4 deletions lib/realtime/tenants/replication_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ defmodule Realtime.Tenants.ReplicationConnection do
* `handler_module` - The module that will handle the data received from the replication stream.
* `metadata` - The metadata to pass to the handler module.

## How did we understood the data tuple format when pgoutput uses binary mode?
We checked the documentation with the help of AI and understood that pgoutput does the following logic:
* When `pgoutput` uses binary mode goes to https://doxygen.postgresql.org/proto_8c_source.html#l00767
* We end up in `OidSendFunctionCall` which gets the write function to call and transform into a binary payload based on OID - https://doxygen.postgresql.org/fmgr_8c.html#ad028dec5dc28e883308be264689f4559
* The functions are referenced in https://doxygen.postgresql.org/dir_19ba549c088f18020d0eadfcd07e87be.html
* It does `<data_tupe>.c` and calls `<data_type>_send` as the function to use
* Jsonb - https://doxygen.postgresql.org/jsonb_8c.html#afa4708e251f3078084e9986a46f228e9
* First byte is JSONB version (1), remainder is the text value
* Timestamp - https://doxygen.postgresql.org/backend_2utils_2adt_2timestamp_8c.html#ab0f80b3ae44bdcc50d0c0abb0c1d7939
* 64 bytes integer with microseconds since `2000-01-01`
* Boolean - https://doxygen.postgresql.org/bool_8c.html#a71cfb4f6c28259924b231dae4781b3fc
* 1 is true, 0 is false
* UUID - https://doxygen.postgresql.org/uuid_8c.html#a3cd205c61c108aaf08d6746c1d321e81
* 16 byte binary ( https://doxygen.postgresql.org/uuid_8h.html#a9692a0205a857ed2cc29558470c2ed77 )
* Text - https://doxygen.postgresql.org/varlena_8c.html#a0ea0ddd71c614e3fbd3571e70c53ed77
* Sends binary data
"""
use Postgrex.ReplicationConnection
use Realtime.Logs
Expand Down Expand Up @@ -236,7 +252,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
)

query =
"START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (proto_version '#{proto_version}', publication_names '#{publication_name}')"
"START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (proto_version '#{proto_version}', publication_names '#{publication_name}', binary 'true')"

{:stream, query, [], %{state | step: :streaming}}
end
Expand Down Expand Up @@ -318,8 +334,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
payload: Map.put_new(payload, "id", id)
},
:ok <- BatchBroadcast.broadcast(nil, tenant, %{messages: [broadcast_message]}, true) do
inserted_at = NaiveDateTime.from_iso8601!(inserted_at)
latency_inserted_at = NaiveDateTime.utc_now() |> NaiveDateTime.diff(inserted_at)
latency_inserted_at = NaiveDateTime.utc_now(:microsecond) |> NaiveDateTime.diff(inserted_at, :microsecond)

Telemetry.execute(
[:realtime, :tenants, :broadcast_from_database],
Expand Down Expand Up @@ -377,7 +392,6 @@ defmodule Realtime.Tenants.ReplicationConnection do
|> Map.new(fn
{nil, %{name: name}} -> {name, nil}
{value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
{value, %{name: name, type: "bool"}} -> {name, value == "t"}
{value, %{name: name}} -> {name, value}
end)
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.55.0",
version: "2.55.1",
elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
4 changes: 2 additions & 2 deletions test/realtime/monitoring/prom_ex/plugins/tenant_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
[:realtime, :tenants, :broadcast_from_database],
%{
latency_committed_at: 10,
latency_inserted_at: 1
latency_inserted_at: 10
},
%{tenant: external_id}
)
Expand Down Expand Up @@ -253,7 +253,7 @@ defmodule Realtime.PromEx.Plugins.TenantTest do
assert metric_value(pattern) == metric_value + 1

bucket_pattern =
~r/realtime_tenants_broadcast_from_database_latency_inserted_at_bucket{tenant="#{context.tenant.external_id}",le="5"}\s(?<number>\d+)/
~r/realtime_tenants_broadcast_from_database_latency_inserted_at_bucket{tenant="#{context.tenant.external_id}",le="10"}\s(?<number>\d+)/

assert metric_value(bucket_pattern) > 0
end
Expand Down
20 changes: 10 additions & 10 deletions test/realtime/postgres_decoder_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -115,30 +115,30 @@ defmodule Realtime.PostgresDecoderTest do
describe "data message (TupleData) decoder" do
test "decodes insert messages" do
assert Decoder.decode_message(
<<73, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48>>
<<73, 0, 0, 96, 0, 78, 0, 2, 98, 0, 0, 0, 3, 98, 97, 122, 98, 0, 0, 0, 3, 53, 54, 48>>
) == %Insert{
relation_id: 24_576,
tuple_data: {"baz", "560"}
}
end

test "decodes insert messages with null values" do
assert Decoder.decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 110, 116, 0, 0, 0, 3, 53, 54, 48>>) == %Insert{
assert Decoder.decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 110, 98, 0, 0, 0, 3, 53, 54, 48>>) == %Insert{
relation_id: 24_576,
tuple_data: {nil, "560"}
}
end

test "decodes insert messages with unchanged toasted values" do
assert Decoder.decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 117, 116, 0, 0, 0, 3, 53, 54, 48>>) == %Insert{
assert Decoder.decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 117, 98, 0, 0, 0, 3, 53, 54, 48>>) == %Insert{
relation_id: 24_576,
tuple_data: {:unchanged_toast, "560"}
}
end

test "decodes update messages with default replica identity setting" do
assert Decoder.decode_message(
<<85, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 116, 0, 0, 0, 3, 53, 54,
<<85, 0, 0, 96, 0, 78, 0, 2, 98, 0, 0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 98, 0, 0, 0, 3, 53, 54,
48>>
) == %Update{
relation_id: 24_576,
Expand All @@ -150,8 +150,8 @@ defmodule Realtime.PostgresDecoderTest do

test "decodes update messages with FULL replica identity setting" do
assert Decoder.decode_message(
<<85, 0, 0, 96, 0, 79, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48, 78, 0, 2, 116, 0,
0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 116, 0, 0, 0, 3, 53, 54, 48>>
<<85, 0, 0, 96, 0, 79, 0, 2, 98, 0, 0, 0, 3, 98, 97, 122, 98, 0, 0, 0, 3, 53, 54, 48, 78, 0, 2, 98, 0, 0,
0, 7, 101, 120, 97, 109, 112, 108, 101, 98, 0, 0, 0, 3, 53, 54, 48>>
) == %Update{
relation_id: 24_576,
changed_key_tuple_data: nil,
Expand All @@ -162,8 +162,8 @@ defmodule Realtime.PostgresDecoderTest do

test "decodes update messages with USING INDEX replica identity setting" do
assert Decoder.decode_message(
<<85, 0, 0, 96, 0, 75, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 110, 78, 0, 2, 116, 0, 0, 0, 7, 101, 120, 97,
109, 112, 108, 101, 116, 0, 0, 0, 3, 53, 54, 48>>
<<85, 0, 0, 96, 0, 75, 0, 2, 98, 0, 0, 0, 3, 98, 97, 122, 110, 78, 0, 2, 98, 0, 0, 0, 7, 101, 120, 97,
109, 112, 108, 101, 98, 0, 0, 0, 3, 53, 54, 48>>
) == %Update{
relation_id: 24_576,
changed_key_tuple_data: {"baz", nil},
Expand All @@ -174,7 +174,7 @@ defmodule Realtime.PostgresDecoderTest do

test "decodes DELETE messages with USING INDEX replica identity setting" do
assert Decoder.decode_message(
<<68, 0, 0, 96, 0, 75, 0, 2, 116, 0, 0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 110>>
<<68, 0, 0, 96, 0, 75, 0, 2, 98, 0, 0, 0, 7, 101, 120, 97, 109, 112, 108, 101, 110>>
) == %Delete{
relation_id: 24_576,
changed_key_tuple_data: {"example", nil}
Expand All @@ -183,7 +183,7 @@ defmodule Realtime.PostgresDecoderTest do

test "decodes DELETE messages with FULL replica identity setting" do
assert Decoder.decode_message(
<<68, 0, 0, 96, 0, 79, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48>>
<<68, 0, 0, 96, 0, 79, 0, 2, 98, 0, 0, 0, 3, 98, 97, 122, 98, 0, 0, 0, 3, 53, 54, 48>>
) == %Delete{
relation_id: 24_576,
old_tuple_data: {"baz", "560"}
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/tenants/replication_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
%{tenant: ^external_id}}

assert latency_committed_at
assert latency_inserted_at
assert latency_inserted_at != 0
end
end

Expand Down
6 changes: 3 additions & 3 deletions test/support/tenant_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ defmodule TenantConnection do
alias RealtimeWeb.Endpoint

def create_message(attrs, conn, opts \\ [mode: :savepoint]) do
channel = Message.changeset(%Message{}, attrs)
message = Message.changeset(%Message{}, attrs)

{:ok, result} =
Database.transaction(conn, fn transaction_conn ->
with {:ok, %Message{} = channel} <- Repo.insert(transaction_conn, channel, Message, opts) do
channel
with {:ok, %Message{} = message} <- Repo.insert(transaction_conn, message, Message, opts) do
message
end
end)

Expand Down
Loading