Merged
Changes from 6 commits
@@ -162,7 +162,7 @@ def operations(self):
             ),
             AsyncMigrationOperationSQL(
                 database=AnalyticsDBMS.CLICKHOUSE,
-                sql=PERSONS_TABLE_MV_SQL,
+                sql=PERSONS_TABLE_MV_SQL(target_table=PERSONS_TABLE),
                 rollback=None,
             ),
             AsyncMigrationOperation(
3 changes: 2 additions & 1 deletion posthog/clickhouse/migrations/0004_kafka.py
@@ -4,6 +4,7 @@
     KAFKA_PERSONS_DISTINCT_ID_TABLE_SQL,
     KAFKA_PERSONS_TABLE_SQL,
     PERSONS_DISTINCT_ID_TABLE_MV_SQL,
+    PERSONS_TABLE,
     PERSONS_TABLE_MV_SQL,
 )

@@ -27,7 +28,7 @@
 operations = [
     run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL()),
     run_sql_with_exceptions(KAFKA_PERSONS_DISTINCT_ID_TABLE_SQL()),
-    run_sql_with_exceptions(PERSONS_TABLE_MV_SQL),
+    run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(target_table=PERSONS_TABLE)),
     run_sql_with_exceptions(PERSONS_DISTINCT_ID_TABLE_MV_SQL),
     run_sql_with_exceptions(WRITABLE_EVENTS_TABLE_SQL()),
     run_sql_with_exceptions(DISTRIBUTED_EVENTS_TABLE_SQL()),
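
Across all of these migrations the mechanical change is the same: each module-level `*_MV_SQL` string becomes a function taking the view's target table, so one view definition can write either straight to the replicated table (as the amended call sites here do, preserving existing behavior) or to a `writable_*` Distributed table on the new ingestion layer. A minimal sketch of the shape, with a simplified column list and an assumed cluster name (the real person-table functions live in posthog/models/person/sql.py):

# Sketch only: simplified template with an assumed cluster name; the real
# PERSONS_TABLE_MV_SQL selects explicit columns rather than SELECT *.
CLICKHOUSE_CLUSTER = "posthog"  # assumption for this sketch

PERSONS_TABLE = "person"
PERSONS_WRITABLE_TABLE = f"writable_{PERSONS_TABLE}"

def PERSONS_TABLE_MV_SQL(on_cluster=True, target_table=PERSONS_WRITABLE_TABLE):
    # Defaulting to the writable Distributed table routes new deployments
    # through the ingestion layer; old migrations pass
    # target_table=PERSONS_TABLE to keep writing to the replicated table.
    on_cluster_clause = f"ON CLUSTER '{CLICKHOUSE_CLUSTER}'" if on_cluster else ""
    return f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS {PERSONS_TABLE}_mv {on_cluster_clause}
TO {target_table}
AS SELECT * FROM kafka_{PERSONS_TABLE}
"""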
3 changes: 2 additions & 1 deletion posthog/clickhouse/migrations/0008_plugin_log_entries.py
@@ -1,12 +1,13 @@
 from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
 from posthog.clickhouse.plugin_log_entries import (
     KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL,
+    PLUGIN_LOG_ENTRIES_TABLE,
     PLUGIN_LOG_ENTRIES_TABLE_MV_SQL,
     PLUGIN_LOG_ENTRIES_TABLE_SQL,
 )

 operations = [
     run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_TABLE_SQL()),
     run_sql_with_exceptions(KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL()),
-    run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_TABLE_MV_SQL),
+    run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_TABLE_MV_SQL(target_table=PLUGIN_LOG_ENTRIES_TABLE)),
 ]
4 changes: 2 additions & 2 deletions posthog/clickhouse/migrations/0009_person_deleted_column.py
@@ -1,5 +1,5 @@
 from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
-from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE_MV_SQL
+from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE, PERSONS_TABLE_MV_SQL
 from posthog.settings import CLICKHOUSE_CLUSTER

 operations = [
@@ -9,5 +9,5 @@
         f"ALTER TABLE person ON CLUSTER '{CLICKHOUSE_CLUSTER}' ADD COLUMN IF NOT EXISTS is_deleted Int8 DEFAULT 0"
     ),
     run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL()),
-    run_sql_with_exceptions(PERSONS_TABLE_MV_SQL),
+    run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(target_table=PERSONS_TABLE)),
 ]
@@ -1,5 +1,5 @@
 from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
-from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE_MV_SQL
+from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE, PERSONS_TABLE_MV_SQL
 from posthog.settings import CLICKHOUSE_CLUSTER

 operations = [
@@ -9,5 +9,5 @@
         f"ALTER TABLE person ON CLUSTER '{CLICKHOUSE_CLUSTER}' ADD COLUMN IF NOT EXISTS distinct_ids Array(VARCHAR)"
     ),
     run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL()),
-    run_sql_with_exceptions(PERSONS_TABLE_MV_SQL),
+    run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(target_table=PERSONS_TABLE)),
 ]
@@ -1,11 +1,11 @@
 from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
-from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE_MV_SQL
+from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE, PERSONS_TABLE_MV_SQL
 from posthog.settings import CLICKHOUSE_CLUSTER

 operations = [
     run_sql_with_exceptions(f"DROP TABLE person_mv ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
     run_sql_with_exceptions(f"DROP TABLE kafka_person ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
     run_sql_with_exceptions(f"ALTER TABLE person ON CLUSTER '{CLICKHOUSE_CLUSTER}' DROP COLUMN IF EXISTS distinct_ids"),
     run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL()),
-    run_sql_with_exceptions(PERSONS_TABLE_MV_SQL),
+    run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(target_table=PERSONS_TABLE)),
 ]
3 changes: 2 additions & 1 deletion posthog/clickhouse/migrations/0022_person_distinct_id2.py
@@ -2,11 +2,12 @@
 from posthog.models.person.sql import (
     KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL,
     PERSON_DISTINCT_ID2_MV_SQL,
+    PERSON_DISTINCT_ID2_TABLE,
     PERSON_DISTINCT_ID2_TABLE_SQL,
 )

 operations = [
     run_sql_with_exceptions(PERSON_DISTINCT_ID2_TABLE_SQL()),
     run_sql_with_exceptions(KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL()),
-    run_sql_with_exceptions(PERSON_DISTINCT_ID2_MV_SQL),
+    run_sql_with_exceptions(PERSON_DISTINCT_ID2_MV_SQL(target_table=PERSON_DISTINCT_ID2_TABLE)),
 ]
4 changes: 2 additions & 2 deletions posthog/clickhouse/migrations/0029_person_version_column.py
@@ -1,5 +1,5 @@
 from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
-from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE_MV_SQL
+from posthog.models.person.sql import KAFKA_PERSONS_TABLE_SQL, PERSONS_TABLE, PERSONS_TABLE_MV_SQL
 from posthog.settings import CLICKHOUSE_CLUSTER

 operations = [
@@ -9,5 +9,5 @@
         f"ALTER TABLE person ON CLUSTER '{CLICKHOUSE_CLUSTER}' ADD COLUMN IF NOT EXISTS version UInt64"
     ),
     run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL()),
-    run_sql_with_exceptions(PERSONS_TABLE_MV_SQL),
+    run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(target_table=PERSONS_TABLE)),
 ]
@@ -1,7 +1,11 @@
 from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
-from posthog.models.person.sql import KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL, PERSON_DISTINCT_ID_OVERRIDES_MV_SQL
+from posthog.models.person.sql import (
+    KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
+    PERSON_DISTINCT_ID_OVERRIDES_MV_SQL,
+    PERSON_DISTINCT_ID_OVERRIDES_TABLE,
+)

 operations = [
     run_sql_with_exceptions(KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL()),
-    run_sql_with_exceptions(PERSON_DISTINCT_ID_OVERRIDES_MV_SQL),
+    run_sql_with_exceptions(PERSON_DISTINCT_ID_OVERRIDES_MV_SQL(target_table=PERSON_DISTINCT_ID_OVERRIDES_TABLE)),
 ]
17 changes: 17 additions & 0 deletions posthog/clickhouse/migrations/0149_groups_ingestion_layer.py
@@ -0,0 +1,17 @@
+from posthog.clickhouse.client.connection import NodeRole
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.models.group.sql import (
+    DROP_GROUPS_TABLE_MV_SQL,
+    DROP_KAFKA_GROUPS_TABLE_SQL,
+    GROUPS_TABLE_MV_SQL,
+    GROUPS_WRITABLE_TABLE_SQL,
+    KAFKA_GROUPS_TABLE_SQL,
+)
+
+operations = [
+    run_sql_with_exceptions(DROP_GROUPS_TABLE_MV_SQL, node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(DROP_KAFKA_GROUPS_TABLE_SQL, node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(GROUPS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]),
+    run_sql_with_exceptions(KAFKA_GROUPS_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]),
+    run_sql_with_exceptions(GROUPS_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]),
+]
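
Note the split in `node_roles`: the DROP statements run on the existing data nodes, while the writable table, Kafka table, and materialized view are created on the ingestion nodes with `on_cluster=False`, because the migration runner executes those statements on each matching node individually rather than fanning them out via `ON CLUSTER`. The imported `ON_CLUSTER_CLAUSE` helper is roughly this (a sketch; the actual helper lives in posthog/clickhouse/cluster.py):

# Rough sketch of the helper this diff relies on: with on_cluster=False the
# clause disappears and the DDL applies only to the node it runs on.
from posthog.settings import CLICKHOUSE_CLUSTER

def ON_CLUSTER_CLAUSE(on_cluster=True):
    return f"ON CLUSTER '{CLICKHOUSE_CLUSTER}'" if on_cluster else ""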
@@ -0,0 +1,57 @@
+from posthog.clickhouse.client.connection import NodeRole
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.clickhouse.plugin_log_entries import (
+    DROP_KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL,
+    DROP_PLUGIN_LOG_ENTRIES_TABLE_MV_SQL,
+    KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL,
+    PLUGIN_LOG_ENTRIES_TABLE_MV_SQL,
+    PLUGIN_LOG_ENTRIES_WRITABLE_TABLE_SQL,
+)
+from posthog.models.person.sql import (
+    DROP_KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL,
+    DROP_KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
+    DROP_KAFKA_PERSONS_TABLE_SQL,
+    DROP_PERSON_DISTINCT_ID2_TABLE_MV_SQL,
+    DROP_PERSON_DISTINCT_ID_OVERRIDES_TABLE_MV_SQL,
+    DROP_PERSONS_TABLE_MV_SQL,
+    KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL,
+    KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
+    KAFKA_PERSONS_TABLE_SQL,
+    PERSON_DISTINCT_ID2_MV_SQL,
+    PERSON_DISTINCT_ID2_WRITABLE_TABLE_SQL,
+    PERSON_DISTINCT_ID_OVERRIDES_MV_SQL,
+    PERSON_DISTINCT_ID_OVERRIDES_WRITABLE_TABLE_SQL,
+    PERSONS_TABLE_MV_SQL,
+    PERSONS_WRITABLE_TABLE_SQL,
+)
+
+operations = [
+    run_sql_with_exceptions(DROP_PERSON_DISTINCT_ID2_TABLE_MV_SQL, node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(DROP_KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL, node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(PERSON_DISTINCT_ID2_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]),
+    run_sql_with_exceptions(
+        KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]
+    ),
+    run_sql_with_exceptions(PERSON_DISTINCT_ID2_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]),
+    run_sql_with_exceptions(DROP_PERSON_DISTINCT_ID_OVERRIDES_TABLE_MV_SQL, node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(DROP_KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL, node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(PERSON_DISTINCT_ID_OVERRIDES_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]),
+    run_sql_with_exceptions(
+        KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]
+    ),
+    run_sql_with_exceptions(
+        PERSON_DISTINCT_ID_OVERRIDES_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]
+    ),
+    run_sql_with_exceptions(DROP_PLUGIN_LOG_ENTRIES_TABLE_MV_SQL, node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(DROP_KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL, node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]),
+    run_sql_with_exceptions(
+        KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]
+    ),
+    run_sql_with_exceptions(PLUGIN_LOG_ENTRIES_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.INGESTION_SMALL]),
+    run_sql_with_exceptions(DROP_PERSONS_TABLE_MV_SQL, node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(DROP_KAFKA_PERSONS_TABLE_SQL, node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(PERSONS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(KAFKA_PERSONS_TABLE_SQL(on_cluster=False), node_roles=[NodeRole.DATA]),
+    run_sql_with_exceptions(PERSONS_TABLE_MV_SQL(on_cluster=False), node_roles=[NodeRole.DATA]),
+]
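
Each pipeline here follows the same five-step sequence, and the ordering matters: drop the materialized view before its Kafka source so nothing reads from a half-dropped pipeline, then rebuild with the write path first so the view never points at a missing table. Only the persons pipeline is rebuilt on the data nodes; the others move to `INGESTION_SMALL`. A hypothetical helper capturing the repeated sequence (not part of the PR; the migration spells each step out):

# Hypothetical refactoring for illustration: the five-step move this
# migration repeats per pipeline, written once.
from posthog.clickhouse.client.connection import NodeRole
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions

def move_pipeline(drop_mv_sql, drop_kafka_sql, writable_sql, kafka_sql, mv_sql, role):
    return [
        # Stop consumption on the data nodes: the view first, then its Kafka source.
        run_sql_with_exceptions(drop_mv_sql, node_roles=[NodeRole.DATA]),
        run_sql_with_exceptions(drop_kafka_sql, node_roles=[NodeRole.DATA]),
        # Rebuild on the target role, write path before readers: the writable
        # Distributed table, then the Kafka table, then the materialized view.
        run_sql_with_exceptions(writable_sql(), node_roles=[role]),
        run_sql_with_exceptions(kafka_sql(on_cluster=False), node_roles=[role]),
        run_sql_with_exceptions(mv_sql(on_cluster=False), node_roles=[role]),
    ]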
40 changes: 29 additions & 11 deletions posthog/clickhouse/plugin_log_entries.py
@@ -1,10 +1,17 @@
+from posthog import settings
 from posthog.clickhouse.cluster import ON_CLUSTER_CLAUSE
 from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, kafka_engine, ttl_period
-from posthog.clickhouse.table_engines import ReplacingMergeTree
+from posthog.clickhouse.table_engines import Distributed, ReplacingMergeTree
 from posthog.kafka_client.topics import KAFKA_PLUGIN_LOG_ENTRIES
 from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE

 PLUGIN_LOG_ENTRIES_TABLE = "plugin_log_entries"
+PLUGIN_LOG_ENTRIES_TABLE_MV = f"{PLUGIN_LOG_ENTRIES_TABLE}_mv"
+PLUGIN_LOG_ENTRIES_WRITABLE_TABLE = f"writable_{PLUGIN_LOG_ENTRIES_TABLE}"
+KAFKA_PLUGIN_LOG_ENTRIES_TABLE = f"kafka_{PLUGIN_LOG_ENTRIES_TABLE}"
+
+DROP_KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL = f"DROP TABLE IF EXISTS {KAFKA_PLUGIN_LOG_ENTRIES_TABLE}"
+DROP_PLUGIN_LOG_ENTRIES_TABLE_MV_SQL = f"DROP TABLE IF EXISTS {PLUGIN_LOG_ENTRIES_TABLE_MV}"

 PLUGIN_LOG_ENTRIES_TTL_WEEKS = 1

 PLUGIN_LOG_ENTRIES_TABLE_BASE_SQL = """
@@ -46,16 +53,17 @@ def PLUGIN_LOG_ENTRIES_TABLE_SQL(on_cluster=True):

 def KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL(on_cluster=True):
     return PLUGIN_LOG_ENTRIES_TABLE_BASE_SQL.format(
-        table_name="kafka_" + PLUGIN_LOG_ENTRIES_TABLE,
+        table_name=KAFKA_PLUGIN_LOG_ENTRIES_TABLE,
         on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
         engine=kafka_engine(topic=KAFKA_PLUGIN_LOG_ENTRIES),
         extra_fields="",
     )


-PLUGIN_LOG_ENTRIES_TABLE_MV_SQL = """
-CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}_mv ON CLUSTER '{cluster}'
-TO {database}.{table_name}
+def PLUGIN_LOG_ENTRIES_TABLE_MV_SQL(on_cluster=True, target_table=PLUGIN_LOG_ENTRIES_WRITABLE_TABLE):
+    return """
+CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} {on_cluster_clause}
+TO {target_table}
 AS SELECT
     id,
     team_id,
@@ -68,12 +76,22 @@ def KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL(on_cluster=True):
     instance_id,
     _timestamp,
     _offset
-FROM {database}.kafka_{table_name}
+FROM {kafka_table}
 """.format(
-    table_name=PLUGIN_LOG_ENTRIES_TABLE,
-    cluster=CLICKHOUSE_CLUSTER,
-    database=CLICKHOUSE_DATABASE,
-)
+        target_table=target_table,
+        mv_name=PLUGIN_LOG_ENTRIES_TABLE_MV,
+        on_cluster_clause=ON_CLUSTER_CLAUSE(on_cluster),
+        kafka_table=KAFKA_PLUGIN_LOG_ENTRIES_TABLE,
+    )
+
+
+def PLUGIN_LOG_ENTRIES_WRITABLE_TABLE_SQL():
+    return PLUGIN_LOG_ENTRIES_TABLE_BASE_SQL.format(
+        table_name=PLUGIN_LOG_ENTRIES_WRITABLE_TABLE,
+        on_cluster_clause=ON_CLUSTER_CLAUSE(False),
+        engine=Distributed(data_table=PLUGIN_LOG_ENTRIES_TABLE, cluster=settings.CLICKHOUSE_SINGLE_SHARD_CLUSTER),
+        extra_fields=KAFKA_COLUMNS,
+    )


 INSERT_PLUGIN_LOG_ENTRY_SQL = """
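
With these definitions, the default call renders a view writing to the Distributed table, while legacy call sites name the merge tree table explicitly. `PLUGIN_LOG_ENTRIES_WRITABLE_TABLE_SQL` reuses the base schema but swaps in a `Distributed` engine over `CLICKHOUSE_SINGLE_SHARD_CLUSTER`, so ingestion nodes store nothing locally and forward each insert to the sharded table. Roughly, with the rendered output abbreviated in the comments:

# Illustrative only; column list abbreviated in the comments below.
from posthog.clickhouse.plugin_log_entries import (
    PLUGIN_LOG_ENTRIES_TABLE,
    PLUGIN_LOG_ENTRIES_TABLE_MV_SQL,
)

print(PLUGIN_LOG_ENTRIES_TABLE_MV_SQL(on_cluster=False))
# CREATE MATERIALIZED VIEW IF NOT EXISTS plugin_log_entries_mv
# TO writable_plugin_log_entries
# AS SELECT id, team_id, ..., _timestamp, _offset
# FROM kafka_plugin_log_entries

print(PLUGIN_LOG_ENTRIES_TABLE_MV_SQL(target_table=PLUGIN_LOG_ENTRIES_TABLE))
# Same view, but TO plugin_log_entries and with the ON CLUSTER clause.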