From c4875f4ceca15963a5372e7ac7c4666b4e0e2610 Mon Sep 17 00:00:00 2001 From: Daniel Escribano Date: Thu, 25 Sep 2025 18:47:15 +0200 Subject: [PATCH 1/7] chore: move groups ingestion to the ingestion layer --- posthog/clickhouse/client/connection.py | 1 + .../migrations/0018_group_analytics_schema.py | 4 +- .../migrations/0149_groups_ingestion_layer.py | 17 +++++++++ posthog/clickhouse/table_engines.py | 5 ++- posthog/models/group/sql.py | 38 +++++++++++++------ 5 files changed, 51 insertions(+), 14 deletions(-) create mode 100644 posthog/clickhouse/migrations/0149_groups_ingestion_layer.py diff --git a/posthog/clickhouse/client/connection.py b/posthog/clickhouse/client/connection.py index fff6d012e1dca..d3c872cd8d5cb 100644 --- a/posthog/clickhouse/client/connection.py +++ b/posthog/clickhouse/client/connection.py @@ -37,6 +37,7 @@ class NodeRole(StrEnum): COORDINATOR = "coordinator" DATA = "data" INGESTION_EVENTS = "events" + INGESTION_SMALL = "small" SHUFFLEHOG = "shufflehog" diff --git a/posthog/clickhouse/migrations/0018_group_analytics_schema.py b/posthog/clickhouse/migrations/0018_group_analytics_schema.py index 69e923f8b7989..c35f8ca2d59c8 100644 --- a/posthog/clickhouse/migrations/0018_group_analytics_schema.py +++ b/posthog/clickhouse/migrations/0018_group_analytics_schema.py @@ -1,8 +1,8 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions -from posthog.models.group.sql import GROUPS_TABLE_MV_SQL, GROUPS_TABLE_SQL, KAFKA_GROUPS_TABLE_SQL +from posthog.models.group.sql import GROUPS_TABLE, GROUPS_TABLE_MV_SQL, GROUPS_TABLE_SQL, KAFKA_GROUPS_TABLE_SQL operations = [ run_sql_with_exceptions(GROUPS_TABLE_SQL()), run_sql_with_exceptions(KAFKA_GROUPS_TABLE_SQL()), - run_sql_with_exceptions(GROUPS_TABLE_MV_SQL), + run_sql_with_exceptions(GROUPS_TABLE_MV_SQL(target_table=GROUPS_TABLE)), ] diff --git a/posthog/clickhouse/migrations/0149_groups_ingestion_layer.py b/posthog/clickhouse/migrations/0149_groups_ingestion_layer.py new file mode 100644 index 0000000000000..366721d7949a0 --- /dev/null +++ b/posthog/clickhouse/migrations/0149_groups_ingestion_layer.py @@ -0,0 +1,17 @@ +from posthog.clickhouse.client.connection import NodeRole +from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions +from posthog.models.group.sql import ( + DROP_GROUPS_TABLE_MV_SQL, + DROP_KAFKA_GROUPS_TABLE_SQL, + GROUPS_TABLE_MV_SQL, + GROUPS_WRITABLE_TABLE_SQL, + KAFKA_GROUPS_TABLE_SQL, +) + +operations = [ + run_sql_with_exceptions(DROP_GROUPS_TABLE_MV_SQL, node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(DROP_KAFKA_GROUPS_TABLE_SQL, node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(GROUPS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(KAFKA_GROUPS_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(GROUPS_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.DATA]), +] diff --git a/posthog/clickhouse/table_engines.py b/posthog/clickhouse/table_engines.py index d007045f1d9ec..a8e3ffcc5b558 100644 --- a/posthog/clickhouse/table_engines.py +++ b/posthog/clickhouse/table_engines.py @@ -80,12 +80,15 @@ class AggregatingMergeTree(MergeTreeEngine): class Distributed: - def __init__(self, data_table: str, sharding_key: str, cluster: str = settings.CLICKHOUSE_CLUSTER): + def __init__(self, data_table: str, sharding_key: Optional[str] = None, cluster: str = settings.CLICKHOUSE_CLUSTER): self.data_table = data_table self.sharding_key = sharding_key self.cluster = cluster def __str__(self): + if not 
self.sharding_key: + return f"Distributed('{self.cluster}', '{settings.CLICKHOUSE_DATABASE}', '{self.data_table}')" + return ( f"Distributed('{self.cluster}', '{settings.CLICKHOUSE_DATABASE}', '{self.data_table}', {self.sharding_key})" ) diff --git a/posthog/models/group/sql.py b/posthog/models/group/sql.py index bf99d77b0eb01..0461af2b4bec5 100644 --- a/posthog/models/group/sql.py +++ b/posthog/models/group/sql.py @@ -1,14 +1,21 @@ +from posthog import settings from posthog.clickhouse.base_sql import COPY_ROWS_BETWEEN_TEAMS_BASE_SQL from posthog.clickhouse.cluster import ON_CLUSTER_CLAUSE from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, STORAGE_POLICY, kafka_engine -from posthog.clickhouse.table_engines import ReplacingMergeTree +from posthog.clickhouse.table_engines import Distributed, ReplacingMergeTree from posthog.kafka_client.topics import KAFKA_GROUPS -from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE +from posthog.settings import CLICKHOUSE_CLUSTER GROUPS_TABLE = "groups" +GROUPS_TABLE_MV = f"{GROUPS_TABLE}_mv" +GROUPS_WRITABLE_TABLE = f"writable_{GROUPS_TABLE}" +KAFKA_GROUPS_TABLE = f"kafka_{GROUPS_TABLE}" DROP_GROUPS_TABLE_SQL = f"DROP TABLE {GROUPS_TABLE} ON CLUSTER '{CLICKHOUSE_CLUSTER}'" -TRUNCATE_GROUPS_TABLE_SQL = f"TRUNCATE TABLE {GROUPS_TABLE} ON CLUSTER '{CLICKHOUSE_CLUSTER}'" +DROP_GROUPS_TABLE_MV_SQL = f"DROP TABLE IF EXISTS {GROUPS_TABLE_MV}" +DROP_KAFKA_GROUPS_TABLE_SQL = f"DROP TABLE IF EXISTS {KAFKA_GROUPS_TABLE}" + +TRUNCATE_GROUPS_TABLE_SQL = f"TRUNCATE TABLE IF EXISTS {GROUPS_TABLE} ON CLUSTER '{CLICKHOUSE_CLUSTER}'" GROUPS_TABLE_BASE_SQL = """ CREATE TABLE IF NOT EXISTS {table_name} {on_cluster_clause} @@ -30,7 +37,7 @@ def GROUPS_TABLE_ENGINE(): def GROUPS_TABLE_SQL(on_cluster=True): return ( GROUPS_TABLE_BASE_SQL - + """Order By (team_id, group_type_index, group_key) + + """ORDER BY (team_id, group_type_index, group_key) {storage_policy} """ ).format( @@ -51,11 +58,20 @@ def KAFKA_GROUPS_TABLE_SQL(on_cluster=True): ) -# You must include the database here because of a bug in clickhouse -# related to https://github.com/ClickHouse/ClickHouse/issues/10471 -GROUPS_TABLE_MV_SQL = f""" -CREATE MATERIALIZED VIEW IF NOT EXISTS {GROUPS_TABLE}_mv ON CLUSTER '{CLICKHOUSE_CLUSTER}' -TO {CLICKHOUSE_DATABASE}.{GROUPS_TABLE} +def GROUPS_WRITABLE_TABLE_SQL(): + # This is a table used for writing from the ingestion layer. It's not sharded, thus it uses the single shard cluster. 
+ return GROUPS_TABLE_BASE_SQL.format( + table_name="writable_" + GROUPS_TABLE, + on_cluster_clause=ON_CLUSTER_CLAUSE(False), + engine=Distributed(data_table=GROUPS_TABLE, cluster=settings.CLICKHOUSE_SINGLE_SHARD_CLUSTER), + extra_fields=KAFKA_COLUMNS, + ) + + +def GROUPS_TABLE_MV_SQL(target_table=GROUPS_WRITABLE_TABLE, on_cluster=True): + return f""" +CREATE MATERIALIZED VIEW IF NOT EXISTS {GROUPS_TABLE_MV} {ON_CLUSTER_CLAUSE(on_cluster)} +TO {target_table} AS SELECT group_type_index, group_key, @@ -64,13 +80,13 @@ def KAFKA_GROUPS_TABLE_SQL(on_cluster=True): group_properties, _timestamp, _offset -FROM {CLICKHOUSE_DATABASE}.kafka_{GROUPS_TABLE} +FROM {KAFKA_GROUPS_TABLE} """ + # { ..., "group_0": 1325 } # To join with events join using $group_{group_type_index} column -TRUNCATE_GROUPS_TABLE_SQL = f"TRUNCATE TABLE IF EXISTS {GROUPS_TABLE} ON CLUSTER '{CLICKHOUSE_CLUSTER}'" INSERT_GROUP_SQL = """ INSERT INTO groups (group_type_index, group_key, team_id, group_properties, created_at, _timestamp, _offset) SELECT %(group_type_index)s, %(group_key)s, %(team_id)s, %(group_properties)s, %(created_at)s, %(_timestamp)s, 0 From aa76d8fba4b6e93fc94f1de2d43abf95652cae2d Mon Sep 17 00:00:00 2001 From: Daniel Escribano Date: Thu, 25 Sep 2025 18:51:39 +0200 Subject: [PATCH 2/7] fix: create tables on ingestion cluster --- .../clickhouse/migrations/0149_groups_ingestion_layer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/posthog/clickhouse/migrations/0149_groups_ingestion_layer.py b/posthog/clickhouse/migrations/0149_groups_ingestion_layer.py index 366721d7949a0..5daf0794fcb95 100644 --- a/posthog/clickhouse/migrations/0149_groups_ingestion_layer.py +++ b/posthog/clickhouse/migrations/0149_groups_ingestion_layer.py @@ -11,7 +11,7 @@ operations = [ run_sql_with_exceptions(DROP_GROUPS_TABLE_MV_SQL, node_roles=[NodeRole.DATA]), run_sql_with_exceptions(DROP_KAFKA_GROUPS_TABLE_SQL, node_roles=[NodeRole.DATA]), - run_sql_with_exceptions(GROUPS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.DATA]), - run_sql_with_exceptions(KAFKA_GROUPS_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.DATA]), - run_sql_with_exceptions(GROUPS_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(GROUPS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions(KAFKA_GROUPS_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions(GROUPS_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), ] From 11041f0ebf935cb824b0d3a6e2d16419a66cc415 Mon Sep 17 00:00:00 2001 From: Daniel Escribano Date: Mon, 29 Sep 2025 11:48:16 +0100 Subject: [PATCH 3/7] chore: move non-sharded table ingestion to ingestion layer --- .../0005_person_replacing_by_version.py | 2 +- posthog/clickhouse/migrations/0004_kafka.py | 3 +- .../migrations/0008_plugin_log_entries.py | 3 +- .../migrations/0009_person_deleted_column.py | 4 +- .../0013_persons_distinct_ids_column.py | 4 +- ...0014_persons_distinct_ids_column_remove.py | 4 +- .../migrations/0022_person_distinct_id2.py | 3 +- .../migrations/0029_person_version_column.py | 4 +- ...d_person_distinct_id_overrides_consumer.py | 8 +- ...0150_non_sharded_tables_ingestion_layer.py | 57 ++++++++ posthog/clickhouse/plugin_log_entries.py | 40 ++++-- posthog/models/person/sql.py | 125 +++++++++++++----- 12 files changed, 202 insertions(+), 55 deletions(-) create mode 100644 posthog/clickhouse/migrations/0150_non_sharded_tables_ingestion_layer.py diff --git 
a/posthog/async_migrations/migrations/0005_person_replacing_by_version.py b/posthog/async_migrations/migrations/0005_person_replacing_by_version.py index 00b79a3f9f372..4128ccc723b18 100644 --- a/posthog/async_migrations/migrations/0005_person_replacing_by_version.py +++ b/posthog/async_migrations/migrations/0005_person_replacing_by_version.py @@ -162,7 +162,7 @@ def operations(self): ), AsyncMigrationOperationSQL( database=AnalyticsDBMS.CLICKHOUSE, - sql=PERSONS_TABLE_MV_SQL, + sql=PERSONS_TABLE_MV_SQL(target_table=PERSON_TABLE), rollback=None, ), AsyncMigrationOperation( diff --git a/posthog/clickhouse/migrations/0004_kafka.py b/posthog/clickhouse/migrations/0004_kafka.py index 5243e206bd098..fe4e21a9fe7da 100644 --- a/posthog/clickhouse/migrations/0004_kafka.py +++ b/posthog/clickhouse/migrations/0004_kafka.py @@ -4,6 +4,7 @@ KAFKA_PERSONS_DISTINCT_ID_TABLE_SQL, KAFKA_PERSONS_TABLE_SQL, PERSONS_DISTINCT_ID_TABLE_MV_SQL, + PERSONS_TABLE, PERSONS_TABLE_MV_SQL, ) @@ -27,7 +28,7 @@ operations = [ run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL()), run_sql_with_exceptions(KAFKA_PERSONS_DISTINCT_ID_TABLE_SQL()), - run_sql_with_exceptions(PERSONS_TABLE_MV_SQL), + run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(target_table=PERSONS_TABLE)), run_sql_with_exceptions(PERSONS_DISTINCT_ID_TABLE_MV_SQL), run_sql_with_exceptions(WRITABLE_EVENTS_TABLE_SQL()), run_sql_with_exceptions(DISTRIBUTED_EVENTS_TABLE_SQL()), diff --git a/posthog/clickhouse/migrations/0008_plugin_log_entries.py b/posthog/clickhouse/migrations/0008_plugin_log_entries.py index db29f8eb8f3cc..da6eb0bebeded 100644 --- a/posthog/clickhouse/migrations/0008_plugin_log_entries.py +++ b/posthog/clickhouse/migrations/0008_plugin_log_entries.py @@ -1,6 +1,7 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions from posthog.clickhouse.plugin_log_entries import ( KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL, + PLUGIN_LOG_ENTRIES_TABLE, PLUGIN_LOG_ENTRIES_TABLE_MV_SQL, PLUGIN_LOG_ENTRIES_TABLE_SQL, ) @@ -8,5 +9,5 @@ operations = [ run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_TABLE_SQL()), run_sql_with_exceptions(KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL()), - run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_TABLE_MV_SQL), + run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_TABLE_MV_SQL(target_table=PLUGIN_LOG_ENTRIES_TABLE)), ] diff --git a/posthog/clickhouse/migrations/0009_person_deleted_column.py b/posthog/clickhouse/migrations/0009_person_deleted_column.py index 053504de6f95c..02f97872a290b 100644 --- a/posthog/clickhouse/migrations/0009_person_deleted_column.py +++ b/posthog/clickhouse/migrations/0009_person_deleted_column.py @@ -1,5 +1,5 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions -from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE_MV_SQL +from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE, PERSONS_TABLE_MV_SQL from posthog.settings import CLICKHOUSE_CLUSTER operations = [ @@ -9,5 +9,5 @@ f"ALTER TABLE person ON CLUSTER '{CLICKHOUSE_CLUSTER}' ADD COLUMN IF NOT EXISTS is_deleted Int8 DEFAULT 0" ), run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL()), - run_sql_with_exceptions(PERSONS_TABLE_MV_SQL), + run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(target_table=PERSONS_TABLE)), ] diff --git a/posthog/clickhouse/migrations/0013_persons_distinct_ids_column.py b/posthog/clickhouse/migrations/0013_persons_distinct_ids_column.py index 036352cb9ce46..9915490c40de1 100644 --- a/posthog/clickhouse/migrations/0013_persons_distinct_ids_column.py +++ 
b/posthog/clickhouse/migrations/0013_persons_distinct_ids_column.py @@ -1,5 +1,5 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions -from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE_MV_SQL +from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE, PERSONS_TABLE_MV_SQL from posthog.settings import CLICKHOUSE_CLUSTER operations = [ @@ -9,5 +9,5 @@ f"ALTER TABLE person ON CLUSTER '{CLICKHOUSE_CLUSTER}' ADD COLUMN IF NOT EXISTS distinct_ids Array(VARCHAR)" ), run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL()), - run_sql_with_exceptions(PERSONS_TABLE_MV_SQL), + run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(target_table=PERSONS_TABLE)), ] diff --git a/posthog/clickhouse/migrations/0014_persons_distinct_ids_column_remove.py b/posthog/clickhouse/migrations/0014_persons_distinct_ids_column_remove.py index 0d805b9d47e17..9db62fd7b0991 100644 --- a/posthog/clickhouse/migrations/0014_persons_distinct_ids_column_remove.py +++ b/posthog/clickhouse/migrations/0014_persons_distinct_ids_column_remove.py @@ -1,5 +1,5 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions -from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE_MV_SQL +from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE, PERSONS_TABLE_MV_SQL from posthog.settings import CLICKHOUSE_CLUSTER operations = [ @@ -7,5 +7,5 @@ run_sql_with_exceptions(f"DROP TABLE kafka_person ON CLUSTER '{CLICKHOUSE_CLUSTER}'"), run_sql_with_exceptions(f"ALTER TABLE person ON CLUSTER '{CLICKHOUSE_CLUSTER}' DROP COLUMN IF EXISTS distinct_ids"), run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL()), - run_sql_with_exceptions(PERSONS_TABLE_MV_SQL), + run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(target_table=PERSONS_TABLE)), ] diff --git a/posthog/clickhouse/migrations/0022_person_distinct_id2.py b/posthog/clickhouse/migrations/0022_person_distinct_id2.py index bb901d7168711..8e17f9c71d928 100644 --- a/posthog/clickhouse/migrations/0022_person_distinct_id2.py +++ b/posthog/clickhouse/migrations/0022_person_distinct_id2.py @@ -2,11 +2,12 @@ from posthog.models.person.sql import ( KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL, PERSON_DISTINCT_ID2_MV_SQL, + PERSON_DISTINCT_ID2_TABLE, PERSON_DISTINCT_ID2_TABLE_SQL, ) operations = [ run_sql_with_exceptions(PERSON_DISTINCT_ID2_TABLE_SQL()), run_sql_with_exceptions(KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL()), - run_sql_with_exceptions(PERSON_DISTINCT_ID2_MV_SQL), + run_sql_with_exceptions(PERSON_DISTINCT_ID2_MV_SQL(target_table=PERSON_DISTINCT_ID2_TABLE)), ] diff --git a/posthog/clickhouse/migrations/0029_person_version_column.py b/posthog/clickhouse/migrations/0029_person_version_column.py index 879df893d591a..4e1239b03f46a 100644 --- a/posthog/clickhouse/migrations/0029_person_version_column.py +++ b/posthog/clickhouse/migrations/0029_person_version_column.py @@ -1,5 +1,5 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions -from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE_MV_SQL +from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE, PERSONS_TABLE_MV_SQL from posthog.settings import CLICKHOUSE_CLUSTER operations = [ @@ -9,5 +9,5 @@ f"ALTER TABLE person ON CLUSTER '{CLICKHOUSE_CLUSTER}' ADD COLUMN IF NOT EXISTS version UInt64" ), run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL()), - run_sql_with_exceptions(PERSONS_TABLE_MV_SQL), + run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(target_table=PERSONS_TABLE)), ] diff --git 
a/posthog/clickhouse/migrations/0054_add_person_distinct_id_overrides_consumer.py b/posthog/clickhouse/migrations/0054_add_person_distinct_id_overrides_consumer.py index b5d9c5ef4c695..e858fe711df90 100644 --- a/posthog/clickhouse/migrations/0054_add_person_distinct_id_overrides_consumer.py +++ b/posthog/clickhouse/migrations/0054_add_person_distinct_id_overrides_consumer.py @@ -1,7 +1,11 @@ from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions -from posthog.models.person.sql import KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL, PERSON_DISTINCT_ID_OVERRIDES_MV_SQL +from posthog.models.person.sql import ( + KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL, + PERSON_DISTINCT_ID_OVERRIDES_MV_SQL, + PERSON_DISTINCT_ID_OVERRIDES_TABLE, +) operations = [ run_sql_with_exceptions(KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL()), - run_sql_with_exceptions(PERSON_DISTINCT_ID_OVERRIDES_MV_SQL), + run_sql_with_exceptions(PERSON_DISTINCT_ID_OVERRIDES_MV_SQL(target_table=PERSON_DISTINCT_ID_OVERRIDES_TABLE)), ] diff --git a/posthog/clickhouse/migrations/0150_non_sharded_tables_ingestion_layer.py b/posthog/clickhouse/migrations/0150_non_sharded_tables_ingestion_layer.py new file mode 100644 index 0000000000000..80f103f591d79 --- /dev/null +++ b/posthog/clickhouse/migrations/0150_non_sharded_tables_ingestion_layer.py @@ -0,0 +1,57 @@ +from posthog.clickhouse.client.connection import NodeRole +from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions +from posthog.clickhouse.plugin_log_entries import ( + DROP_KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL, + DROP_PLUGIN_LOG_ENTRIES_TABLE_MV_SQL, + KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL, + PLUGIN_LOG_ENTRIES_TABLE_MV_SQL, + PLUGIN_LOG_ENTRIES_WRITABLE_TABLE_SQL, +) +from posthog.models.person.sql import ( + DROP_KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL, + DROP_KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL, + DROP_KAFKA_PERSONS_TABLE_SQL, + DROP_PERSON_DISTINCT_ID2_TABLE_MV_SQL, + DROP_PERSON_DISTINCT_ID_OVERRIDES_TABLE_MV_SQL, + DROP_PERSONS_TABLE_MV_SQL, + KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL, + KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL, + KAFKA_PERSONS_TABLE_SQL, + PERSON_DISTINCT_ID2_MV_SQL, + PERSON_DISTINCT_ID2_WRITABLE_TABLE_SQL, + PERSON_DISTINCT_ID_OVERRIDES_MV_SQL, + PERSON_DISTINCT_ID_OVERRIDES_WRITABLE_TABLE_SQL, + PERSONS_TABLE_MV_SQL, + PERSONS_WRITABLE_TABLE_SQL, +) + +operations = [ + run_sql_with_exceptions(DROP_PERSON_DISTINCT_ID2_TABLE_MV_SQL, node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(DROP_KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL, node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(PERSON_DISTINCT_ID2_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions( + KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL] + ), + run_sql_with_exceptions(PERSON_DISTINCT_ID2_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions(DROP_PERSON_DISTINCT_ID_OVERRIDES_TABLE_MV_SQL, node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(DROP_KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL, node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(PERSON_DISTINCT_ID_OVERRIDES_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions( + KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL] + ), + run_sql_with_exceptions( + PERSON_DISTINCT_ID_OVERRIDES_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL] + ), + 
run_sql_with_exceptions(DROP_PLUGIN_LOG_ENTRIES_TABLE_MV_SQL, node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(DROP_KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL, node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions( + KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL] + ), + run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions(DROP_PERSONS_TABLE_MV_SQL, node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(DROP_KAFKA_PERSONS_TABLE_SQL, node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(PERSONS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), +] diff --git a/posthog/clickhouse/plugin_log_entries.py b/posthog/clickhouse/plugin_log_entries.py index a498e2e16c700..edda3b75958aa 100644 --- a/posthog/clickhouse/plugin_log_entries.py +++ b/posthog/clickhouse/plugin_log_entries.py @@ -1,10 +1,17 @@ +from posthog import settings from posthog.clickhouse.cluster import ON_CLUSTER_CLAUSE from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, kafka_engine, ttl_period -from posthog.clickhouse.table_engines import ReplacingMergeTree +from posthog.clickhouse.table_engines import Distributed, ReplacingMergeTree from posthog.kafka_client.topics import KAFKA_PLUGIN_LOG_ENTRIES -from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE PLUGIN_LOG_ENTRIES_TABLE = "plugin_log_entries" +PLUGIN_LOG_ENTRIES_TABLE_MV = f"{PLUGIN_LOG_ENTRIES_TABLE}_mv" +PLUGIN_LOG_ENTRIES_WRITABLE_TABLE = f"writable_{PLUGIN_LOG_ENTRIES_TABLE}" +KAFKA_PLUGIN_LOG_ENTRIES_TABLE = f"kafka_{PLUGIN_LOG_ENTRIES_TABLE}" + +DROP_KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL = f"DROP TABLE IF EXISTS {KAFKA_PLUGIN_LOG_ENTRIES_TABLE}" +DROP_PLUGIN_LOG_ENTRIES_TABLE_MV_SQL = f"DROP TABLE IF EXISTS {PLUGIN_LOG_ENTRIES_TABLE_MV}" + PLUGIN_LOG_ENTRIES_TTL_WEEKS = 1 PLUGIN_LOG_ENTRIES_TABLE_BASE_SQL = """ @@ -46,16 +53,17 @@ def PLUGIN_LOG_ENTRIES_TABLE_SQL(on_cluster=True): def KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL(on_cluster=True): return PLUGIN_LOG_ENTRIES_TABLE_BASE_SQL.format( - table_name="kafka_" + PLUGIN_LOG_ENTRIES_TABLE, + table_name=KAFKA_PLUGIN_LOG_ENTRIES_TABLE, on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster), engine=kafka_engine(topic=KAFKA_PLUGIN_LOG_ENTRIES), extra_fields="", ) -PLUGIN_LOG_ENTRIES_TABLE_MV_SQL = """ -CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}_mv ON CLUSTER '{cluster}' -TO {database}.{table_name} +def PLUGIN_LOG_ENTRIES_TABLE_MV_SQL(on_cluster=True, target_table=PLUGIN_LOG_ENTRIES_WRITABLE_TABLE): + return """ +CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} {on_cluster_clause} +TO {target_table} AS SELECT id, team_id, @@ -68,12 +76,22 @@ def KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL(on_cluster=True): instance_id, _timestamp, _offset -FROM {database}.kafka_{table_name} +FROM {kafka_table} """.format( - table_name=PLUGIN_LOG_ENTRIES_TABLE, - cluster=CLICKHOUSE_CLUSTER, - database=CLICKHOUSE_DATABASE, -) + target_table=target_table, + mv_name=PLUGIN_LOG_ENTRIES_TABLE_MV, + on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster), + kafka_table=KAFKA_PLUGIN_LOG_ENTRIES_TABLE, + ) + + +def PLUGIN_LOG_ENTRIES_WRITABLE_TABLE_SQL(): + return 
PLUGIN_LOG_ENTRIES_TABLE_BASE_SQL.format( + table_name=PLUGIN_LOG_ENTRIES_WRITABLE_TABLE, + on_cluster_clause=ON_CLUSTER_CLAUSE(False), + engine=Distributed(data_table=PLUGIN_LOG_ENTRIES_TABLE, cluster=settings.CLICKHOUSE_SINGLE_SHARD_CLUSTER), + extra_fields=KAFKA_COLUMNS, + ) INSERT_PLUGIN_LOG_ENTRY_SQL = """ diff --git a/posthog/models/person/sql.py b/posthog/models/person/sql.py index b2b6431576fe8..444739e1b9351 100644 --- a/posthog/models/person/sql.py +++ b/posthog/models/person/sql.py @@ -1,8 +1,9 @@ +from posthog import settings from posthog.clickhouse.base_sql import COPY_ROWS_BETWEEN_TEAMS_BASE_SQL from posthog.clickhouse.cluster import ON_CLUSTER_CLAUSE from posthog.clickhouse.indexes import index_by_kafka_timestamp from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, KAFKA_COLUMNS_WITH_PARTITION, STORAGE_POLICY, kafka_engine -from posthog.clickhouse.table_engines import CollapsingMergeTree, ReplacingMergeTree +from posthog.clickhouse.table_engines import CollapsingMergeTree, Distributed, ReplacingMergeTree from posthog.kafka_client.topics import KAFKA_PERSON, KAFKA_PERSON_DISTINCT_ID, KAFKA_PERSON_UNIQUE_ID from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE @@ -14,6 +15,12 @@ TRUNCATE_PERSON_DISTINCT_ID2_TABLE_SQL = f"TRUNCATE TABLE IF EXISTS person_distinct_id2 {ON_CLUSTER_CLAUSE()}" PERSONS_TABLE = "person" +PERSONS_TABLE_MV = f"{PERSONS_TABLE}_mv" +PERSONS_WRITABLE_TABLE = f"writable_{PERSONS_TABLE}" +KAFKA_PERSONS_TABLE = f"kafka_{PERSONS_TABLE}" + +DROP_PERSONS_TABLE_MV_SQL = f"DROP TABLE IF EXISTS {PERSONS_TABLE_MV}" +DROP_KAFKA_PERSONS_TABLE_SQL = f"DROP TABLE IF EXISTS {KAFKA_PERSONS_TABLE}" PERSONS_TABLE_BASE_SQL = """ CREATE TABLE IF NOT EXISTS {table_name} {on_cluster_clause} @@ -37,7 +44,7 @@ def PERSONS_TABLE_ENGINE(): def PERSONS_TABLE_SQL(on_cluster=True): return ( PERSONS_TABLE_BASE_SQL - + """Order By (team_id, id) + + """ORDER BY (team_id, id) {storage_policy} """ ).format( @@ -54,18 +61,17 @@ def PERSONS_TABLE_SQL(on_cluster=True): def KAFKA_PERSONS_TABLE_SQL(on_cluster=True): return PERSONS_TABLE_BASE_SQL.format( - table_name="kafka_" + PERSONS_TABLE, + table_name=KAFKA_PERSONS_TABLE, on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster), engine=kafka_engine(KAFKA_PERSON), extra_fields="", ) -# You must include the database here because of a bug in clickhouse -# related to https://github.com/ClickHouse/ClickHouse/issues/10471 -PERSONS_TABLE_MV_SQL = """ -CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}_mv ON CLUSTER '{cluster}' -TO {database}.{table_name} +def PERSONS_TABLE_MV_SQL(on_cluster=True, target_table=PERSONS_WRITABLE_TABLE): + return """ +CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} {on_cluster_clause} +TO {target_table} AS SELECT id, created_at, @@ -76,8 +82,23 @@ def KAFKA_PERSONS_TABLE_SQL(on_cluster=True): version, _timestamp, _offset -FROM {database}.kafka_{table_name} -""".format(table_name=PERSONS_TABLE, cluster=CLICKHOUSE_CLUSTER, database=CLICKHOUSE_DATABASE) +FROM {kafka_table} +""".format( + mv_name=PERSONS_TABLE_MV, + on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster), + target_table=target_table, + kafka_table=KAFKA_PERSONS_TABLE, + ) + + +def PERSONS_WRITABLE_TABLE_SQL(): + return PERSONS_TABLE_BASE_SQL.format( + table_name=PERSONS_WRITABLE_TABLE, + on_cluster_clause=ON_CLUSTER_CLAUSE(False), + engine=Distributed(data_table=PERSONS_TABLE, cluster=settings.CLICKHOUSE_SINGLE_SHARD_CLUSTER), + extra_fields=KAFKA_COLUMNS, + ) + GET_LATEST_PERSON_SQL = """ SELECT * FROM person JOIN ( @@ -175,6 +196,12 @@ def 
PERSONS_DISTINCT_ID_TABLE_SQL(on_cluster=True): # PERSON_DISTINCT_ID2_TABLE = "person_distinct_id2" +PERSON_DISTINCT_ID2_TABLE_MV = f"{PERSON_DISTINCT_ID2_TABLE}_mv" +PERSON_DISTINCT_ID2_WRITABLE_TABLE = f"writable_{PERSON_DISTINCT_ID2_TABLE}" +KAFKA_PERSON_DISTINCT_ID2_TABLE = f"kafka_{PERSON_DISTINCT_ID2_TABLE}" + +DROP_KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL = f"DROP TABLE IF EXISTS {KAFKA_PERSON_DISTINCT_ID2_TABLE}" +DROP_PERSON_DISTINCT_ID2_TABLE_MV_SQL = f"DROP TABLE IF EXISTS {PERSON_DISTINCT_ID2_TABLE_MV}" # NOTE: This table base SQL is also used for distinct ID overrides! PERSON_DISTINCT_ID2_TABLE_BASE_SQL = """ @@ -215,18 +242,17 @@ def PERSON_DISTINCT_ID2_TABLE_SQL(on_cluster=True): def KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL(on_cluster=True): return PERSON_DISTINCT_ID2_TABLE_BASE_SQL.format( - table_name="kafka_" + PERSON_DISTINCT_ID2_TABLE, + table_name=KAFKA_PERSON_DISTINCT_ID2_TABLE, on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster), engine=kafka_engine(KAFKA_PERSON_DISTINCT_ID), extra_fields="", ) -# You must include the database here because of a bug in clickhouse -# related to https://github.com/ClickHouse/ClickHouse/issues/10471 -PERSON_DISTINCT_ID2_MV_SQL = """ -CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}_mv ON CLUSTER '{cluster}' -TO {database}.{table_name} +def PERSON_DISTINCT_ID2_MV_SQL(on_cluster=True, target_table=PERSON_DISTINCT_ID2_WRITABLE_TABLE): + return """ +CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} {on_cluster_clause} +TO {target_table} AS SELECT team_id, distinct_id, @@ -236,12 +262,26 @@ def KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL(on_cluster=True): _timestamp, _offset, _partition -FROM {database}.kafka_{table_name} +FROM {kafka_table} """.format( - table_name=PERSON_DISTINCT_ID2_TABLE, - cluster=CLICKHOUSE_CLUSTER, - database=CLICKHOUSE_DATABASE, -) + mv_name=PERSON_DISTINCT_ID2_TABLE_MV, + on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster), + target_table=target_table, + kafka_table=KAFKA_PERSON_DISTINCT_ID2_TABLE, + ) + + +def PERSON_DISTINCT_ID2_WRITABLE_TABLE_SQL(): + # This is a table used for writing from the ingestion layer. It's not sharded, thus it uses the single shard cluster. + return PERSON_DISTINCT_ID2_TABLE_BASE_SQL.format( + table_name=PERSON_DISTINCT_ID2_WRITABLE_TABLE, + on_cluster_clause=ON_CLUSTER_CLAUSE(False), + engine=Distributed(data_table=PERSON_DISTINCT_ID2_TABLE, cluster=settings.CLICKHOUSE_SINGLE_SHARD_CLUSTER), + extra_fields=f""" + {KAFKA_COLUMNS_WITH_PARTITION} + """, + ) + # # person_distinct_id_overrides: This table contains rows for all (team_id, @@ -249,7 +289,14 @@ def KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL(on_cluster=True): # yet been integrated back into the events table via squashing. 
# + PERSON_DISTINCT_ID_OVERRIDES_TABLE = "person_distinct_id_overrides" +PERSON_DISTINCT_ID_OVERRIDES_TABLE_MV = f"{PERSON_DISTINCT_ID_OVERRIDES_TABLE}_mv" +PERSON_DISTINCT_ID_OVERRIDES_WRITABLE_TABLE = f"writable_{PERSON_DISTINCT_ID_OVERRIDES_TABLE}" +KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE = f"kafka_{PERSON_DISTINCT_ID_OVERRIDES_TABLE}" + +DROP_KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL = f"DROP TABLE IF EXISTS {KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE}" +DROP_PERSON_DISTINCT_ID_OVERRIDES_TABLE_MV_SQL = f"DROP TABLE IF EXISTS {PERSON_DISTINCT_ID_OVERRIDES_TABLE_MV}" PERSON_DISTINCT_ID_OVERRIDES_TABLE_BASE_SQL = PERSON_DISTINCT_ID2_TABLE_BASE_SQL @@ -278,16 +325,18 @@ def PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL(on_cluster=True): KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL = ( lambda on_cluster=True: PERSON_DISTINCT_ID_OVERRIDES_TABLE_BASE_SQL.format( - table_name="kafka_" + PERSON_DISTINCT_ID_OVERRIDES_TABLE, + table_name=KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE, on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster), engine=kafka_engine(KAFKA_PERSON_DISTINCT_ID, group="clickhouse-person-distinct-id-overrides"), extra_fields="", ) ) -PERSON_DISTINCT_ID_OVERRIDES_MV_SQL = """ -CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}_mv ON CLUSTER '{cluster}' -TO {database}.{table_name} + +def PERSON_DISTINCT_ID_OVERRIDES_MV_SQL(on_cluster=True, target_table=PERSON_DISTINCT_ID_OVERRIDES_WRITABLE_TABLE): + return """ +CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} {on_cluster_clause} +TO {target_table} AS SELECT team_id, distinct_id, @@ -297,13 +346,29 @@ def PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL(on_cluster=True): _timestamp, _offset, _partition -FROM {database}.kafka_{table_name} +FROM {kafka_table} WHERE version > 0 -- only store updated rows, not newly inserted ones """.format( - table_name=PERSON_DISTINCT_ID_OVERRIDES_TABLE, - cluster=CLICKHOUSE_CLUSTER, - database=CLICKHOUSE_DATABASE, -) + mv_name=PERSON_DISTINCT_ID_OVERRIDES_TABLE_MV, + on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster), + target_table=target_table, + kafka_table=KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE, + ) + + +def PERSON_DISTINCT_ID_OVERRIDES_WRITABLE_TABLE_SQL(): + # This is a table used for writing from the ingestion layer. It's not sharded, thus it uses the single shard cluster. 
+ return PERSON_DISTINCT_ID_OVERRIDES_TABLE_BASE_SQL.format( + table_name=PERSON_DISTINCT_ID_OVERRIDES_WRITABLE_TABLE, + on_cluster_clause=ON_CLUSTER_CLAUSE(False), + engine=Distributed( + data_table=PERSON_DISTINCT_ID_OVERRIDES_TABLE, cluster=settings.CLICKHOUSE_SINGLE_SHARD_CLUSTER + ), + extra_fields=f""" + {KAFKA_COLUMNS_WITH_PARTITION} + """, + ) + TRUNCATE_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL = ( f"TRUNCATE TABLE IF EXISTS {PERSON_DISTINCT_ID_OVERRIDES_TABLE} ON CLUSTER '{CLICKHOUSE_CLUSTER}'" From 00d73148ff84cdc3e1fac4ba56ff84eba39536d3 Mon Sep 17 00:00:00 2001 From: Daniel Escribano Date: Mon, 29 Sep 2025 12:19:20 +0100 Subject: [PATCH 4/7] chore: rename migration --- ..._layer.py => 0152_non_sharded_tables_ingestion_layer.py} | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename posthog/clickhouse/migrations/{0150_non_sharded_tables_ingestion_layer.py => 0152_non_sharded_tables_ingestion_layer.py} (96%) diff --git a/posthog/clickhouse/migrations/0150_non_sharded_tables_ingestion_layer.py b/posthog/clickhouse/migrations/0152_non_sharded_tables_ingestion_layer.py similarity index 96% rename from posthog/clickhouse/migrations/0150_non_sharded_tables_ingestion_layer.py rename to posthog/clickhouse/migrations/0152_non_sharded_tables_ingestion_layer.py index 80f103f591d79..c121d55abec43 100644 --- a/posthog/clickhouse/migrations/0150_non_sharded_tables_ingestion_layer.py +++ b/posthog/clickhouse/migrations/0152_non_sharded_tables_ingestion_layer.py @@ -51,7 +51,7 @@ run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), run_sql_with_exceptions(DROP_PERSONS_TABLE_MV_SQL, node_roles=[NodeRole.DATA]), run_sql_with_exceptions(DROP_KAFKA_PERSONS_TABLE_SQL, node_roles=[NodeRole.DATA]), - run_sql_with_exceptions(PERSONS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]), - run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), - run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions(PERSONS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.DATA]), ] From d4d998f7067bab771e54dfbd8d4ef1c63f660083 Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 29 Sep 2025 11:51:27 +0000 Subject: [PATCH 5/7] Update query snapshots --- .../test/__snapshots__/test_schema.ambr | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/posthog/clickhouse/test/__snapshots__/test_schema.ambr b/posthog/clickhouse/test/__snapshots__/test_schema.ambr index 32f9a6afc67eb..51dcbe87bab1e 100644 --- a/posthog/clickhouse/test/__snapshots__/test_schema.ambr +++ b/posthog/clickhouse/test/__snapshots__/test_schema.ambr @@ -1812,7 +1812,7 @@ , INDEX kafka_timestamp_minmax_person _timestamp TYPE minmax GRANULARITY 3 ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_noshard/posthog.person', '{replica}-{shard}', version) - Order By (team_id, id) + ORDER BY (team_id, id) ''' @@ -1846,7 +1846,7 @@ ''' CREATE MATERIALIZED VIEW IF NOT EXISTS person_distinct_id2_mv ON CLUSTER 'posthog' - TO posthog_test.person_distinct_id2 + TO writable_person_distinct_id2 AS SELECT team_id, distinct_id, @@ -1856,7 +1856,7 @@ 
_timestamp, _offset, _partition - FROM posthog_test.kafka_person_distinct_id2 + FROM kafka_person_distinct_id2 ''' # --- @@ -1925,7 +1925,7 @@ ''' CREATE MATERIALIZED VIEW IF NOT EXISTS person_distinct_id_overrides_mv ON CLUSTER 'posthog' - TO posthog_test.person_distinct_id_overrides + TO writable_person_distinct_id_overrides AS SELECT team_id, distinct_id, @@ -1935,7 +1935,7 @@ _timestamp, _offset, _partition - FROM posthog_test.kafka_person_distinct_id_overrides + FROM kafka_person_distinct_id_overrides WHERE version > 0 -- only store updated rows, not newly inserted ones ''' @@ -1944,7 +1944,7 @@ ''' CREATE MATERIALIZED VIEW IF NOT EXISTS person_mv ON CLUSTER 'posthog' - TO posthog_test.person + TO writable_person AS SELECT id, created_at, @@ -1955,7 +1955,7 @@ version, _timestamp, _offset - FROM posthog_test.kafka_person + FROM kafka_person ''' # --- @@ -2112,7 +2112,7 @@ ''' CREATE MATERIALIZED VIEW IF NOT EXISTS plugin_log_entries_mv ON CLUSTER 'posthog' - TO posthog_test.plugin_log_entries + TO writable_plugin_log_entries AS SELECT id, team_id, @@ -2125,7 +2125,7 @@ instance_id, _timestamp, _offset - FROM posthog_test.kafka_plugin_log_entries + FROM kafka_plugin_log_entries ''' # --- @@ -5000,7 +5000,7 @@ , INDEX kafka_timestamp_minmax_person _timestamp TYPE minmax GRANULARITY 3 ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_noshard/posthog.person', '{replica}-{shard}', version) - Order By (team_id, id) + ORDER BY (team_id, id) SETTINGS storage_policy = 'hot_to_cold' ''' From 76e65e62397a1ff7ad3798285528f0d5aee07262 Mon Sep 17 00:00:00 2001 From: Daniel Escribano Date: Mon, 29 Sep 2025 12:52:56 +0100 Subject: [PATCH 6/7] fix: duplicated migration --- .../migrations/0149_groups_ingestion_layer.py | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 posthog/clickhouse/migrations/0149_groups_ingestion_layer.py diff --git a/posthog/clickhouse/migrations/0149_groups_ingestion_layer.py b/posthog/clickhouse/migrations/0149_groups_ingestion_layer.py deleted file mode 100644 index 5daf0794fcb95..0000000000000 --- a/posthog/clickhouse/migrations/0149_groups_ingestion_layer.py +++ /dev/null @@ -1,17 +0,0 @@ -from posthog.clickhouse.client.connection import NodeRole -from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions -from posthog.models.group.sql import ( - DROP_GROUPS_TABLE_MV_SQL, - DROP_KAFKA_GROUPS_TABLE_SQL, - GROUPS_TABLE_MV_SQL, - GROUPS_WRITABLE_TABLE_SQL, - KAFKA_GROUPS_TABLE_SQL, -) - -operations = [ - run_sql_with_exceptions(DROP_GROUPS_TABLE_MV_SQL, node_roles=[NodeRole.DATA]), - run_sql_with_exceptions(DROP_KAFKA_GROUPS_TABLE_SQL, node_roles=[NodeRole.DATA]), - run_sql_with_exceptions(GROUPS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]), - run_sql_with_exceptions(KAFKA_GROUPS_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), - run_sql_with_exceptions(GROUPS_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), -] From 3b28958f9bac15757829706dbd4d6fa33b31cf62 Mon Sep 17 00:00:00 2001 From: Daniel Escribano Date: Mon, 29 Sep 2025 13:44:05 +0100 Subject: [PATCH 7/7] fix: node_role --- .../migrations/0152_non_sharded_tables_ingestion_layer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/posthog/clickhouse/migrations/0152_non_sharded_tables_ingestion_layer.py b/posthog/clickhouse/migrations/0152_non_sharded_tables_ingestion_layer.py index c121d55abec43..80f103f591d79 100644 --- 
a/posthog/clickhouse/migrations/0152_non_sharded_tables_ingestion_layer.py +++ b/posthog/clickhouse/migrations/0152_non_sharded_tables_ingestion_layer.py @@ -51,7 +51,7 @@ run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), run_sql_with_exceptions(DROP_PERSONS_TABLE_MV_SQL, node_roles=[NodeRole.DATA]), run_sql_with_exceptions(DROP_KAFKA_PERSONS_TABLE_SQL, node_roles=[NodeRole.DATA]), - run_sql_with_exceptions(PERSONS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.DATA]), - run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.DATA]), - run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.DATA]), + run_sql_with_exceptions(PERSONS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), + run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]), ]
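
Note on the migration pattern: every table moved in this series follows the same five steps — drop the old materialized view and Kafka table from the data nodes, then create the writable Distributed table, the Kafka table (without ON CLUSTER), and the materialized view on the small ingestion nodes. Below is a minimal sketch of that shape, reusing the groups helpers defined in patch 1; `ingestion_layer_ops` is a hypothetical convenience wrapper for illustration only and does not exist in the repository.

from posthog.clickhouse.client.connection import NodeRole
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.group.sql import (
    DROP_GROUPS_TABLE_MV_SQL,
    DROP_KAFKA_GROUPS_TABLE_SQL,
    GROUPS_TABLE_MV_SQL,
    GROUPS_WRITABLE_TABLE_SQL,
    KAFKA_GROUPS_TABLE_SQL,
)


def ingestion_layer_ops(drop_mv_sql, drop_kafka_sql, writable_table_sql, kafka_table_sql, mv_sql):
    # Hypothetical helper: the data nodes stop consuming the topic (old MV and
    # Kafka table are dropped), then the small ingestion nodes get the writable
    # Distributed table, the Kafka table and the MV that forwards rows to it.
    return [
        run_sql_with_exceptions(drop_mv_sql, node_roles=[NodeRole.DATA]),
        run_sql_with_exceptions(drop_kafka_sql, node_roles=[NodeRole.DATA]),
        run_sql_with_exceptions(writable_table_sql(), node_roles=[NodeRole.INGESTION_SMALL]),
        run_sql_with_exceptions(kafka_table_sql(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]),
        run_sql_with_exceptions(mv_sql(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]),
    ]


# Equivalent to the groups block of the (since deleted) 0149 migration.
operations = ingestion_layer_ops(
    DROP_GROUPS_TABLE_MV_SQL,
    DROP_KAFKA_GROUPS_TABLE_SQL,
    GROUPS_WRITABLE_TABLE_SQL,
    KAFKA_GROUPS_TABLE_SQL,
    GROUPS_TABLE_MV_SQL,
)

With this wiring, writes land on the ingestion nodes first and reach the data nodes through the unsharded writable_* Distributed tables, so the data nodes no longer need Kafka connectivity for these topics.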
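
For reference, the Distributed engine change in patch 1 is what makes the writable_* tables possible: the sharding key is now optional, and when it is omitted the three-argument form of the engine is rendered. A rough sketch of both forms follows; the sharding key shown and the rendered cluster/database values are placeholders, not literals from this diff.

from posthog import settings
from posthog.clickhouse.table_engines import Distributed

# Sharded table (existing behaviour): a sharding key is still rendered.
sharded = Distributed(data_table="sharded_events", sharding_key="sipHash64(distinct_id)")

# Unsharded writable table (new behaviour): no sharding key, so the shorter
# three-argument form is emitted and writes go to the single-shard cluster.
unsharded = Distributed(data_table="person", cluster=settings.CLICKHOUSE_SINGLE_SHARD_CLUSTER)

# str(sharded)   -> "Distributed('<cluster>', '<database>', 'sharded_events', sipHash64(distinct_id))"
# str(unsharded) -> "Distributed('<single_shard_cluster>', '<database>', 'person')"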