diff --git a/ami/main/admin.py b/ami/main/admin.py index a66bf8637..76b996507 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -10,6 +10,7 @@ import ami.utils from ami import tasks +from ami.main.models import group_images_into_events, update_calculated_fields_for_events from ami.ml.models.project_pipeline_config import ProjectPipelineConfig from ami.ml.tasks import remove_duplicate_classifications @@ -20,6 +21,7 @@ Detection, Device, Event, + EventQuerySet, Occurrence, Project, S3StorageSource, @@ -43,7 +45,7 @@ class AdminBase(admin.ModelAdmin): readonly_fields = ("created_at", "updated_at") @admin.action(description="Save selected instances in the background") - def save_async(self, request: HttpRequest, queryset: QuerySet[SourceImage]) -> None: + def save_async(self, request: HttpRequest, queryset: QuerySet[Any]) -> None: app_label = self.model._meta.app_label model_name = self.model._meta.model_name assert app_label and model_name, "Model must have app_label and model_name" @@ -239,13 +241,35 @@ def duration_display(self, obj) -> str: # Save all events in queryset @admin.action(description="Updated pre-calculated fields") def update_calculated_fields(self, request: HttpRequest, queryset: QuerySet[Event]) -> None: - from ami.main.models import update_calculated_fields_for_events - update_calculated_fields_for_events(qs=queryset) self.message_user(request, f"Updated {queryset.count()} events.") + @admin.action() + def dissociate_related_objects(self, request: HttpRequest, queryset: EventQuerySet) -> None: + """ + Remove source images and occurrences from events. + + This is useful when you want to recalculate events from source images. + """ + queryset.dissociate_related_objects() + self.message_user(request, f"Dissociated {queryset.count()} events from captures and occurrences.") + + @admin.action(description="Fix sessions by regrouping images") + def fix_sessions(self, request: HttpRequest, queryset: EventQuerySet) -> None: + queryset.dissociate_related_objects() + # Get unique deployments from the selected events + deployments = Deployment.objects.filter(events__in=queryset).distinct() + + # Regroup images for each deployment + for deployment in deployments: + # use_existing=False to consider regrouping all images, + # not just images without an assigned event (newly add images) + group_images_into_events(deployment, use_existing=False) + + self.message_user(request, f"Fixed sessions: regrouped images in {len(deployments)} deployment(s).") + list_filter = ("deployment", "project", "start") - actions = [update_calculated_fields] + actions = [fix_sessions, dissociate_related_objects, update_calculated_fields] @admin.register(SourceImage) @@ -274,10 +298,7 @@ class SourceImageAdmin(AdminBase): "collections", ) - search_fields = ( - "id", - "path", - ) + search_fields = ("id", "path", "event__start__date") def get_queryset(self, request: HttpRequest) -> QuerySet[Any]: return super().get_queryset(request).select_related("event", "deployment", "deployment__data_source") diff --git a/ami/main/management/commands/recalculate_events.py b/ami/main/management/commands/recalculate_events.py new file mode 100644 index 000000000..7dacd730e --- /dev/null +++ b/ami/main/management/commands/recalculate_events.py @@ -0,0 +1,710 @@ +""" +Django management command to update existing events using improved clustering logic. 
+ +PURPOSE: +This command addresses data quality issues in existing event groupings by finding events +that are unusually long (indicating poor clustering) and recreating all events for those +deployments using the current, improved grouping algorithm. + +BACKGROUND PROBLEMS: +The original event grouping logic had several issues: +1. Multiple events on the same day were grouped into a single event +2. Events spanning multiple days when they should be separate sessions + +TYPICAL MONITORING PATTERNS: +Most camera trap deployments for nocturnal insects follow predictable patterns: +- Regular overnight sessions (evening to next morning) - these should span 2 calendar days +- Consistent start/end times within each deployment +- Occasional short daytime events for testing/maintenance +- Similar duration events within the same deployment + +WHAT THIS COMMAND DOES: +1. Identifies deployments with unusually long events (default >8 hours) +2. Shows comprehensive before/after statistics for transparency +3. Recreates ALL events in those deployments using improved logic +4. Provides detailed analysis of changes including: + - Event count and duration statistics + - Multi-day events (spanning >2 calendar days) + - Daily event distribution analysis + - Outlier detection (events >2σ from mean duration) + - Empty events that would be deleted + +USE CASES: +- Audit existing data quality before making changes (--dry-run) +- Update problematic deployments after algorithm improvements +- Validate that event grouping matches expected monitoring schedules +- Identify deployments that may need manual review + +SAFETY FEATURES: +- Dry-run mode shows all changes without applying them +- Project filtering to process specific datasets +- Comprehensive logging of all changes +- Before/after statistics for validation +""" + +import logging +import statistics +import typing +from collections import defaultdict +from datetime import timedelta + +from django.core.management.base import BaseCommand +from django.db import models, transaction +from django.db.models import Count, Sum + +from ...models import Deployment, Event, Project + +logger = logging.getLogger(__name__) + + +def calculate_event_stats(events: list[Event]) -> dict[str, typing.Any]: + """Calculate statistics for a list of events. 
+ + Args: + events: List of Event objects (already evaluated) + + Returns: + Dictionary with event statistics + """ + if not events: + return {"count": 0} + + durations = [] + captures_counts = [] + daily_events = defaultdict(list) + multi_day_events = [] + empty_events = [] + + for event in events: + if event.end and event.start: + duration = event.end - event.start + duration_hours = duration.total_seconds() / 3600 + durations.append(duration_hours) + + # Check for multi-day events (spanning MORE than 2 calendar days) + # Normal overnight monitoring should span exactly 2 days (evening to next morning) + days_spanned = (event.end.date() - event.start.date()).days + 1 + if days_spanned > 2: + multi_day_events.append(event) + + # Group by date (using start date) + daily_events[event.start.date()].append(event) + + captures_count = event.captures_count or 0 + captures_counts.append(captures_count) + + if captures_count == 0: + empty_events.append(event) + + stats = { + "count": len(events), + "empty_events": empty_events, + "multi_day_events": multi_day_events, + "daily_events": daily_events, + } + + if durations: + stats.update( + { + "avg_duration_hours": statistics.mean(durations), + "std_duration_hours": statistics.stdev(durations) if len(durations) > 1 else 0, + "min_duration_hours": min(durations), + "max_duration_hours": max(durations), + } + ) + + # Find outliers (events more than 2 standard deviations from mean) + if len(durations) > 2: + mean_duration = stats["avg_duration_hours"] + std_duration = stats["std_duration_hours"] + outlier_threshold = 2 * std_duration + outliers = [] + for i, event in enumerate(events): + if event.end and event.start: + duration_hours = (event.end - event.start).total_seconds() / 3600 + if abs(duration_hours - mean_duration) > outlier_threshold: + outliers.append(event) + stats["outliers"] = outliers + else: + stats["outliers"] = [] + + if captures_counts: + stats.update( + { + "avg_captures": statistics.mean(captures_counts), + "std_captures": statistics.stdev(captures_counts) if len(captures_counts) > 1 else 0, + "min_captures": min(captures_counts), + "max_captures": max(captures_counts), + } + ) + + return stats + + +def format_stats_output(stats: dict[str, typing.Any], label: str) -> str: + """Format statistics for display.""" + if not stats or stats["count"] == 0: + return f"{label}: No events" + + lines = [f"{label}:"] + lines.append(f" Count: {stats['count']}") + + if "avg_duration_hours" in stats: + lines.append( + f" Duration: avg={stats['avg_duration_hours']:.1f}h, " + f"std={stats['std_duration_hours']:.1f}h, " + f"range={stats['min_duration_hours']:.1f}h-{stats['max_duration_hours']:.1f}h" + ) + + if "avg_captures" in stats: + lines.append( + f" Captures: avg={stats['avg_captures']:.0f}, " + f"std={stats['std_captures']:.0f}, " + f"range={stats['min_captures']}-{stats['max_captures']}" + ) + + if stats.get("empty_events"): + lines.append(f" Empty events (0 captures): {len(stats['empty_events'])}") + + if stats.get("multi_day_events"): + lines.append(f" Multi-day events (>2 days): {len(stats['multi_day_events'])}") + + if stats.get("outliers"): + lines.append(f" Outliers (>2σ from mean): {len(stats['outliers'])}") + + return "\n".join(lines) + + +class Command(BaseCommand): + help = ( + "Update existing events using improved clustering logic. " + "Finds deployments with unusually long events (indicating poor grouping) " + "and recreates ALL events for those deployments. 
Provides comprehensive " + "before/after analysis including statistics, outliers, and multi-day events. " + "Use --dry-run to audit data quality without making changes." + ) + + def add_arguments(self, parser) -> None: + parser.add_argument( + "--duration-hours", + type=int, + default=8, + help="Minimum duration in hours for events to be considered for updating (default: 8)", + ) + parser.add_argument( + "--project", + type=str, + help="Filter to a specific project by ID", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would be changed without making any changes", + ) + parser.add_argument( + "--max-time-gap", + type=int, + default=120, + help="Maximum time gap in minutes between images to group into the same event (default: 120)", + ) + parser.add_argument( + "--yes", + action="store_true", + help="Skip confirmation prompt and proceed automatically", + ) + + def handle(self, *args: typing.Any, **options: typing.Any) -> None: + duration_hours = options["duration_hours"] + project_filter = options["project"] + dry_run = options["dry_run"] + max_time_gap_minutes = options["max_time_gap"] + skip_confirmation = options["yes"] + + self.stdout.write( + self.style.WARNING(f"{'DRY RUN: ' if dry_run else ''}Finding events longer than {duration_hours} hours...") + ) + + # Build the query for long events with optimized select_related + long_events_query = ( + Event.objects.annotate(duration=models.F("end") - models.F("start")) + .filter(duration__gte=timedelta(hours=duration_hours)) + .select_related("deployment", "deployment__project") + ) + + # Apply project filter if specified + if project_filter: + try: + # Try to parse as ID first + project_id = int(project_filter) + project = Project.objects.get(pk=project_id) + except (ValueError, Project.DoesNotExist): + # Not an ID or doesn't exist, treat as name + try: + project = Project.objects.get(name=project_filter) + except Project.DoesNotExist: + self.stdout.write(self.style.ERROR(f"Project '{project_filter}' not found.")) + return + + long_events_query = long_events_query.filter(deployment__project=project) + self.stdout.write(f"Filtering to project: {project}") + + # Check count before evaluating queryset + long_events_count = long_events_query.count() + if long_events_count == 0: + self.stdout.write("No events found matching the criteria.") + return + + self.stdout.write(f"Found {long_events_count} events longer than {duration_hours} hours:") + + # Get deployments with long events using a more efficient approach + # Now get the actual long events grouped by deployment + long_events_by_deployment = defaultdict(list) + for event in long_events_query: + if event.deployment: # Check if deployment exists + long_events_by_deployment[event.deployment].append(event) + + self.stdout.write(f"\nThese events belong to {len(long_events_by_deployment)} deployment(s):") + + # Show before state - optimize to reduce queries + total_events_before = 0 + total_captures_before = 0 + + for deployment, long_events_list in long_events_by_deployment.items(): + self.stdout.write(f"\nDeployment: {deployment} ({deployment.project or 'No project'})") + self.stdout.write(f" Long events: {len(long_events_list)}") + + for event in long_events_list: + # Calculate duration properly + if event.end and event.start: + duration = event.end - event.start + duration_hours_actual = duration.total_seconds() / 3600 + else: + duration_hours_actual = 0 + + self.stdout.write( + f" - Event {event.pk}: {event.start} to {event.end} " + 
f"({duration_hours_actual:.1f} hours, {event.captures_count or 0} captures)" + ) + + # Get aggregated stats for this deployment in one query + deployment_stats = Event.objects.filter(deployment=deployment).aggregate( + event_count=Count("id"), total_captures=Sum("captures_count") + ) + + event_count = deployment_stats["event_count"] or 0 + captures_count = deployment_stats["total_captures"] or 0 + + total_events_before += event_count + total_captures_before += captures_count + + self.stdout.write(f" Total events in deployment: {event_count}") + self.stdout.write(f" Total captures in deployment: {captures_count}") + + if dry_run: + self.stdout.write(self.style.WARNING("\nDRY RUN: Analyzing what changes would be made...")) + # Show detailed analysis for each deployment without making changes + for deployment, long_events_list in long_events_by_deployment.items(): + self._analyze_deployment_preview(deployment, max_time_gap_minutes) + + self.stdout.write( + self.style.WARNING("\nDRY RUN completed. Run without --dry-run to actually update events.") + ) + return + + # Ask for user confirmation before proceeding + if not skip_confirmation: + self.stdout.write( + self.style.WARNING( + f"\nAbout to process {len(long_events_by_deployment)} deployment(s). " + "This will recreate some events. Only empty events are deleted." + ) + ) + + confirm = input("Do you want to continue? [y/N]: ").lower().strip() + if confirm not in ["y", "yes"]: + self.stdout.write("Operation cancelled.") + return + else: + self.stdout.write( + self.style.SUCCESS(f"\nProceeding automatically with {len(long_events_by_deployment)} deployment(s).") + ) + + # Process each deployment + self.stdout.write(self.style.SUCCESS("\nProcessing deployments...")) + + total_events_after = 0 + total_captures_after = 0 + + for deployment, long_events_list in long_events_by_deployment.items(): + with transaction.atomic(): # Wrap each deployment processing in a transaction + events_after_count, captures_after_count = self._process_deployment(deployment, max_time_gap_minutes) + total_events_after += events_after_count + total_captures_after += captures_after_count + + # Summary + self.stdout.write(self.style.SUCCESS("\nSummary:")) + self.stdout.write(f" Total events: {total_events_before} -> {total_events_after}") + self.stdout.write(f" Total captures: {total_captures_before} -> {total_captures_after}") + + events_diff = total_events_after - total_events_before + if events_diff > 0: + self.stdout.write(f" Events created: +{events_diff}") + elif events_diff < 0: + self.stdout.write(f" Events removed: {events_diff}") + else: + self.stdout.write(" Events count unchanged") + + self.stdout.write(self.style.SUCCESS("Event update completed!")) + + def _analyze_deployment_preview(self, deployment: Deployment, max_time_gap_minutes: int) -> None: + """Analyze what changes would happen to a deployment without making them.""" + self.stdout.write(f"\n Preview for deployment: {deployment}") + + # Get current stats using optimized queries + events_current_qs = Event.objects.filter(deployment=deployment) + + current_stats = events_current_qs.aggregate(event_count=Count("id"), total_captures=Sum("captures_count")) + + events_current_count = current_stats["event_count"] or 0 + captures_current = current_stats["total_captures"] or 0 + + self.stdout.write(f" Current state: {events_current_count} events, {captures_current} captures") + + if events_current_count == 0: + self.stdout.write(" No events to analyze.") + return + + # Fetch events for detailed analysis + 
events_current = list(events_current_qs) + + # Calculate current statistics for detailed analysis + stats_current = calculate_event_stats(events_current) + + # Show current statistics + self.stdout.write(f" {format_stats_output(stats_current, 'Current Statistics')}") + + # Analyze current issues that would be fixed + self._preview_current_issues(stats_current, max_time_gap_minutes) + + # Show what the analysis functions would report + self.stdout.write(" Expected improvements after regrouping:") + + # Analyze multi-day events that would likely be split + multi_day_current = stats_current.get("multi_day_events", []) + if multi_day_current and isinstance(multi_day_current, list): + self.stdout.write(f" - Would likely split {len(multi_day_current)} multi-day events") + for event in multi_day_current[:3]: # Show first 3 as examples + duration_hours = self._get_event_duration_hours(event) + days_spanned = (event.end.date() - event.start.date()).days + 1 if event.end and event.start else 0 + self.stdout.write(f" • Event {event.pk}: {duration_hours:.1f}h spanning {days_spanned} days") + if len(multi_day_current) > 3: + self.stdout.write(f" • ... and {len(multi_day_current) - 3} more") + + # Analyze days with multiple events + daily_events = stats_current.get("daily_events", {}) + if isinstance(daily_events, dict): + days_with_multiple = {day: events for day, events in daily_events.items() if len(events) > 1} + + if days_with_multiple: + self.stdout.write(f" - Would analyze {len(days_with_multiple)} days with multiple events") + for day, day_events in list(days_with_multiple.items())[:2]: # Show first 2 as examples + self.stdout.write(f" • {day}: {len(day_events)} events might be consolidated or split") + if len(days_with_multiple) > 2: + self.stdout.write(f" • ... 
and {len(days_with_multiple) - 2} more days") + + # Analyze outliers + outliers = stats_current.get("outliers", []) + if outliers and isinstance(outliers, list): + self.stdout.write(f" - Would address {len(outliers)} outlier events (>2σ from mean duration)") + + # Analyze empty events + empty_events = stats_current.get("empty_events", []) + if empty_events and isinstance(empty_events, list): + self.stdout.write(f" - Would remove {len(empty_events)} empty events (0 captures)") + + if not any( + [ + multi_day_current and isinstance(multi_day_current, list), + ( + daily_events + and isinstance(daily_events, dict) + and any(len(events) > 1 for events in daily_events.values()) + ), + outliers and isinstance(outliers, list), + empty_events and isinstance(empty_events, list), + ] + ): + self.stdout.write(" - No obvious improvements expected") + + def _preview_current_issues(self, stats_current: dict[str, typing.Any], max_time_gap_minutes: int) -> None: + """Preview current issues that would be addressed by regrouping.""" + self.stdout.write(" Current issues that would be addressed:") + + issues_found = [] + + # Check for multi-day events + multi_day_events = stats_current.get("multi_day_events", []) + if multi_day_events and isinstance(multi_day_events, list): + issues_found.append(f"Multi-day events: {len(multi_day_events)} events span >2 calendar days") + + # Check for outlier durations + outliers = stats_current.get("outliers", []) + if outliers and isinstance(outliers, list): + issues_found.append(f"Duration outliers: {len(outliers)} events >2σ from mean") + + # Check for empty events + empty_events = stats_current.get("empty_events", []) + if empty_events and isinstance(empty_events, list): + issues_found.append(f"Empty events: {len(empty_events)} events with 0 captures") + + # Check for days with multiple events (potential over-splitting) + daily_events = stats_current.get("daily_events", {}) + if isinstance(daily_events, dict): + days_with_multiple = sum(1 for events in daily_events.values() if len(events) > 1) + if days_with_multiple > 0: + issues_found.append(f"Multiple events per day: {days_with_multiple} days with >1 event") + + # Check for very long events (the original criteria) + if "avg_duration_hours" in stats_current: + avg_duration = stats_current["avg_duration_hours"] + max_duration = stats_current["max_duration_hours"] + if max_duration > 12: # Arbitrary threshold for "very long" + issues_found.append(f"Very long events: max duration {max_duration:.1f}h (avg: {avg_duration:.1f}h)") + + if issues_found: + for issue in issues_found: + self.stdout.write(f" • {issue}") + else: + self.stdout.write(" • No obvious issues detected") + + self.stdout.write(f" Regrouping will use max time gap of {max_time_gap_minutes} minutes between captures") + + def _process_deployment(self, deployment: Deployment, max_time_gap_minutes: int) -> tuple[int, int]: + """Process a single deployment, optimizing queryset usage. 
+ + Returns: + Tuple of (events_after_count, captures_after_count) + """ + self.stdout.write(f"\nProcessing deployment: {deployment}") + + # Get before stats using optimized queries + events_before_qs = Event.objects.filter(deployment=deployment) + + # Use aggregate to get counts efficiently + before_stats = events_before_qs.aggregate(event_count=Count("id"), total_captures=Sum("captures_count")) + + events_before_count = before_stats["event_count"] or 0 + captures_before = before_stats["total_captures"] or 0 + + # Only fetch events if we need detailed analysis + events_before = list(events_before_qs) if events_before_count > 0 else [] + + # Calculate before statistics + stats_before = calculate_event_stats(events_before) + + # Store details of events before regrouping for comparison + events_before_data = {} + for event in events_before: + events_before_data[event.pk] = { + "start": event.start, + "end": event.end, + "captures_count": event.captures_count or 0, + } + + # Dissociate all events in this deployment + self.stdout.write(" Dissociating existing events...") + # Create an EventQuerySet to access dissociate_related_objects method + from ...models import EventQuerySet + + event_qs = EventQuerySet(Event, using="default").filter(deployment=deployment) + event_qs.dissociate_related_objects() + + # Regroup images into events + self.stdout.write(f" Regrouping images (max gap: {max_time_gap_minutes} minutes)...") + from ...models import group_images_into_events + + group_images_into_events( + deployment=deployment, + max_time_gap=timedelta(minutes=max_time_gap_minutes), + use_existing=False, # Regroup all images, not just new ones + ) + + # Get after stats using optimized queries + events_after_qs = Event.objects.filter(deployment=deployment) + + after_stats = events_after_qs.aggregate(event_count=Count("id"), total_captures=Sum("captures_count")) + + events_after_count = after_stats["event_count"] or 0 + captures_after = after_stats["total_captures"] or 0 + + # Only fetch events if we need detailed analysis + events_after = list(events_after_qs) if events_after_count > 0 else [] + + # Calculate after statistics + stats_after = calculate_event_stats(events_after) + + self.stdout.write(f" Events: {events_before_count} -> {events_after_count}") + self.stdout.write(f" Captures: {captures_before} -> {captures_after}") + + # Display statistics + self.stdout.write(f"\n {format_stats_output(stats_before, 'BEFORE Statistics')}") + self.stdout.write(f"\n {format_stats_output(stats_after, 'AFTER Statistics')}") + + # Categorize events as new, modified, or unchanged + self._analyze_event_changes(events_before_data, events_after) + + # Analyze day-to-day date ranges with multiple events + self._analyze_daily_events(stats_before, stats_after) + + # Analyze multi-day events that were split + self._analyze_multi_day_events(stats_before, stats_after) + + return events_after_count, captures_after + + def _analyze_event_changes( + self, events_before_data: dict[int, dict[str, typing.Any]], events_after: list[Event] + ) -> None: + """Analyze changes between before and after events.""" + new_events = [] + modified_events = [] + unchanged_events = [] + deleted_events = [] + + # Find deleted events (events that existed before but not after) + events_after_ids = {event.pk for event in events_after} + for event_pk in events_before_data: + if event_pk not in events_after_ids: + # This event was deleted (probably had 0 captures) + deleted_events.append(event_pk) + + for event in events_after: + if event.pk in 
events_before_data: + # Event existed before, check if it changed + before_data = events_before_data[event.pk] + if ( + before_data["start"] != event.start + or before_data["end"] != event.end + or before_data["captures_count"] != (event.captures_count or 0) + ): + modified_events.append(event) + else: + unchanged_events.append(event) + else: + # This is a new event + new_events.append(event) + + # Show details of truly new events + if new_events: + self.stdout.write(f" New events ({len(new_events)}):") + for event in new_events: + duration_hours_actual = self._get_event_duration_hours(event) + self.stdout.write( + f" - Event {event.pk}: {event.start} to {event.end} " + f"({duration_hours_actual:.1f} hours, {event.captures_count or 0} captures)" + ) + else: + self.stdout.write(" New events: None") + + # Show details of modified events + if modified_events: + self.stdout.write(f" Modified events ({len(modified_events)}):") + for event in modified_events: + before_data = events_before_data[event.pk] + duration_hours_actual = self._get_event_duration_hours(event) + self.stdout.write( + f" - Event {event.pk}: {before_data['start']} to {before_data['end']} -> " + f"{event.start} to {event.end} " + f"({duration_hours_actual:.1f} hours, {event.captures_count or 0} captures)" + ) + else: + self.stdout.write(" Modified events: None") + + if unchanged_events: + self.stdout.write(f" Unchanged events: {len(unchanged_events)}") + + # Show deleted events (events with 0 captures that were removed) + if deleted_events: + self.stdout.write(f" Deleted events (0 captures): {len(deleted_events)}") + for event_pk in deleted_events: + before_data = events_before_data[event_pk] + self.stdout.write( + f" - Event {event_pk}: {before_data['start']} to {before_data['end']} " + f"({before_data['captures_count']} captures) [DELETED]" + ) + + def _analyze_daily_events(self, stats_before: dict[str, typing.Any], stats_after: dict[str, typing.Any]) -> None: + """Analyze daily event distribution.""" + self.stdout.write("\n Day-to-day analysis:") + after_daily_events = stats_after.get("daily_events", {}) + before_daily_events = stats_before.get("daily_events", {}) + + days_with_multiple_events_after = { + day: events for day, events in after_daily_events.items() if len(events) > 1 + } + days_with_multiple_events_before = { + day: events for day, events in before_daily_events.items() if len(events) > 1 + } + + if days_with_multiple_events_after: + count = len(days_with_multiple_events_after) + self.stdout.write(f" Days with multiple events AFTER regrouping: {count}") + for day, day_events in sorted(days_with_multiple_events_after.items()): + self.stdout.write(f" {day}: {len(day_events)} events") + for event in day_events: + duration_hours = self._get_event_duration_hours(event) + self.stdout.write( + f" - {event.start.strftime('%H:%M')} to {event.end.strftime('%H:%M')} " + f"({duration_hours:.1f}h, {event.captures_count or 0} captures)" + ) + else: + self.stdout.write(" No days with multiple events after regrouping") + + if days_with_multiple_events_before: + count = len(days_with_multiple_events_before) + self.stdout.write(f" Days with multiple events BEFORE regrouping: {count}") + + def _analyze_multi_day_events( + self, stats_before: dict[str, typing.Any], stats_after: dict[str, typing.Any] + ) -> None: + """Analyze multi-day events that were split.""" + self.stdout.write("\n Multi-day event analysis:") + self.stdout.write(" (Multi-day = spanning MORE than 2 calendar days)") + self.stdout.write(" (Normal overnight 
monitoring spans exactly 2 days)") + multi_day_before = stats_before.get("multi_day_events", []) + multi_day_after = stats_after.get("multi_day_events", []) + + if multi_day_before: + self.stdout.write(f" Multi-day events BEFORE: {len(multi_day_before)}") + for event in multi_day_before: + duration_hours = self._get_event_duration_hours(event) + if event.end and event.start: + days_spanned = (event.end.date() - event.start.date()).days + 1 + else: + days_spanned = 0 + self.stdout.write( + f" - Event {event.pk}: {event.start.strftime('%Y-%m-%d %H:%M')} to " + f"{event.end.strftime('%Y-%m-%d %H:%M')} ({duration_hours:.1f}h, {days_spanned} days)" + ) + + if multi_day_after: + self.stdout.write(f" Multi-day events AFTER: {len(multi_day_after)}") + for event in multi_day_after: + duration_hours = self._get_event_duration_hours(event) + if event.end and event.start: + days_spanned = (event.end.date() - event.start.date()).days + 1 + else: + days_spanned = 0 + self.stdout.write( + f" - Event {event.pk}: {event.start.strftime('%Y-%m-%d %H:%M')} to " + f"{event.end.strftime('%Y-%m-%d %H:%M')} ({duration_hours:.1f}h, {days_spanned} days)" + ) + + if len(multi_day_before) > len(multi_day_after): + self.stdout.write(f" ✓ Reduced multi-day events from {len(multi_day_before)} to {len(multi_day_after)}") + + def _get_event_duration_hours(self, event: Event) -> float: + """Calculate event duration in hours.""" + if event.end and event.start: + duration = event.end - event.start + return duration.total_seconds() / 3600 + return 0 diff --git a/ami/main/migrations/0071_remove_event_unique_event_and_more.py b/ami/main/migrations/0071_remove_event_unique_event_and_more.py new file mode 100644 index 000000000..2165880de --- /dev/null +++ b/ami/main/migrations/0071_remove_event_unique_event_and_more.py @@ -0,0 +1,71 @@ +# Generated by Django 4.2.10 on 2025-07-23 05:15 + +from django.db import migrations, models + + +def populate_group_by_on_rollback(apps, schema_editor): + """Populate group_by field with start date for rollback compatibility.""" + Event = apps.get_model("main", "Event") + + # Group events by deployment to handle uniqueness + from collections import defaultdict + + deployment_counters = defaultdict(lambda: defaultdict(int)) + + for event in Event.objects.all().order_by("deployment_id", "start"): + if event.start: + date_key = event.start.date().isoformat() + deployment_counters[event.deployment_id][date_key] += 1 + counter = deployment_counters[event.deployment_id][date_key] + + # Make group_by unique by appending counter if needed + if counter == 1: + event.group_by = date_key + else: + event.group_by = f"{date_key}-{counter}" + + event.save(update_fields=["group_by"]) + + +class Migration(migrations.Migration): + dependencies = [ + ("main", "0070_alter_project_feature_flags"), + ] + + operations = [ + migrations.RemoveConstraint( + model_name="event", + name="unique_event", + ), + migrations.RemoveIndex( + model_name="event", + name="main_event_group_b_6ce666_idx", + ), + # Step 1: Make the field nullable with a default for rollback safety + migrations.AlterField( + model_name="event", + name="group_by", + field=models.CharField( + db_index=True, + help_text="A unique identifier for this event, used to group images into events.", + max_length=255, + null=True, + blank=True, + default=None, + ), + ), + # Step 2: Populate data on rollback + migrations.RunPython( + code=migrations.RunPython.noop, + reverse_code=populate_group_by_on_rollback, + ), + # Step 3: Remove the field + 
migrations.RemoveField( + model_name="event", + name="group_by", + ), + migrations.AddConstraint( + model_name="event", + constraint=models.UniqueConstraint(fields=("deployment", "start", "end"), name="unique_event"), + ), + ] diff --git a/ami/main/models.py b/ami/main/models.py index e797ca5f6..ef29e5b52 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -868,22 +868,48 @@ def save(self, update_calculated_fields=True, regroup_async=True, *args, **kwarg # ami.tasks.model_task.delay("Project", self.project.pk, "update_children_project") +class EventQuerySet(models.QuerySet): + def dissociate_related_objects(self): + """ + Clear all related objects from the event, including source images and occurrences. + This is useful when the event is being deleted or dissociated from its captures. + It does not delete the event itself, but removes its associations with source images and occurrences. + + This was created to reassociate source imag es and occurrences with a new event + when an event is being split into multiple events, or collapsed into one. + """ + with transaction.atomic(): + events = self.filter().distinct() + image_qs = SourceImage.objects.filter(event__in=events) + occ_qs = Occurrence.objects.filter(detections__source_image__in=image_qs).distinct() + if not image_qs.exists() and not occ_qs.exists(): + logger.info("No objects to remove from events") + return events + logger.info( + f"Dissociating {image_qs.count()} source images and {occ_qs.count()} occurrences " + f"from {events.count()} events." + ) + occ_updated_count = occ_qs.update(event=None) + # Ensure the image_qs is updated last + image_updated_count = image_qs.update(event=None) + update_calculated_fields_for_events( + qs=events, + last_updated=timezone.now(), + save=True, + ) + logger.info(f"Dissociated event from {image_updated_count} source images, {occ_updated_count} occurrences") + return events + + +class EventManager(models.Manager): + def get_queryset(self) -> EventQuerySet: + return EventQuerySet(self.model, using=self._db) + + @final class Event(BaseModel): """A monitoring session""" - group_by = models.CharField( - max_length=255, - db_index=True, - help_text=( - "A unique identifier for this event, used to group images into events. " - "This allows images to be prepended or appended to an existing event. " - "The default value is the day the event started, in the format YYYY-MM-DD. " - "However images could also be grouped by camera settings, image dimensions, hour of day, " - "or a random sample." 
- ), - ) - start = models.DateTimeField(db_index=True, help_text="The timestamp of the first image in the event.") end = models.DateTimeField(null=True, blank=True, help_text="The timestamp of the last image in the event.") @@ -899,14 +925,15 @@ class Event(BaseModel): occurrences_count = models.IntegerField(blank=True, null=True) calculated_fields_updated_at = models.DateTimeField(blank=True, null=True) + objects = EventManager() + class Meta: ordering = ["start"] indexes = [ - models.Index(fields=["group_by"]), models.Index(fields=["start"]), ] constraints = [ - models.UniqueConstraint(fields=["deployment", "group_by"], name="unique_event"), + models.UniqueConstraint(fields=["deployment", "start", "end"], name="unique_event"), ] def __str__(self) -> str: @@ -1005,10 +1032,6 @@ def update_calculated_fields(self, save=False, updated_timestamp: datetime.datet Important: if you update a new field, add it to the bulk_update call in update_calculated_fields_for_events """ event = self - if not event.group_by and event.start: - # If no group_by is set, use the start "day" - event.group_by = str(event.start.date()) - if not event.project and event.deployment: event.project = event.deployment.project @@ -1040,14 +1063,14 @@ def update_calculated_fields_for_events( qs: models.QuerySet[Event] | None = None, pks: list[typing.Any] | None = None, last_updated: datetime.datetime | None = None, - save=True, -): + save: bool = True, +) -> list[Event]: """ This function is called by a migration to update the calculated fields for all events. @TODO this can likely be abstracted to a more generic function that can be used for any model """ - to_update = [] + to_update: list[Event] = [] qs = qs or Event.objects.all() if pks: @@ -1058,18 +1081,17 @@ def update_calculated_fields_for_events( Q(calculated_fields_updated_at__isnull=True) | Q(calculated_fields_updated_at__lte=last_updated) ) - logging.info(f"Updating pre-calculated fields for {len(to_update)} events") - updated_timestamp = timezone.now() for event in qs: event.update_calculated_fields(save=False, updated_timestamp=updated_timestamp) to_update.append(event) - if save: + logging.info(f"Updating pre-calculated fields for {len(to_update)} events") + + if save and to_update: updated_count = Event.objects.bulk_update( to_update, [ - "group_by", "start", "end", "project", @@ -1106,16 +1128,47 @@ def audit_event_lengths(deployment: Deployment): logger.error(f"Found {events_ending_before_start} event(s) with start > end in deployment {deployment}") +def merge_with_following_events(event: Event, events_qs: models.QuerySet, max_time_gap: datetime.timedelta) -> None: + """ + Merge the given event with any following events in the list that overlap or are close enough + based on the max_time_gap. 
+ """ + # Evaluate the events_qs to be able to get the index of the current event + events = list(events_qs) + current_index = events.index(event) + remaining_events = events[current_index + 1 :] # noqa + for next_event in remaining_events: + if next_event.start > event.end + max_time_gap: + break # No further overlapping or close events possible + + if ami.utils.dates.time_ranges_overlap_or_close( + event.start, + event.end, + next_event.start, + next_event.end, + max_time_gap, + ): + SourceImage.objects.filter(event=next_event).update(event=event) + event.end = max(event.end, next_event.end) + + logger.info(f"Finished merging events into event {event.pk}") + + def group_images_into_events( - deployment: Deployment, max_time_gap=datetime.timedelta(minutes=120), delete_empty=True + deployment: Deployment, + max_time_gap=datetime.timedelta(minutes=120), + delete_empty=True, + use_existing=True, ) -> list[Event]: - # Log a warning if multiple SourceImages have the same timestamp + logger.info(f"Grouping images into events for deployment '{deployment}' (use_existing={use_existing})") + + # Log duplicate timestamps dupes = ( SourceImage.objects.filter(deployment=deployment) .values("timestamp") .annotate(count=models.Count("id")) .filter(count__gt=1) - .exclude(timestamp=None) + .exclude(timestamp__isnull=True) ) if dupes.count(): values = "\n".join( @@ -1125,69 +1178,104 @@ def group_images_into_events( f"Found {len(values)} images with the same timestamp in deployment '{deployment}'. " f"Only one image will be used for each timestamp for each event." ) + # Get all images + image_qs = SourceImage.objects.filter(deployment=deployment).exclude(timestamp=None) + if use_existing: + # Get only newly added images (images without an event) + image_qs = image_qs.filter(event__isnull=True) + + if not image_qs.exists(): + logger.info("No relevant images found; skipping") + return [] - image_timestamps = list( - SourceImage.objects.filter(deployment=deployment) - .exclude(timestamp=None) - .values_list("timestamp", flat=True) - .order_by("timestamp") - .distinct() - ) - - timestamp_groups = ami.utils.dates.group_datetimes_by_gap(image_timestamps, max_time_gap) - # @TODO this event grouping needs testing. Still getting events over 24 hours - # timestamp_groups = ami.utils.dates.group_datetimes_by_shifted_day(image_timestamps) - - events = [] + # Group timestamps + timestamps = list(image_qs.values_list("timestamp", flat=True).distinct()) + timestamp_groups = ami.utils.dates.group_datetimes_by_gap(timestamps, max_time_gap) + # Get existing events for this deployment ordered by start time + existing_events_qs = Event.objects.filter(deployment=deployment).order_by("start") + events: list[Event] = [] + + # For each group of images check if we can merge with an existing + # event based on overlapping or proximity if use_existing is True. + # Otherwise if use is_existing is False, we look for an existing event + # with the exact same start and end times and reuse it, + # if not found create a new event. for group in timestamp_groups: - if not len(group): - continue - - start_date = group[0] - end_date = group[-1] - - # Print debugging info about groups - delta = end_date - start_date - hours = round(delta.seconds / 60 / 60, 1) - logger.debug( - f"Found session starting at {start_date} with {len(group)} images that ran for {hours} hours.\n" - f"From {start_date.strftime('%c')} to {end_date.strftime('%c')}." 
- ) - - # Creating events & assigning images - group_by = start_date.date() - event, _ = Event.objects.get_or_create( - deployment=deployment, - group_by=group_by, - defaults={"start": start_date, "end": end_date}, - ) - events.append(event) - SourceImage.objects.filter(deployment=deployment, timestamp__in=group).update(event=event) - event.save() # Update start and end times and other cached fields - logger.info( - f"Created/updated event {event} with {len(group)} images for deployment {deployment}. " - f"Duration: {event.duration_label()}" - ) + group_start, group_end = group[0], group[-1] + group_set = set(group) + group_image_ids = image_qs.filter(timestamp__in=group_set).values_list("pk", flat=True) + + event = None + if use_existing: + # Look for overlap or proximity + for existing_event in existing_events_qs: + existing_event.refresh_from_db(fields=["start", "end"]) + assert existing_event.end is not None, "Existing event end time should not be None" + if ami.utils.dates.time_ranges_overlap_or_close( + group_start, group_end, existing_event.start, existing_event.end, max_time_gap + ): + event = existing_event + break + else: + # Look for exact match + event = Event.objects.filter( + deployment=deployment, + start=group_start, + end=group_end, + ).first() + + if event: + # Adjust times if necessary (merge) + assert event.end is not None, "Event end time should not be None if event exists" + event.start = min(event.start, group_start) + event.end = max(event.end, group_end) + + # We do not need to do this if use_existing is False + # because we will either have and exact match or a new event. + if use_existing: + # Try to merge with following events if there are any overlapping or close enough events + merge_with_following_events(event, existing_events_qs, max_time_gap) - logger.info( - f"Done grouping {len(image_timestamps)} captures into {len(events)} events " f"for deployment {deployment}" - ) + else: + # Create new event + event = Event.objects.create( + deployment=deployment, + start=group_start, + end=group_end, + ) + logger.info(f"Created new event {event} with {len(group_image_ids)} images") - if delete_empty: - logger.info("Deleting empty events for deployment") - delete_empty_events(deployment=deployment) + SourceImage.objects.filter(id__in=group_image_ids).update(event=event) + event.save() + events.append(event) + # Process each event: set dimensions and update occurrences for event in events: - # Set the width and height of all images in each event based on the first image logger.info(f"Setting image dimensions for event {event}") set_dimensions_for_collection(event) + logger.info("Updating related occurrences for images in event") + # @TODO can occurrences stay up-to-date with the event via a signal? (efficiently?) 
+ Occurrence.objects.filter(detections__source_image__event=event).update( + event=event, + ) + + # Delete empty events after occurrences have been updated + # This ensures that events with occurrences that pointed to old/incorrect events + # are properly cleaned up after the occurrences have been moved to correct events + if delete_empty: + logger.info("Deleting empty events for deployment (after occurrence updates)") + delete_empty_events(deployment=deployment) + logger.info("Updating relevant cached fields on deployment") - deployment.events_count = len(events) + deployment.events_count = Event.objects.filter(deployment=deployment).count() deployment.save(update_calculated_fields=False, update_fields=["events_count"]) audit_event_lengths(deployment) + audit_event_lengths(deployment) + + logger.info(f"Finished grouping {len(timestamps)} images into {len(events)} events for deployment '{deployment}'") return events @@ -1725,6 +1813,13 @@ def update_calculated_fields(self, save=False): self.project = self.deployment.project if self.pk is not None: self.detections_count = self.get_detections_count() + # This is another approach to keep related occurrences up-to-date. + # But it is not used currently because it can be inefficient + # occurrences_with_incorrect_event = Occurrence.objects.filter(detection__source_image=self).exclude( + # event=self.event + # ) + # if occurrences_with_incorrect_event.exists(): + # occurrences_with_incorrect_event.update(event=self.event) if save: self.save(update_calculated_fields=False) diff --git a/ami/main/tests.py b/ami/main/tests.py index 51329b232..642f83bec 100644 --- a/ami/main/tests.py +++ b/ami/main/tests.py @@ -30,7 +30,15 @@ ) from ami.ml.models.pipeline import Pipeline from ami.ml.models.project_pipeline_config import ProjectPipelineConfig -from ami.tests.fixtures.main import create_captures, create_occurrences, create_taxa, setup_test_project +from ami.tests.fixtures.main import ( + create_captures, + create_captures_in_range, + create_deployment, + create_occurrences, + create_storage_source, + create_taxa, + setup_test_project, +) from ami.tests.fixtures.storage import populate_bucket from ami.users.models import User from ami.users.roles import BasicMember, Identifier, ProjectManager @@ -219,14 +227,17 @@ def test_grouping(self): interval_minutes=10, ) + self.assertEqual(self.deployment.captures.count(), num_nights * images_per_night) + events = group_images_into_events( deployment=self.deployment, max_time_gap=datetime.timedelta(hours=2), + use_existing=False, ) - assert len(events) == num_nights + self.assertEqual(len(events), num_nights) for event in events: - assert event.captures.count() == images_per_night + self.assertEqual(event.captures.count(), images_per_night) def test_pruning_empty_events(self): from ami.main.models import delete_empty_events @@ -262,6 +273,404 @@ def test_setting_image_dimensions(self): # print(capture.path, capture.width, capture.height) assert (capture.width == image_width) and (capture.height == image_height) + def test_grouping_merges_overlapping_groups(self): + now = datetime.datetime.now() + + # Create initial captures (4 captures 0 to 30 mins from now) + create_captures_in_range( + deployment=self.deployment, + start_time=now, + end_time=now + datetime.timedelta(minutes=30), + interval_minutes=10, + keep_existing=False, + ) + + # Initial grouping with default max_time_gap (2 hours) + initial_events = group_images_into_events( + deployment=self.deployment, + use_existing=False, # force full grouping + ) + + 
assert len(initial_events) == 1 + initial_event = initial_events[0] + initial_capture_count = initial_event.captures.count() + + # Create overlapping captures (4 captures 25 to 55 mins from now ) + create_captures_in_range( + deployment=self.deployment, + start_time=now + datetime.timedelta(minutes=25), + end_time=now + datetime.timedelta(minutes=55), + interval_minutes=10, + ) + + # Group again using use_existing=True (only new captures will be merged) + merged_events = group_images_into_events( + deployment=self.deployment, + use_existing=True, # merge behavior + ) + + # Assert that the same event was merged into + assert len(merged_events) == 1 # only one updated event + assert merged_events[0] == initial_event, "Expected the same event to be updated with new captures" + initial_event.refresh_from_db(fields=["start", "end"]) + updated_capture_count = initial_event.captures.count() + + assert updated_capture_count == initial_capture_count + 4 # 4 new overlapping captures + # Assert that the start and end times are updated correctly + assert initial_event.start == now + assert initial_event.end >= now + datetime.timedelta(minutes=55) + + def test_grouping_merges_on_proximity(self): + now = datetime.datetime.now() + + # Create first group: 0 to 30 minutes (4 captures) + create_captures_in_range( + deployment=self.deployment, + start_time=now, + end_time=now + datetime.timedelta(minutes=30), + interval_minutes=10, + keep_existing=False, + ) + + # Group first batch (force new event) + initial_events = group_images_into_events( + deployment=self.deployment, + use_existing=False, + ) + assert len(initial_events) == 1 + initial_event = initial_events[0] + initial_capture_count = initial_event.captures.count() + # Assert that the first group has 4 captures + assert initial_capture_count == 4 + + # Create second group: starts just under 2 hours after first group ends + # First group ends at now + 30 min => second group starts at now + 1 hr 50 min + create_captures_in_range( + deployment=self.deployment, + start_time=now + datetime.timedelta(hours=1, minutes=50), + end_time=now + datetime.timedelta(hours=2, minutes=20), + interval_minutes=10, + ) + + # Group new captures + merged_events = group_images_into_events( + deployment=self.deployment, + use_existing=True, + ) + + # The proximity between first group's end and second group's start + # is 1h 20min, which is within the default 2h . We should merge + assert len(merged_events) == 1 # one event was updated + + initial_event.refresh_from_db(fields=["start", "end"]) + updated_capture_count = initial_event.captures.count() + + assert updated_capture_count == initial_capture_count + 4 # 4 new images + assert initial_event.end >= now + datetime.timedelta(hours=2, minutes=20) + + def test_grouping_does_not_merge_when_proximity_exceeds_default_gap(self): + now = datetime.datetime.now() + + # Create first group: 0 to 30 minutes + initial_captures = create_captures_in_range( + deployment=self.deployment, + start_time=now, + end_time=now + datetime.timedelta(minutes=30), + interval_minutes=10, + keep_existing=False, + ) + + # Group first batch (fresh start) + initial_events = group_images_into_events( + deployment=self.deployment, + use_existing=False, + ) + assert len(initial_events) == 1 + + # Create second group: starts just after 2 hours from first group's end + # First group ends at now + 30 min. 
Second group starts at now + 2 hr 31 min + second_event_captures = create_captures_in_range( + deployment=self.deployment, + start_time=now + datetime.timedelta(hours=2, minutes=31), + end_time=now + datetime.timedelta(hours=3), + interval_minutes=10, + ) + + # Group new captures (should not merge due to exceeding time gap) + group_images_into_events( + deployment=self.deployment, + use_existing=True, + ) + + # Check that a new event was created instead of merging + all_events = Event.objects.filter(deployment=self.deployment).order_by("start") + assert all_events.count() == 2 + + first_event, second_event = all_events + assert first_event.captures.count() == len(initial_captures) + assert second_event.captures.count() == len(second_event_captures) + assert second_event.start == now + datetime.timedelta(hours=2, minutes=31) + assert second_event.end == now + datetime.timedelta(hours=2, minutes=51) + + def test_grouping_exact_overlap(self): + now = datetime.datetime.now() + + # Create initial captures (0 to 30 min) + create_captures_in_range( + deployment=self.deployment, + start_time=now, + end_time=now + datetime.timedelta(minutes=30), + interval_minutes=10, + keep_existing=False, + ) + + # Group initial captures + initial_events = group_images_into_events( + deployment=self.deployment, + use_existing=False, + ) + assert len(initial_events) == 1 + initial_event = initial_events[0] + initial_capture_count = initial_event.captures.count() + + # Create new captures with exact same timestamps (overlap) + create_captures_in_range( + deployment=self.deployment, + start_time=now, + end_time=now + datetime.timedelta(minutes=30), + interval_minutes=5, + ) + + # Group again with use_existing=False + group_images_into_events( + deployment=self.deployment, + use_existing=False, + ) + + # Assertions + all_events = Event.objects.filter(deployment=self.deployment) + assert all_events.count() == 1 # no new event created + updated_event = all_events.first() + assert updated_event is not None + # assert that updated_event is the same as initial_event + assert updated_event == initial_event + assert updated_event.pk == initial_event.pk + assert ( + updated_event.captures.count() == initial_capture_count + 3 + ) # 3 new images are added (not considered duplicates) + + def test_regroup_with_different_time_gap(self): + """ + Scenario: There are already grouped events using a certain time gap. + Then we re-group with a different (smaller) time gap using use_existing=False. + Expectation: + - All previous groupings are overridden. + - Events match new grouping. + - No empty events are left. 
+ """ + now = datetime.datetime.now() + + # Initial setup: create 6 images, 15 mins apart => total 75 mins + first_batch_start = now + first_batch_end = now + datetime.timedelta(minutes=75) + create_captures_in_range( + deployment=self.deployment, + start_time=first_batch_start, + end_time=first_batch_end, + interval_minutes=15, + keep_existing=False, + ) + gap_between_batches = datetime.timedelta(minutes=45) + second_batch_start = first_batch_end + gap_between_batches + second_batch_end = second_batch_start + datetime.timedelta(minutes=75) + + create_captures_in_range( + deployment=self.deployment, + start_time=second_batch_start, + end_time=second_batch_end, + interval_minutes=15, + keep_existing=True, + ) + + # Group with large time gap (2 hours) + group_images_into_events( + deployment=self.deployment, + max_time_gap=datetime.timedelta(hours=2), + use_existing=False, + ) + assert Event.objects.filter(deployment=self.deployment).count() == 1 + + # Re-group with smaller time gap (30 mins) + group_images_into_events( + deployment=self.deployment, + max_time_gap=datetime.timedelta(minutes=30), + use_existing=False, + ) + + events = Event.objects.filter(deployment=self.deployment) + images = SourceImage.objects.filter(deployment=self.deployment) + + # Should split into two events + assert events.count() == 2, "Expected two events after regrouping with smaller time gap" + # No empty events left + for event in events: + assert event.captures.exists(), f"Event {event.pk} should not be empty after regrouping" + # All images reassigned + assert ( + images.exclude(event__isnull=True).count() == images.count() + ), "All images should be assigned to an event after regrouping" + + def test_full_regroup_after_adding_new_overlapping_images(self): + now = datetime.datetime.now() + + # Create first batch: 4 images (10 min apart) + create_captures_in_range( + deployment=self.deployment, + start_time=now, + end_time=now + datetime.timedelta(hours=1), + interval_minutes=10, + keep_existing=False, + ) + logger.info("First batch of images created.") + group_images_into_events(deployment=self.deployment, use_existing=False) + old_events = Event.objects.filter(deployment=self.deployment) + assert old_events.exists() + assert old_events.count() == 1, "Expected one event after first grouping" + + # Add second batch of images (new captures) that overlaps with existing events + create_captures_in_range( + deployment=self.deployment, + start_time=now + datetime.timedelta(minutes=40), + end_time=now + datetime.timedelta(hours=2), + interval_minutes=10, + keep_existing=True, + ) + + # Full regroup + group_images_into_events(deployment=self.deployment, use_existing=False) + + events = Event.objects.filter(deployment=self.deployment) + # expected one event, since the new captures overlap with the existing ones + assert events.count() == 1, "Expected one event after regrouping with new overlapping images" + images = SourceImage.objects.filter(deployment=self.deployment) + + # Every image belongs to a valid event + assert images.exclude(event__isnull=True).count() == images.count() + # No empty events + for event in events: + assert event.captures.exists() + + def test_merge_with_multiple_existing_overlapping_events(self): + now = datetime.datetime.now() + + # Create first batch: 0 to 2 hour + create_captures_in_range( + deployment=self.deployment, + start_time=now, + end_time=now + datetime.timedelta(hours=2), + interval_minutes=10, + keep_existing=False, + ) + + # Create second batch: 3 to 5 hours + 
create_captures_in_range( + deployment=self.deployment, + start_time=now + datetime.timedelta(hours=4), + end_time=now + datetime.timedelta(hours=7), + interval_minutes=10, + keep_existing=True, + ) + + # Group all images initially (use_existing=False) + initial_events = group_images_into_events( + deployment=self.deployment, + use_existing=False, + ) + assert len(initial_events) == 2 + + # Create third batch: overlaps both (1 to 5 hours) + create_captures_in_range( + deployment=self.deployment, + start_time=now + datetime.timedelta(hours=1), + end_time=now + datetime.timedelta(hours=5), + interval_minutes=10, + keep_existing=True, + ) + + # Regroup all (use_existing=False) — should create one merged event + full_regrouped = group_images_into_events( + deployment=self.deployment, + use_existing=False, + ) + assert len(full_regrouped) == 1 + + def test_merge_with_multiple_existing_overlapping_events_use_existing_true(self): + now = datetime.datetime.now() + + # Create first batch: 0 to 2 hour + first_batch_captures = create_captures_in_range( + deployment=self.deployment, + start_time=now, + end_time=now + datetime.timedelta(hours=2), + interval_minutes=10, + keep_existing=False, + ) + + # Create second batch: 4 to 7 hours + second_batch_captures = create_captures_in_range( + deployment=self.deployment, + start_time=now + datetime.timedelta(hours=4), + end_time=now + datetime.timedelta(hours=7), + interval_minutes=10, + keep_existing=True, + ) + + # Group all images initially (use_existing=False) + group_images_into_events( + deployment=self.deployment, + use_existing=False, + ) + # Assert that there are two events + # Get events sorted by start time + initial_events = Event.objects.filter(deployment=self.deployment).order_by("start") + + assert len(initial_events) == 2 + # assert that the first event has 13 captures and start time is now, end time is now + 2 hours + first_event = initial_events[0] + assert first_event.start == now + assert first_event.end == now + datetime.timedelta(hours=2) + assert first_event.captures.count() == 13 + # assert that the second event has 19 captures and start time is now + 4 hours, end time is now + 7 hours + second_event = initial_events[1] + assert second_event.start == now + datetime.timedelta(hours=4) + assert second_event.end == now + datetime.timedelta(hours=7) + assert second_event.captures.count() == 19 + + # Create third batch: overlaps both (1 to 5 hours) + third_batch_captures = create_captures_in_range( + deployment=self.deployment, + start_time=now + datetime.timedelta(hours=1, minutes=1), + end_time=now + datetime.timedelta(hours=5, minutes=1), + interval_minutes=2 * 60, # 2 hours + keep_existing=True, + ) + + # Group only new images (use_existing=True) — should merge into one event + group_images_into_events( + deployment=self.deployment, + use_existing=True, + ) + # Assert that there is one event and it should start at now and end at now + 7 hours + merged_events = Event.objects.filter(deployment=self.deployment).order_by("start") + assert len(merged_events) == 1 + merged_event = merged_events[0] + assert merged_event.start == now + assert merged_event.end == now + datetime.timedelta(hours=7) + + assert merged_event.captures.count() == len(first_batch_captures) + len(second_batch_captures) + len( + third_batch_captures + ) + # This test is disabled because it requires certain data to be present in the database # and data in a configured S3 bucket. Will require Minio or something like it to be running. 
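
A minimal sketch (not part of this patch) of how the regrouping behaviour exercised by the tests above could be driven by hand, assuming the `recalculate_events` management command and the `use_existing` flag introduced earlier in this diff; the project and deployment ids are placeholders:

# Hypothetical maintenance snippet, e.g. run from `./manage.py shell`; ids are examples only.
from datetime import timedelta

from django.core.management import call_command

from ami.main.models import Deployment, group_images_into_events

# Audit long or badly grouped events first, without changing anything.
call_command("recalculate_events", "--dry-run", "--project", "1", "--duration-hours", "8")

# Or regroup a single deployment directly, ignoring existing event assignments.
deployment = Deployment.objects.get(pk=1)  # placeholder pk
events = group_images_into_events(
    deployment=deployment,
    max_time_gap=timedelta(minutes=120),
    use_existing=False,  # regroup all captures, not only newly added ones
)
print(f"{deployment} now has {len(events)} events")
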
@@ -357,12 +766,14 @@ def test_event_calculated_fields_batch(self):
 
 
 class TestDuplicateFieldsOnChildren(TestCase):
     def setUp(self) -> None:
-        from ami.main.models import Deployment, Project
+        from ami.main.models import Project
 
         self.project_one = Project.objects.create(name="Test Project One")
         self.project_two = Project.objects.create(name="Test Project Two")
-        self.deployment = Deployment.objects.create(name="Test Deployment", project=self.project_one)
+        data_source = create_storage_source(self.project_one, "Test Data Source")
+        self.deployment = create_deployment(self.project_one, data_source, "Test Deployment")
+        assert self.deployment.data_source is not None
         create_captures(deployment=self.deployment)
         self.deployment.save(regroup_async=False)  # Ensure events are grouped immediately
         create_taxa(project=self.project_one)
@@ -1757,17 +2168,18 @@ class TestDeploymentSyncCreatesEvents(TestCase):
     def test_sync_creates_events_and_updates_counts(self):
         # Set up a new project and deployment with test data
         project, deployment = setup_test_project(reuse=False)
-
+        now = datetime.datetime.now()
         # Populate the object store with image data
         assert deployment.data_source is not None
         populate_bucket(
             config=deployment.data_source.config,
             subdir=f"deployment_{deployment.pk}",
             skip_existing=False,
+            beginning_timestamp=now,
         )
 
         # Sync captures
-        deployment.sync_captures()
+        deployment.sync_captures(regroup_events_per_batch=True)
 
         # Refresh and check results
         deployment.refresh_from_db()
@@ -1787,10 +2199,11 @@ def test_sync_creates_events_and_updates_counts(self):
             num_nights=2,
             images_per_day=5,
             minutes_interval=120,
+            beginning_timestamp=now + datetime.timedelta(days=10),
         )
 
         # Sync again
-        deployment.sync_captures()
+        deployment.sync_captures(regroup_events_per_batch=True)
 
         deployment.refresh_from_db()
         updated_events = Event.objects.filter(deployment=deployment)
diff --git a/ami/tasks.py b/ami/tasks.py
index 47abfbd09..94f9d602c 100644
--- a/ami/tasks.py
+++ b/ami/tasks.py
@@ -93,8 +93,9 @@ def regroup_events(deployment_id: int) -> None:
     deployment = Deployment.objects.get(id=deployment_id)
     if deployment:
         logger.info(f"Grouping captures for {deployment}")
-        events = group_images_into_events(deployment)
-        logger.info(f"{deployment } now has {len(events)} events")
+        group_images_into_events(deployment)
+        total_events = deployment.events.count()
+        logger.info(f"{deployment} now has {total_events} events")
     else:
         logger.error(f"Deployment with id {deployment_id} not found")
 
diff --git a/ami/tests/fixtures/main.py b/ami/tests/fixtures/main.py
index 689a9ecb2..2fb05ea56 100644
--- a/ami/tests/fixtures/main.py
+++ b/ami/tests/fixtures/main.py
@@ -138,7 +138,7 @@ def create_captures(
     interval_minutes: int = 10,
     subdir: str = "test",
     update_deployment: bool = True,
-):
+) -> list[SourceImage]:
     # Create some images over a few monitoring nights
     first_night = datetime.datetime.now()
 
@@ -168,6 +168,51 @@
     return created
 
 
+def create_captures_in_range(
+    deployment: Deployment,
+    start_time: datetime.datetime,
+    end_time: datetime.datetime,
+    interval_minutes: int = 10,
+    subdir: str = "test",
+    keep_existing: bool = True,
+):
+    assert start_time < end_time, "start_time must be before end_time"
+    assert interval_minutes > 0, "interval_minutes must be > 0"
+
+    if not keep_existing:
+        SourceImage.objects.filter(deployment=deployment).delete()
+
+    current_time = start_time
+    source_images = []
+
+    while current_time <= end_time:
+        prefix = f"deployment_{deployment.pk}"
+        path = pathlib.Path(subdir) / f"{prefix}_{current_time.strftime('%Y%m%d%H%M%S')}.jpg"
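+        # The timestamped path, together with the deployment, forms the unique key used by
+        # bulk_create(update_conflicts=True) below to upsert captures that already exist.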
f"{prefix}_{current_time.strftime('%Y%m%d%H%M%S')}.jpg" + + source_images.append( + SourceImage( + deployment=deployment, + timestamp=current_time, + path=path, + last_modified=timezone.now(), + size=100, + checksum="", + checksum_algorithm="md5", + ) + ) + + current_time += datetime.timedelta(minutes=interval_minutes) + + created_images = SourceImage.objects.bulk_create( + source_images, + update_conflicts=True, + unique_fields=["deployment", "path"], # type: ignore + update_fields=["last_modified", "size", "checksum", "checksum_algorithm"], + ) + + return created_images + + def create_captures_from_files( deployment: Deployment, skip_existing=True ) -> list[tuple[SourceImage, GeneratedTestFrame]]: @@ -422,3 +467,5 @@ def create_local_admin_user(): email = os.environ.get("DJANGO_SUPERUSER_EMAIL", "Unknown") password = os.environ.get("DJANGO_SUPERUSER_PASSWORD", "Unknown") logger.info(f"Test user credentials: {email} / {password}") + password = os.environ.get("DJANGO_SUPERUSER_PASSWORD", "Unknown") + logger.info(f"Test user credentials: {email} / {password}") diff --git a/ami/tests/fixtures/storage.py b/ami/tests/fixtures/storage.py index d53c782ff..0042c90d1 100644 --- a/ami/tests/fixtures/storage.py +++ b/ami/tests/fixtures/storage.py @@ -1,3 +1,4 @@ +import datetime import io import logging @@ -45,6 +46,7 @@ def populate_bucket( images_per_day: int = 3, minutes_interval: int = 45, minutes_interval_variation: int = 10, + beginning_timestamp: datetime.datetime | None = None, skip_existing: bool = True, ) -> list[GeneratedTestFrame]: # Images need to be named with iso timestamps to be sorted correctly @@ -67,6 +69,7 @@ def populate_bucket( minutes_interval=minutes_interval, minutes_interval_variation=minutes_interval_variation, save_images=False, + beginning_timestamp=beginning_timestamp, ): # Convert the image to bytes img_byte_arr = io.BytesIO() diff --git a/ami/utils/dates.py b/ami/utils/dates.py index a1764172b..6d4f19a57 100644 --- a/ami/utils/dates.py +++ b/ami/utils/dates.py @@ -75,6 +75,22 @@ def get_image_timestamp_from_filename(img_path, raise_error=False) -> datetime.d return date +def time_ranges_overlap_or_close( + start1: datetime.datetime, + end1: datetime.datetime, + start2: datetime.datetime, + end2: datetime.datetime, + max_time_gap: datetime.timedelta, +) -> bool: + """ + Returns True if the two time ranges (start1 to end1 and start2 to end2) + overlap or are within max_time_gap of each other. + """ + overlaps = start1 <= end2 and end1 >= start2 + close_enough = abs(start1 - end2) <= max_time_gap or abs(start2 - end1) <= max_time_gap + return overlaps or close_enough + + def format_timedelta(duration: datetime.timedelta | None) -> str: """Format the duration for display. @TODO try the humanize library