From 8d70c315797c5a6a5c814291c66dd6ec2298357e Mon Sep 17 00:00:00 2001 From: Yasen Slavov Date: Mon, 29 Sep 2025 16:19:51 +0300 Subject: [PATCH 1/2] Amend the cache refresh task to operate on the last 14h records only. --- .../advanced_activity_logs/field_discovery.py | 144 +++++++++++++----- .../advanced_activity_logs/fields_cache.py | 11 ++ .../refresh_activity_log_fields_cache.py | 46 +++++- posthog/tasks/tasks.py | 80 ++++++++-- 4 files changed, 227 insertions(+), 54 deletions(-) diff --git a/posthog/api/advanced_activity_logs/field_discovery.py b/posthog/api/advanced_activity_logs/field_discovery.py index ece1d8200ed56..4dda11d05c26c 100644 --- a/posthog/api/advanced_activity_logs/field_discovery.py +++ b/posthog/api/advanced_activity_logs/field_discovery.py @@ -1,10 +1,12 @@ import gc import json import dataclasses +from datetime import timedelta from typing import Any, TypedDict from django.db import connection from django.db.models import QuerySet +from django.utils import timezone from posthog.models.activity_logging.activity_log import ActivityLog, Change from posthog.models.utils import UUIDT @@ -96,8 +98,49 @@ def _analyze_detail_fields_memory(self) -> DetailFieldsResult: def _get_org_record_count(self) -> int: return ActivityLog.objects.filter(organization_id=self.organization_id).count() - def process_batch_for_large_org(self, offset: int, limit: int) -> None: - batch_fields = self._process_batch_memory(offset, limit, use_sampling=True) + def get_activity_logs_queryset(self, hours_back: int | None = None) -> QuerySet: + """Get the base queryset for activity logs, optionally filtered by time.""" + queryset = ActivityLog.objects.filter(organization_id=self.organization_id, detail__isnull=False) + + if hours_back is not None: + cutoff_time = timezone.now() - timedelta(hours=hours_back) + queryset = queryset.filter(created_at__gte=cutoff_time) + + return queryset + + def get_sampled_records(self, limit: int, offset: int = 0) -> list[dict]: + """Get sampled records using SQL TABLESAMPLE for large datasets.""" + query = f""" + SELECT scope, detail + FROM posthog_activitylog TABLESAMPLE SYSTEM ({SAMPLING_PERCENTAGE}) + WHERE organization_id = %s + AND detail IS NOT NULL + ORDER BY created_at DESC + LIMIT %s OFFSET %s + """ + + with connection.cursor() as cursor: + cursor.execute(query, [str(self.organization_id), limit, offset]) + records = [] + for row in cursor.fetchall(): + scope, detail = row + if isinstance(detail, str): + try: + detail = json.loads(detail) + except (json.JSONDecodeError, TypeError): + detail = None + records.append({"scope": scope, "detail": detail}) + return records + + def process_batch_for_large_org(self, records: list[dict], hours_back: int | None = None) -> None: + """Process a batch of records for large organizations. 
+
+        Args:
+            records: List of activity log records to process
+            hours_back: If provided, used to get appropriate static filters for the time range
+        """
+        # Process the provided records
+        batch_fields = self._extract_fields_from_records(records)
         batch_converted = self._convert_to_discovery_format(batch_fields)
 
         existing_cache = get_cached_fields(str(self.organization_id))
@@ -108,11 +151,22 @@ def process_batch_for_large_org(self, offset: int, limit: int) -> None:
             current_detail_fields = {}
         self._merge_fields_into_result(current_detail_fields, batch_converted)
 
-        static_filters = (
-            existing_cache.get("static_filters")
-            if existing_cache
-            else self._get_static_filters(self._get_base_queryset())
-        )
+        # Get static filters for the appropriate time range
+        if hours_back is not None:
+            recent_queryset = self.get_activity_logs_queryset(hours_back=hours_back)
+            new_static_filters = self._get_static_filters(recent_queryset)
+
+            # Merge with existing static filters
+            if existing_cache and "static_filters" in existing_cache:
+                static_filters = self._merge_static_filters(existing_cache["static_filters"], new_static_filters)
+            else:
+                static_filters = new_static_filters
+        else:
+            static_filters = (
+                existing_cache.get("static_filters")
+                if existing_cache
+                else self._get_static_filters(self._get_base_queryset())
+            )
 
         cache_data = {
             "static_filters": static_filters,
@@ -181,38 +235,8 @@ def _discover_fields_memory(
 
         return all_fields
 
-    def _process_batch_memory(
-        self, offset: int, limit: int, use_sampling: bool = True
-    ) -> dict[str, set[tuple[str, str]]]:
-        if use_sampling:
-            query = f"""
-                SELECT scope, detail
-                FROM posthog_activitylog TABLESAMPLE SYSTEM ({SAMPLING_PERCENTAGE})
-                WHERE organization_id = %s
-                AND detail IS NOT NULL
-                ORDER BY created_at DESC
-                LIMIT %s OFFSET %s
-            """
-
-            with connection.cursor() as cursor:
-                cursor.execute(query, [str(self.organization_id), limit, offset])
-                records = []
-                for row in cursor.fetchall():
-                    scope, detail = row
-                    if isinstance(detail, str):
-                        try:
-                            detail = json.loads(detail)
-                        except (json.JSONDecodeError, TypeError):
-                            detail = None
-                    records.append({"scope": scope, "detail": detail})
-        else:
-            records = [
-                {"scope": record["scope"], "detail": record["detail"]}
-                for record in ActivityLog.objects.filter(
-                    organization_id=self.organization_id, detail__isnull=False
-                ).values("scope", "detail")[offset : offset + limit]
-            ]
-
+    def _extract_fields_from_records(self, records: list[dict]) -> dict[str, set[tuple[str, str]]]:
+        """Extract field information from a list of activity log records."""
         batch_fields: dict[str, set[tuple[str, str]]] = {}
 
         for record in records:
@@ -231,6 +255,20 @@ def _process_batch_memory(
 
         return batch_fields
 
+    def _process_batch_memory(
+        self, offset: int, limit: int, use_sampling: bool = True
+    ) -> dict[str, set[tuple[str, str]]]:
+        """Legacy method for backward compatibility."""
+        if use_sampling:
+            records = self.get_sampled_records(limit, offset)
+        else:
+            records = [
+                {"scope": record["scope"], "detail": record["detail"]}
+                for record in self.get_activity_logs_queryset().values("scope", "detail")[offset : offset + limit]
+            ]
+
+        return self._extract_fields_from_records(records)
+
     def _extract_json_paths(self, obj: Any, prefix: str = "") -> set[tuple[str, str]]:
         paths = set()
 
@@ -304,3 +342,31 @@ def _convert_to_discovery_format(self, fields: dict[str, set[tuple[str, str]]])
             result.append((scope, field_path, sorted(types)))
 
         return result
+
+    def _merge_static_filters(self, existing: dict, new: dict) -> dict:
+        """Merge static filters additively"""
+        merged = {
+            "users": existing.get("users", []),
+            "scopes": existing.get("scopes", []),
+            "activities": existing.get("activities", []),
+        }
+
+        # Merge users (by uuid)
+        existing_user_ids = {u["value"] for u in merged["users"]}
+        for user in new.get("users", []):
+            if user["value"] not in existing_user_ids:
+                merged["users"].append(user)
+
+        # Merge scopes
+        existing_scopes = {s["value"] for s in merged["scopes"]}
+        for scope in new.get("scopes", []):
+            if scope["value"] not in existing_scopes:
+                merged["scopes"].append(scope)
+
+        # Merge activities
+        existing_activities = {a["value"] for a in merged["activities"]}
+        for activity in new.get("activities", []):
+            if activity["value"] not in existing_activities:
+                merged["activities"].append(activity)
+
+        return merged
diff --git a/posthog/api/advanced_activity_logs/fields_cache.py b/posthog/api/advanced_activity_logs/fields_cache.py
index 42a66d236892c..f05bbebe1bd60 100644
--- a/posthog/api/advanced_activity_logs/fields_cache.py
+++ b/posthog/api/advanced_activity_logs/fields_cache.py
@@ -40,3 +40,14 @@ def cache_fields(organization_id: str, fields_data: dict, record_count: int) ->
         client.setex(key, CACHE_TTL_SECONDS, json_data)
     except Exception as e:
         capture_exception(e)
+
+
+def delete_cached_fields(organization_id: str) -> bool:
+    """Delete cached fields for an organization"""
+    try:
+        client = get_client()
+        key = _get_cache_key(organization_id)
+        return bool(client.delete(key))
+    except Exception as e:
+        capture_exception(e)
+        return False
diff --git a/posthog/management/commands/refresh_activity_log_fields_cache.py b/posthog/management/commands/refresh_activity_log_fields_cache.py
index 37db792c4c6a7..fe7e61ffa29d8 100644
--- a/posthog/management/commands/refresh_activity_log_fields_cache.py
+++ b/posthog/management/commands/refresh_activity_log_fields_cache.py
@@ -1,6 +1,6 @@
 from django.core.management.base import BaseCommand
 
-from posthog.api.advanced_activity_logs.field_discovery import SMALL_ORG_THRESHOLD
+from posthog.api.advanced_activity_logs.constants import SMALL_ORG_THRESHOLD
 from posthog.tasks.tasks import refresh_activity_log_fields_cache
 
 
@@ -9,10 +9,24 @@ class Command(BaseCommand):
 
     def add_arguments(self, parser):
         parser.add_argument("--dry-run", action="store_true", help="Show what would be processed without running")
+        parser.add_argument(
+            "--flush",
+            action="store_true",
+            help="Delete existing cache and rebuild from scratch (uses 10%% sampling for full rebuild)",
+        )
+        parser.add_argument(
+            "--hours-back",
+            type=int,
+            default=14,
+            help="Number of hours to look back when not using --flush (default: 14 = 12h + 2h buffer)",
+        )
 
     def handle(self, *args, **options):
         if options["dry_run"]:
+            from datetime import timedelta
+
             from django.db.models import Count
+            from django.utils import timezone
 
             from posthog.models import Organization
             from posthog.models.activity_logging.activity_log import ActivityLog
@@ -35,9 +49,31 @@ def handle(self, *args, **options):
                 org.activity_count = activity_counts.get(org.id, 0)
 
             self.stdout.write(f"Would process {len(large_orgs)} organizations:")
-            for org in large_orgs:
-                self.stdout.write(f"  - {org.name} (id={org.id}) - {org.activity_count:,} records")
+
+            if options["flush"]:
+                self.stdout.write("Mode: FLUSH - Delete existing cache and rebuild from scratch with 10% sampling")
+                for org in large_orgs:
+                    self.stdout.write(f"  - {org.name} (id={org.id}) - {org.activity_count:,} total records")
+            else:
+                cutoff = timezone.now() - timedelta(hours=options["hours_back"])
+                self.stdout.write(f"Mode: INCREMENTAL - Process last {options['hours_back']} hours with 100% coverage")
+                self.stdout.write(f"Cutoff time: {cutoff}")
+
+                for org in large_orgs:
+                    recent_count = ActivityLog.objects.filter(
+                        organization_id=org.id, created_at__gte=cutoff, detail__isnull=False
+                    ).count()
+                    self.stdout.write(
+                        f"  - {org.name} (id={org.id}) - {recent_count:,} records from last {options['hours_back']}h"
+                    )
 
         else:
-            self.stdout.write("Starting activity log fields cache refresh...")
-            refresh_activity_log_fields_cache()
+            mode = (
+                "FLUSH mode"
+                if options["flush"]
+                else f"INCREMENTAL mode (last {options['hours_back']}h with 100% coverage)"
+            )
+            self.stdout.write(f"Starting activity log fields cache refresh in {mode}...")
+
+            refresh_activity_log_fields_cache(flush=options["flush"], hours_back=options["hours_back"])
+            self.stdout.write("Cache refresh completed.")
diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py
index d99eee2e36938..edd220056da9e 100644
--- a/posthog/tasks/tasks.py
+++ b/posthog/tasks/tasks.py
@@ -926,17 +926,64 @@ def background_delete_model_task(
 
 
 @shared_task(ignore_result=True, time_limit=7200)
-def refresh_activity_log_fields_cache() -> None:
-    """Refresh fields cache for large organizations every 12 hours"""
+def refresh_activity_log_fields_cache(flush: bool = False, hours_back: int = 14) -> None:
+    """
+    Refresh fields cache for large organizations.
+
+    Args:
+        flush: If True, delete existing cache and rebuild from scratch
+        hours_back: Number of hours to look back (default: 14 = 12h schedule + 2h buffer)
+    """
+
+    from uuid import UUID
+
     from django.db.models import Count
 
     from posthog.api.advanced_activity_logs.constants import BATCH_SIZE, SAMPLING_PERCENTAGE, SMALL_ORG_THRESHOLD
     from posthog.api.advanced_activity_logs.field_discovery import AdvancedActivityLogFieldDiscovery
+    from posthog.api.advanced_activity_logs.fields_cache import delete_cached_fields
     from posthog.exceptions_capture import capture_exception
     from posthog.models import Organization
     from posthog.models.activity_logging.activity_log import ActivityLog
 
-    logger.info("[refresh_activity_log_fields_cache] running task")
+    def _process_org_with_flush(discovery: AdvancedActivityLogFieldDiscovery, org_id: UUID) -> None:
+        """Rebuild cache from scratch with sampling."""
+        deleted = delete_cached_fields(str(org_id))
+        logger.info(f"Flushed cache for org {org_id}: {deleted}")
+
+        record_count = discovery._get_org_record_count()
+        estimated_sampled_records = int(record_count * (SAMPLING_PERCENTAGE / 100))
+        total_batches = (estimated_sampled_records + BATCH_SIZE - 1) // BATCH_SIZE
+
+        logger.info(
+            f"Rebuilding cache for org {org_id} from scratch: "
+            f"{record_count} total records, sampling {estimated_sampled_records} records"
+        )
+
+        for batch_num in range(total_batches):
+            offset = batch_num * BATCH_SIZE
+            records = discovery.get_sampled_records(limit=BATCH_SIZE, offset=offset)
+            discovery.process_batch_for_large_org(records)
+
+    def _process_org_incremental(discovery: AdvancedActivityLogFieldDiscovery, org_id: UUID, hours_back: int) -> int:
+        """Process recent records with 100% coverage."""
+        recent_queryset = discovery.get_activity_logs_queryset(hours_back=hours_back)
+        recent_count = recent_queryset.count()
+
+        logger.info(f"Processing {recent_count} records from last {hours_back}h for org {org_id} (100% coverage)")
+
+        for batch_num in range(0, recent_count, BATCH_SIZE):
+            records = [
+                {"scope": record["scope"], "detail": record["detail"]}
+                for record in recent_queryset.values("scope", "detail")[batch_num : batch_num + BATCH_SIZE]
+            ]
+            if records:
+                discovery.process_batch_for_large_org(records, hours_back=hours_back)
+
+        return recent_count
+
+    mode = "FLUSH" if flush else f"INCREMENTAL (last {hours_back}h, 100% coverage)"
+    logger.info(f"[refresh_activity_log_fields_cache] running task in {mode} mode")
 
     large_org_data = (
         ActivityLog.objects.values("organization_id")
@@ -951,24 +998,37 @@ def refresh_activity_log_fields_cache() -> None:
     org_count = len(large_orgs)
     logger.info(f"[refresh_activity_log_fields_cache] processing {org_count} large organizations")
 
+    processed_orgs = 0
+    total_recent_records = 0
+
     for org in large_orgs:
         try:
             discovery = AdvancedActivityLogFieldDiscovery(org.id)
-            record_count = discovery._get_org_record_count()
 
-            estimated_sampled_records = int(record_count * (SAMPLING_PERCENTAGE / 100))
-            total_batches = (estimated_sampled_records + BATCH_SIZE - 1) // BATCH_SIZE
+            if flush:
+                _process_org_with_flush(discovery, org.id)
+            else:
+                recent_count = _process_org_incremental(discovery, org.id, hours_back)
+                total_recent_records += recent_count
 
-            for batch_num in range(total_batches):
-                offset = batch_num * BATCH_SIZE
-                discovery.process_batch_for_large_org(offset, BATCH_SIZE)
+            processed_orgs += 1
 
         except Exception as e:
             logger.exception(
                 "Failed to refresh activity log fields cache for org",
                 org_id=org.id,
+                mode=mode,
                 error=e,
             )
            capture_exception(e)
 
-    logger.info(f"[refresh_activity_log_fields_cache] completed for {org_count} organizations")
+    if not flush:
+        logger.info(
+            f"[refresh_activity_log_fields_cache] completed for {processed_orgs}/{org_count} organizations "
+            f"in {mode} mode. Total recent records processed: {total_recent_records}"
+        )
+    else:
+        logger.info(
+            f"[refresh_activity_log_fields_cache] completed flush and rebuild for "
+            f"{processed_orgs}/{org_count} organizations"
+        )

From c2708c6b89faeabf6c8b928a7aae379d94fbf46d Mon Sep 17 00:00:00 2001
From: Yasen Slavov
Date: Mon, 29 Sep 2025 16:42:01 +0300
Subject: [PATCH 2/2] Lint

---
 posthog/api/advanced_activity_logs/field_discovery.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/posthog/api/advanced_activity_logs/field_discovery.py b/posthog/api/advanced_activity_logs/field_discovery.py
index 4dda11d05c26c..f20c1b80d4d47 100644
--- a/posthog/api/advanced_activity_logs/field_discovery.py
+++ b/posthog/api/advanced_activity_logs/field_discovery.py
@@ -162,11 +162,10 @@ def process_batch_for_large_org(self, records: list[dict], hours_back: int | Non
             else:
                 static_filters = new_static_filters
         else:
-            static_filters = (
-                existing_cache.get("static_filters")
-                if existing_cache
-                else self._get_static_filters(self._get_base_queryset())
-            )
+            if existing_cache and existing_cache.get("static_filters"):
+                static_filters = existing_cache["static_filters"]
+            else:
+                static_filters = self._get_static_filters(self._get_base_queryset())
 
         cache_data = {
             "static_filters": static_filters,
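
Usage sketch (reviewer note, not part of the patches above): the command and task names, option names, and module paths below are taken from this series; the calls themselves are only illustrative and assume a configured Django/PostHog environment.

    from django.core.management import call_command

    from posthog.tasks.tasks import refresh_activity_log_fields_cache

    # Preview which large organizations would be processed, without touching the cache.
    call_command("refresh_activity_log_fields_cache", dry_run=True, hours_back=14)

    # Incremental refresh: 100% coverage of roughly the last 14h (12h schedule + 2h buffer).
    refresh_activity_log_fields_cache(flush=False, hours_back=14)

    # Full rebuild: drop the cached fields and re-sample from scratch.
    # When queued via Celery this would typically be refresh_activity_log_fields_cache.delay(flush=True).
    refresh_activity_log_fields_cache(flush=True)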