@@ -273,7 +273,7 @@ func AnalyticsCopyCaseEvents(ctx context.Context, exec AnalyticsExecutor, req An
273273 // completely skip the last group, which will be picked up by the next
274274 // iteration.
275275 cte := WithCtesRaw ("q" , func (b squirrel.StatementBuilderType ) squirrel.SelectBuilder {
276- q := b .Select (
276+ q1 := b .Select (
277277 "ce.id" ,
278278 "ce.org_id" ,
279279 "d.scenario_id" ,
@@ -283,6 +283,7 @@ func AnalyticsCopyCaseEvents(ctx context.Context, exec AnalyticsExecutor, req An
283283 "ce.created_at as created_at" ,
284284 "d.trigger_object_type" ,
285285 "row_number() over (partition by ce.case_id order by ce.created_at desc) rnk" ,
286+ "dense_rank() over (order by ce.created_at, ce.id) as deduplicate" ,
286287 ).
287288 From (dbmodels .TABLE_CASE_EVENTS + " ce" ).
288289 InnerJoin (dbmodels .TABLE_CASES + " c on c.id = ce.case_id" ).
@@ -297,11 +298,13 @@ func AnalyticsCopyCaseEvents(ctx context.Context, exec AnalyticsExecutor, req An
297298 Limit (uint64 (req .Limit ))
298299
299300 if req .Watermark != nil {
300- q = q .Where ("(ce.created_at, ce.id) > (?::timestamp with time zone, ?)" ,
301+ q1 = q1 .Where ("(ce.created_at, ce.id) > (?::timestamp with time zone, ?)" ,
301302 req .Watermark .WatermarkTime , req .Watermark .WatermarkId )
302303 }
303304
304- return q
305+ return b .
306+ Select ("*" , "max(deduplicate) over() as max_deduplicate" ).
307+ FromSelect (q1 , "i" )
305308 })
306309
307310 inner := squirrel .
@@ -320,7 +323,7 @@ func AnalyticsCopyCaseEvents(ctx context.Context, exec AnalyticsExecutor, req An
320323 From ("q" ).
321324 PrefixExpr (cte ).
322325 InnerJoin (dbmodels .TABLE_DECISIONS + " d on d.case_id = q.case_id" ).
323- Where ("q.rnk = 1" )
326+ Where ("q.rnk = 1 and deduplicate < max_deduplicate " )
324327
325328 for _ , f := range req .TriggerObjectFields {
326329 inner = analyticsAddTriggerObjectField (inner , f , false )
0 commit comments