144 changes: 105 additions & 39 deletions posthog/api/advanced_activity_logs/field_discovery.py
@@ -1,10 +1,12 @@
import gc
import json
import dataclasses
from datetime import timedelta
from typing import Any, TypedDict

from django.db import connection
from django.db.models import QuerySet
from django.utils import timezone

from posthog.models.activity_logging.activity_log import ActivityLog, Change
from posthog.models.utils import UUIDT
@@ -96,8 +98,49 @@
def _get_org_record_count(self) -> int:
return ActivityLog.objects.filter(organization_id=self.organization_id).count()

def process_batch_for_large_org(self, offset: int, limit: int) -> None:
batch_fields = self._process_batch_memory(offset, limit, use_sampling=True)
def get_activity_logs_queryset(self, hours_back: int | None = None) -> QuerySet:
"""Get the base queryset for activity logs, optionally filtered by time."""
queryset = ActivityLog.objects.filter(organization_id=self.organization_id, detail__isnull=False)

if hours_back is not None:
cutoff_time = timezone.now() - timedelta(hours=hours_back)
queryset = queryset.filter(created_at__gte=cutoff_time)

return queryset

def get_sampled_records(self, limit: int, offset: int = 0) -> list[dict]:
"""Get sampled records using SQL TABLESAMPLE for large datasets."""
query = f"""
SELECT scope, detail
FROM posthog_activitylog TABLESAMPLE SYSTEM ({SAMPLING_PERCENTAGE})
WHERE organization_id = %s
AND detail IS NOT NULL
ORDER BY created_at DESC
LIMIT %s OFFSET %s
"""

with connection.cursor() as cursor:
cursor.execute(query, [str(self.organization_id), limit, offset])
records = []
for row in cursor.fetchall():
scope, detail = row
if isinstance(detail, str):
try:
detail = json.loads(detail)
except (json.JSONDecodeError, TypeError):
detail = None
records.append({"scope": scope, "detail": detail})
return records
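
A note on the sampling above: PostgreSQL's TABLESAMPLE SYSTEM selects whole heap pages before the WHERE clause is applied, so the effective per-organization sample can deviate from SAMPLING_PERCENTAGE, and each execution re-samples, which makes LIMIT/OFFSET pagination across separate calls unstable. A minimal sanity-check sketch (the helper below is hypothetical, not part of this PR):

# Hypothetical helper: estimate what fraction of an org's rows a sampled
# probe actually returns. SYSTEM sampling is page-based, so variance
# around SAMPLING_PERCENTAGE / 100 is expected.
def sampled_fraction(discovery: "AdvancedActivityLogFieldDiscovery", probe_limit: int = 100_000) -> float:
    sampled = len(discovery.get_sampled_records(limit=probe_limit))
    total = discovery._get_org_record_count()
    return sampled / total if total else 0.0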

def process_batch_for_large_org(self, records: list[dict], hours_back: int | None = None) -> None:
"""Process a batch of records for large organizations.
Args:
records: List of activity log records to process
hours_back: If provided, used to get appropriate static filters for the time range
"""
# Process the provided records
batch_fields = self._extract_fields_from_records(records)
batch_converted = self._convert_to_discovery_format(batch_fields)

existing_cache = get_cached_fields(str(self.organization_id))
@@ -108,11 +151,22 @@
current_detail_fields = {}
self._merge_fields_into_result(current_detail_fields, batch_converted)

static_filters = (
existing_cache.get("static_filters")
if existing_cache
else self._get_static_filters(self._get_base_queryset())
)
# Get static filters for the appropriate time range
if hours_back is not None:
recent_queryset = self.get_activity_logs_queryset(hours_back=hours_back)
new_static_filters = self._get_static_filters(recent_queryset)

# Merge with existing static filters
if existing_cache and "static_filters" in existing_cache:
static_filters = self._merge_static_filters(existing_cache["static_filters"], new_static_filters)
else:
static_filters = new_static_filters
else:
static_filters = (
existing_cache.get("static_filters")

[Check failure on line 166 in posthog/api/advanced_activity_logs/field_discovery.py (GitHub Actions / Python code quality checks): Incompatible types in assignment (expression has type "Any | dict[str, list[dict[str, str]]] | None", variable has type "dict[Any, Any]")]
if existing_cache
else self._get_static_filters(self._get_base_queryset())
)

cache_data = {
"static_filters": static_filters,
@@ -181,38 +235,8 @@

return all_fields

def _process_batch_memory(
self, offset: int, limit: int, use_sampling: bool = True
) -> dict[str, set[tuple[str, str]]]:
if use_sampling:
query = f"""
SELECT scope, detail
FROM posthog_activitylog TABLESAMPLE SYSTEM ({SAMPLING_PERCENTAGE})
WHERE organization_id = %s
AND detail IS NOT NULL
ORDER BY created_at DESC
LIMIT %s OFFSET %s
"""

with connection.cursor() as cursor:
cursor.execute(query, [str(self.organization_id), limit, offset])
records = []
for row in cursor.fetchall():
scope, detail = row
if isinstance(detail, str):
try:
detail = json.loads(detail)
except (json.JSONDecodeError, TypeError):
detail = None
records.append({"scope": scope, "detail": detail})
else:
records = [
{"scope": record["scope"], "detail": record["detail"]}
for record in ActivityLog.objects.filter(
organization_id=self.organization_id, detail__isnull=False
).values("scope", "detail")[offset : offset + limit]
]

def _extract_fields_from_records(self, records: list[dict]) -> dict[str, set[tuple[str, str]]]:
"""Extract field information from a list of activity log records."""
batch_fields: dict[str, set[tuple[str, str]]] = {}

for record in records:
@@ -231,6 +255,20 @@

return batch_fields

def _process_batch_memory(
self, offset: int, limit: int, use_sampling: bool = True
) -> dict[str, set[tuple[str, str]]]:
"""Legacy method for backward compatibility."""
if use_sampling:
records = self.get_sampled_records(limit, offset)
else:
records = [
{"scope": record["scope"], "detail": record["detail"]}
for record in self.get_activity_logs_queryset().values("scope", "detail")[offset : offset + limit]
]

return self._extract_fields_from_records(records)

def _extract_json_paths(self, obj: Any, prefix: str = "") -> set[tuple[str, str]]:
paths = set()

@@ -304,3 +342,31 @@
result.append((scope, field_path, sorted(types)))

return result

def _merge_static_filters(self, existing: dict, new: dict) -> dict:
"""Merge static filters additively"""
merged = {
"users": existing.get("users", []),
"scopes": existing.get("scopes", []),
"activities": existing.get("activities", []),
}

# Merge users (by uuid)
existing_user_ids = {u["value"] for u in merged["users"]}
for user in new.get("users", []):
if user["value"] not in existing_user_ids:
merged["users"].append(user)

# Merge scopes
existing_scopes = {s["value"] for s in merged["scopes"]}
for scope in new.get("scopes", []):
if scope["value"] not in existing_scopes:
merged["scopes"].append(scope)

# Merge activities
existing_activities = {a["value"] for a in merged["activities"]}
for activity in new.get("activities", []):
if activity["value"] not in existing_activities:
merged["activities"].append(activity)

return merged
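
To make the merge semantics concrete, a small illustration (values are hypothetical, `discovery` stands for an AdvancedActivityLogFieldDiscovery instance, and the "label"/"value" entry shape is assumed from the dedup keys above):

# Hypothetical inputs for _merge_static_filters: dedup is by "value",
# and entries already present in the existing cache win.
existing = {
    "users": [{"label": "Alice", "value": "user-1"}],
    "scopes": [{"label": "FeatureFlag", "value": "FeatureFlag"}],
    "activities": [{"label": "updated", "value": "updated"}],
}
new = {
    "users": [{"label": "Alice", "value": "user-1"}, {"label": "Bob", "value": "user-2"}],
    "scopes": [{"label": "Insight", "value": "Insight"}],
    "activities": [],
}
merged = discovery._merge_static_filters(existing, new)
# merged["users"] -> Alice and Bob (Alice is not duplicated)
# merged["scopes"] -> FeatureFlag and Insight
# merged["activities"] -> unchanged; the new batch contributed none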
11 changes: 11 additions & 0 deletions posthog/api/advanced_activity_logs/fields_cache.py
@@ -40,3 +40,14 @@ def cache_fields(organization_id: str, fields_data: dict, record_count: int) ->
client.setex(key, CACHE_TTL_SECONDS, json_data)
except Exception as e:
capture_exception(e)


def delete_cached_fields(organization_id: str) -> bool:
"""Delete cached fields for an organization"""
try:
client = get_client()
key = _get_cache_key(organization_id)
return bool(client.delete(key))
except Exception as e:
capture_exception(e)
return False
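
Usage is a straight delete-and-report; a sketch of how the flush path calls it (the organization id is a placeholder):

# Sketch: drop an org's cached field data so the next refresh rebuilds it.
from posthog.api.advanced_activity_logs.fields_cache import delete_cached_fields

if delete_cached_fields("018f3a2e-0000-0000-0000-000000000000"):
    print("cache deleted; next run rebuilds from scratch")
else:
    print("nothing was cached (or Redis was unavailable)")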
46 changes: 41 additions & 5 deletions posthog/management/commands/refresh_activity_log_fields_cache.py
@@ -1,6 +1,6 @@
from django.core.management.base import BaseCommand

from posthog.api.advanced_activity_logs.field_discovery import SMALL_ORG_THRESHOLD
from posthog.api.advanced_activity_logs.constants import SMALL_ORG_THRESHOLD
from posthog.tasks.tasks import refresh_activity_log_fields_cache


@@ -9,10 +9,24 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument("--dry-run", action="store_true", help="Show what would be processed without running")
parser.add_argument(
"--flush",
action="store_true",
help="Delete existing cache and rebuild from scratch (uses 10% sampling for full rebuild)",
)
parser.add_argument(
"--hours-back",
type=int,
default=14,
help="Number of hours to look back when not using --flush (default: 14 = 12h + 2h buffer)",
)

def handle(self, *args, **options):
if options["dry_run"]:
from datetime import timedelta

from django.db.models import Count
from django.utils import timezone

from posthog.models import Organization
from posthog.models.activity_logging.activity_log import ActivityLog
@@ -35,9 +49,31 @@ def handle(self, *args, **options):
org.activity_count = activity_counts.get(org.id, 0)

self.stdout.write(f"Would process {len(large_orgs)} organizations:")
for org in large_orgs:
self.stdout.write(f" - {org.name} (id={org.id}) - {org.activity_count:,} records")

if options["flush"]:
self.stdout.write("Mode: FLUSH - Delete existing cache and rebuild from scratch with 10% sampling")
for org in large_orgs:
self.stdout.write(f" - {org.name} (id={org.id}) - {org.activity_count:,} total records")
else:
cutoff = timezone.now() - timedelta(hours=options["hours_back"])
self.stdout.write(f"Mode: INCREMENTAL - Process last {options['hours_back']} hours with 100% coverage")
self.stdout.write(f"Cutoff time: {cutoff}")

for org in large_orgs:
recent_count = ActivityLog.objects.filter(
organization_id=org.id, created_at__gte=cutoff, detail__isnull=False
).count()
self.stdout.write(
f" - {org.name} (id={org.id}) - {recent_count:,} records from last {options['hours_back']}h"
)
else:
self.stdout.write("Starting activity log fields cache refresh...")
refresh_activity_log_fields_cache()
mode = (
"FLUSH mode"
if options["flush"]
else f"INCREMENTAL mode (last {options['hours_back']}h with 100% coverage)"
)
self.stdout.write(f"Starting activity log fields cache refresh in {mode}...")

refresh_activity_log_fields_cache(flush=options["flush"], hours_back=options["hours_back"])

self.stdout.write("Cache refresh completed.")
80 changes: 70 additions & 10 deletions posthog/tasks/tasks.py
@@ -926,17 +926,64 @@ def background_delete_model_task(


@shared_task(ignore_result=True, time_limit=7200)
def refresh_activity_log_fields_cache() -> None:
"""Refresh fields cache for large organizations every 12 hours"""
def refresh_activity_log_fields_cache(flush: bool = False, hours_back: int = 14) -> None:
"""
Refresh fields cache for large organizations.

Args:
flush: If True, delete existing cache and rebuild from scratch
hours_back: Number of hours to look back (default: 14 = 12h schedule + 2h buffer)
"""

from uuid import UUID

from django.db.models import Count

from posthog.api.advanced_activity_logs.constants import BATCH_SIZE, SAMPLING_PERCENTAGE, SMALL_ORG_THRESHOLD
from posthog.api.advanced_activity_logs.field_discovery import AdvancedActivityLogFieldDiscovery
from posthog.api.advanced_activity_logs.fields_cache import delete_cached_fields
from posthog.exceptions_capture import capture_exception
from posthog.models import Organization
from posthog.models.activity_logging.activity_log import ActivityLog

logger.info("[refresh_activity_log_fields_cache] running task")
def _process_org_with_flush(discovery: AdvancedActivityLogFieldDiscovery, org_id: UUID) -> None:
"""Rebuild cache from scratch with sampling."""
deleted = delete_cached_fields(str(org_id))
logger.info(f"Flushed cache for org {org_id}: {deleted}")

record_count = discovery._get_org_record_count()
estimated_sampled_records = int(record_count * (SAMPLING_PERCENTAGE / 100))
total_batches = (estimated_sampled_records + BATCH_SIZE - 1) // BATCH_SIZE

logger.info(
f"Rebuilding cache for org {org_id} from scratch: "
f"{record_count} total records, sampling {estimated_sampled_records} records"
)

for batch_num in range(total_batches):
offset = batch_num * BATCH_SIZE
records = discovery.get_sampled_records(limit=BATCH_SIZE, offset=offset)
discovery.process_batch_for_large_org(records)

def _process_org_incremental(discovery: AdvancedActivityLogFieldDiscovery, org_id: UUID, hours_back: int) -> int:
"""Process recent records with 100% coverage."""
recent_queryset = discovery.get_activity_logs_queryset(hours_back=hours_back)
recent_count = recent_queryset.count()

logger.info(f"Processing {recent_count} records from last {hours_back}h for org {org_id} (100% coverage)")

for batch_num in range(0, recent_count, BATCH_SIZE):
records = [
{"scope": record["scope"], "detail": record["detail"]}
for record in recent_queryset.values("scope", "detail")[batch_num : batch_num + BATCH_SIZE]
]
if records:
discovery.process_batch_for_large_org(records, hours_back=hours_back)

return recent_count

mode = "FLUSH" if flush else f"INCREMENTAL (last {hours_back}h, 100% coverage)"
logger.info(f"[refresh_activity_log_fields_cache] running task in {mode} mode")

large_org_data = (
ActivityLog.objects.values("organization_id")
@@ -951,24 +998,37 @@ def refresh_activity_log_fields_cache() -> None:
org_count = len(large_orgs)
logger.info(f"[refresh_activity_log_fields_cache] processing {org_count} large organizations")

processed_orgs = 0
total_recent_records = 0

for org in large_orgs:
try:
discovery = AdvancedActivityLogFieldDiscovery(org.id)
record_count = discovery._get_org_record_count()

estimated_sampled_records = int(record_count * (SAMPLING_PERCENTAGE / 100))
total_batches = (estimated_sampled_records + BATCH_SIZE - 1) // BATCH_SIZE
if flush:
_process_org_with_flush(discovery, org.id)
else:
recent_count = _process_org_incremental(discovery, org.id, hours_back)
total_recent_records += recent_count

for batch_num in range(total_batches):
offset = batch_num * BATCH_SIZE
discovery.process_batch_for_large_org(offset, BATCH_SIZE)
processed_orgs += 1

except Exception as e:
logger.exception(
"Failed to refresh activity log fields cache for org",
org_id=org.id,
mode=mode,
error=e,
)
capture_exception(e)

logger.info(f"[refresh_activity_log_fields_cache] completed for {org_count} organizations")
if not flush:
logger.info(
f"[refresh_activity_log_fields_cache] completed for {processed_orgs}/{org_count} organizations "
f"in {mode} mode. Total recent records processed: {total_recent_records}"
)
else:
logger.info(
f"[refresh_activity_log_fields_cache] completed flush and rebuild for "
f"{processed_orgs}/{org_count} organizations"
)
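
The default of hours_back=14 pairs the 12-hour schedule with a 2-hour overlap, so records written near a run boundary are processed twice rather than missed. An assumed beat entry for illustration (the real schedule lives in PostHog's Celery config; `app` stands for the project's Celery instance):

# Illustrative only: a 12h cadence plus hours_back=14 gives consecutive
# incremental runs a 2h overlap window.
from celery.schedules import crontab

app.conf.beat_schedule["refresh-activity-log-fields-cache"] = {
    "task": "posthog.tasks.tasks.refresh_activity_log_fields_cache",
    "schedule": crontab(minute=0, hour="*/12"),
    "kwargs": {"flush": False, "hours_back": 14},
}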