-
Notifications
You must be signed in to change notification settings - Fork 499
[docs] binlog virtual table #2565
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+128
−54
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,20 +10,14 @@ Virtual tables in Fluss are system-generated tables that provide access to metad | |
|
|
||
| Fluss supports the following virtual table types: | ||
|
|
||
| | Virtual Table | Suffix | Description | | ||
| |---------------|--------|-------------| | ||
| | [Changelog](#changelog-table) | `$changelog` | Provides access to the raw changelog stream with metadata | | ||
|
|
||
| More virtual table types will be added in future releases. | ||
| | Virtual Table | Suffix | Description | Supported Tables | | ||
| |---------------|--------|-----------------------------------------------------------|------------------| | ||
| | [Changelog](#changelog-table) | `$changelog` | Provides access to the raw changelog stream with metadata | Primary Key Tables, Log Tables | | ||
| | [Binlog](#binlog-table) | `$binlog` | Provides binlog format with before/after metadata | Primary Key Tables only | | ||
|
|
||
| ## Changelog Table | ||
|
|
||
| The `$changelog` virtual table provides read-only access to the raw changelog stream of a table, allowing you to audit and process all data changes with their associated metadata. This is useful for: | ||
|
|
||
| - **Change Data Capture (CDC)**: Track all inserts, updates, and deletes | ||
| - **Auditing**: Monitor data modifications with timestamps and offsets | ||
| - **Event Processing**: Build event-driven applications based on data changes | ||
| - **Data Replication**: Replicate changes to downstream systems | ||
| The `$changelog` virtual table provides read-only access to the raw changelog stream of a table, allowing you to audit and process all data changes with their associated metadata. | ||
|
|
||
| ### Accessing the Changelog | ||
|
|
||
|
|
@@ -55,23 +49,21 @@ For Primary Key Tables, the following change types are supported: | |
|
|
||
| | Change Type | Description | | ||
| |-------------|-------------| | ||
| | `+I` | **Insert** - A new row was inserted | | ||
| | `-U` | **Update Before** - The previous value of an updated row (retraction) | | ||
| | `+U` | **Update After** - The new value of an updated row | | ||
| | `-D` | **Delete** - A row was deleted | | ||
| | `insert` | A new row was inserted | | ||
| | `update_before` | The previous value of an updated row (retraction) | | ||
| | `update_after` | The new value of an updated row | | ||
| | `delete` | A row was deleted | | ||
|
|
||
| #### Log Tables | ||
|
|
||
| For Log Tables (append-only), only one change type is used: | ||
|
|
||
| | Change Type | Description | | ||
| |-------------|-------------| | ||
| | `+A` | **Append** - A new row was appended to the log | | ||
| | `insert` | A new row was inserted into the log | | ||
MehulBatra marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| ### Examples | ||
|
|
||
| #### Primary Key Table Changelog | ||
|
|
||
| Consider a Primary Key Table tracking user orders: | ||
|
|
||
| ```sql title="Flink SQL" | ||
|
|
@@ -99,70 +91,152 @@ SELECT * FROM orders$changelog; | |
| Output: | ||
|
|
||
| ``` | ||
| +----+--------------+-------------+---------------------+----------+---------------+---------+ | ||
| | op | _change_type | _log_offset | _commit_timestamp | order_id | customer_name | amount | | ||
| +----+--------------+-------------+---------------------+----------+---------------+---------+ | ||
| | +I | +I | 0 | 2024-01-15 10:30:00 | 1 | Rhea | 100.00 | | ||
| | +I | -U | 1 | 2024-01-15 10:35:00 | 1 | Rhea | 100.00 | | ||
| | +I | +U | 2 | 2024-01-15 10:35:00 | 1 | Rhea | 150.00 | | ||
| | +I | -D | 3 | 2024-01-15 10:40:00 | 1 | Rhea | 150.00 | | ||
| +----+--------------+-------------+---------------------+----------+---------------+---------+ | ||
| +---------------+-------------+---------------------+----------+---------------+---------+ | ||
| | _change_type | _log_offset | _commit_timestamp | order_id | customer_name | amount | | ||
| +---------------+-------------+---------------------+----------+---------------+---------+ | ||
| | insert | 0 | 2024-01-15 10:30:00 | 1 | Rhea | 100.00 | | ||
| | update_before | 1 | 2024-01-15 10:35:00 | 1 | Rhea | 100.00 | | ||
| | update_after | 2 | 2024-01-15 10:35:00 | 1 | Rhea | 150.00 | | ||
| | delete | 3 | 2024-01-15 10:40:00 | 1 | Rhea | 150.00 | | ||
| +---------------+-------------+---------------------+----------+---------------+---------+ | ||
| ``` | ||
|
|
||
|
|
||
| ### Startup Modes | ||
|
|
||
|
|
||
| | Mode | Description | | ||
| |------|-------------| | ||
| | `earliest` | Start reading from the beginning of the log | | ||
| | `latest` | Start reading from the current end of the log (only new changes) | | ||
| | `timestamp` | Start reading from a specific timestamp (milliseconds since epoch) | | ||
|
|
||
|
|
||
| The changelog table supports different startup modes to control where reading begins: | ||
|
|
||
| ```sql title="Flink SQL" | ||
| -- Read from the beginning (default) | ||
| SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'earliest') */; | ||
|
|
||
| -- Read only new changes from now | ||
| SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'latest') */; | ||
|
|
||
| -- Read from a specific timestamp | ||
| SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */; | ||
| ``` | ||
|
|
||
| ### Limitations | ||
|
|
||
| - Projection, partition, and predicate pushdowns are not supported yet. This will be addressed in future releases. | ||
|
|
||
| ## Binlog Table | ||
|
|
||
| The `$binlog` virtual table provides access to change data where each record contains both the before and after images of the row. This is useful for: | ||
|
|
||
| :::note | ||
| The `op` column is Flink's row kind indicator. For changelog virtual tables, all rows are emitted as `+I` (insert) to the downstream, while the actual change type is captured in the `_change_type` column. | ||
| The `$binlog` virtual table is only available for **Primary Key Tables**. | ||
| ::: | ||
|
|
||
| #### Log Table Changelog | ||
| ### Accessing the Binlog | ||
|
|
||
| To access the binlog of a Primary Key Table, append `$binlog` to the table name: | ||
|
|
||
| ```sql title="Flink SQL" | ||
| SELECT * FROM my_pk_table$binlog; | ||
| ``` | ||
|
|
||
| ### Schema | ||
|
|
||
| The binlog virtual table includes three metadata columns followed by nested `before` and `after` row structures: | ||
|
|
||
| | Column | Type | Description | | ||
| |--------|------|-------------| | ||
| | `_change_type` | STRING NOT NULL | The type of change operation: `insert`, `update`, or `delete` | | ||
| | `_log_offset` | BIGINT NOT NULL | The offset position in the log | | ||
| | `_commit_timestamp` | TIMESTAMP_LTZ(3) NOT NULL | The timestamp when the change was committed | | ||
| | `before` | ROW<...> | The row values before the change (NULL for inserts) | | ||
| | `after` | ROW<...> | The row values after the change (NULL for deletes) | | ||
|
|
||
| The `before` and `after` columns are nested ROW types containing all columns from the base table. | ||
|
|
||
| ### Change Types | ||
|
|
||
| | Change Type | Description | `before` | `after` | | ||
| |-------------|-------------|----------|---------| | ||
| | `insert` | A new row was inserted | NULL | Contains new row values | | ||
| | `update` | A row was updated | Contains old row values | Contains new row values | | ||
| | `delete` | A row was deleted | Contains deleted row values | NULL | | ||
|
|
||
| Consider a Log Table storing click events: | ||
| ### Examples | ||
|
|
||
| ```sql title="Flink SQL" | ||
| -- Create a log table (no primary key) | ||
| CREATE TABLE click_events ( | ||
| event_id INT, | ||
| user_id INT, | ||
| event_type STRING | ||
| -- Create a primary key table | ||
| CREATE TABLE users ( | ||
| user_id INT NOT NULL, | ||
| name STRING, | ||
| email STRING, | ||
| PRIMARY KEY (user_id) NOT ENFORCED | ||
| ) WITH ('bucket.num' = '1'); | ||
|
|
||
| -- Append events | ||
| INSERT INTO click_events VALUES (1, 101, 'click'), (2, 102, 'view'); | ||
| -- Insert, update, then delete a record | ||
| INSERT INTO users VALUES (1, 'Alice', '[email protected]'); | ||
| INSERT INTO users VALUES (1, 'Alice Smith', '[email protected]'); | ||
| DELETE FROM users WHERE user_id = 1; | ||
|
|
||
| -- Query the changelog | ||
| SELECT * FROM click_events$changelog; | ||
| -- Query the binlog | ||
| SELECT * FROM users$binlog; | ||
| ``` | ||
|
|
||
| Output: | ||
|
|
||
| ``` | ||
| +----+--------------+-------------+---------------------+----------+---------+------------+ | ||
| | op | _change_type | _log_offset | _commit_timestamp | event_id | user_id | event_type | | ||
| +----+--------------+-------------+---------------------+----------+---------+------------+ | ||
| | +I | +A | 0 | 2024-01-15 11:00:00 | 1 | 101 | click | | ||
| | +I | +A | 1 | 2024-01-15 11:00:00 | 2 | 102 | view | | ||
| +----+--------------+-------------+---------------------+----------+---------+------------+ | ||
| +--------------+-------------+---------------------+----------------------------------+--------------------------------------+ | ||
| | _change_type | _log_offset | _commit_timestamp | before | after | | ||
| +--------------+-------------+---------------------+----------------------------------+--------------------------------------+ | ||
| | insert | 0 | 2024-01-15 10:30:00 | NULL | (1, Alice, [email protected]) | | ||
| | update | 2 | 2024-01-15 10:35:00 | (1, Alice, [email protected]) | (1, Alice Smith, [email protected]) | | ||
| | delete | 3 | 2024-01-15 10:40:00 | (1, Alice Smith, [email protected]) | NULL | | ||
| +--------------+-------------+---------------------+----------------------------------+--------------------------------------+ | ||
| ``` | ||
|
|
||
| ### Startup Modes | ||
| #### Accessing Nested Fields | ||
|
|
||
| The changelog virtual table supports different startup modes to control where reading begins: | ||
| You can access individual fields from the `before` and `after` structures: | ||
|
|
||
| ```sql title="Flink SQL" | ||
| -- Read from the beginning (default) | ||
| SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'earliest') */; | ||
| SELECT | ||
| _change_type, | ||
| _commit_timestamp, | ||
| `before`.name AS old_name, | ||
| `after`.name AS new_name | ||
| FROM users$binlog | ||
| WHERE _change_type = 'update'; | ||
| ``` | ||
|
|
||
| -- Read only new changes from now | ||
| SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'latest') */; | ||
| ### Startup Modes | ||
MehulBatra marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| -- Read from a specific timestamp | ||
| SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */; | ||
| ``` | ||
|
|
||
| | Mode | Description | | ||
| |------|-------------| | ||
| | `earliest` | Start reading from the beginning of the log | | ||
| | `latest` | Start reading from the current end of the log (only new changes) | | ||
| | `timestamp` | Start reading from a specific timestamp (milliseconds since epoch) | | ||
|
|
||
|
|
||
| The binlog table supports different startup modes to control where reading begins: | ||
|
|
||
| ```sql title="Flink SQL" | ||
| -- Read from the beginning (default) | ||
| SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'earliest') */; | ||
|
|
||
| -- Read only new changes from now | ||
| SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'latest') */; | ||
|
|
||
| -- Read from a specific timestamp | ||
| SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */; | ||
| ``` | ||
|
|
||
|
|
||
| ### Limitations | ||
| - Projection & partition & predicate pushdowns are not supported yet. This will be addressed in future releases. | ||
|
|
||
| - Projection, partition, and predicate pushdowns are not supported yet. This will be addressed in future releases. | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.