@@ -10,6 +10,7 @@ import (
1010 "github.com/Masterminds/squirrel"
1111 "github.com/checkmarble/marble-backend/models"
1212 "github.com/checkmarble/marble-backend/models/analytics"
13+ "github.com/checkmarble/marble-backend/repositories/dbmodels"
1314 "github.com/cockroachdb/errors"
1415 "github.com/google/uuid"
1516)
@@ -263,6 +264,88 @@ func AnalyticsCopyScreenings(ctx context.Context, exec AnalyticsExecutor, req An
263264 return int (nRows ), nil
264265}
265266
267+ func AnalyticsCopyCaseEvents (ctx context.Context , exec AnalyticsExecutor , req AnalyticsCopyRequest ) (int , error ) {
268+ cte := WithCtesRaw ("q" , func (b squirrel.StatementBuilderType ) squirrel.SelectBuilder {
269+ q := b .Select (
270+ "ce.id" ,
271+ "ce.org_id" ,
272+ "d.scenario_id" ,
273+ "dr.rule_id as rule_id" ,
274+ "ce.case_id as case_id" ,
275+ "ce.new_value as outcome" ,
276+ "ce.created_at as created_at" ,
277+ "d.trigger_object_type" ,
278+ "row_number() over (partition by ce.case_id order by ce.created_at desc) rnk" ,
279+ ).
280+ From (dbmodels .TABLE_CASE_EVENTS + " ce" ).
281+ InnerJoin (dbmodels .TABLE_CASES + " c on c.id = ce.case_id" ).
282+ InnerJoin (dbmodels .TABLE_DECISIONS + " d on d.case_id = c.id" ).
283+ InnerJoin (dbmodels .TABLE_DECISION_RULES + " dr on dr.decision_id = d.id" ).
284+ Where ("ce.org_id = ?" , req .OrgId ).
285+ Where ("ce.event_type = 'outcome_updated'" ).
286+ Where ("c.status = 'closed'" ).
287+ Where ("d.trigger_object_type = ?" , req .TriggerObject ).
288+ Where ("ce.created_at < ?" , req .EndTime ).
289+ OrderBy ("ce.created_at, ce.id" ).
290+ Limit (uint64 (req .Limit ))
291+
292+ if req .Watermark != nil {
293+ q = q .Where ("(ce.created_at, ce.id) > (?::timestamp with time zone, ?)" ,
294+ req .Watermark .WatermarkTime , req .Watermark .WatermarkId )
295+ }
296+
297+ return q
298+ })
299+
300+ inner := squirrel .
301+ Select (
302+ "q.id" ,
303+ "q.scenario_id" ,
304+ "q.rule_id" ,
305+ "q.case_id" ,
306+ "q.outcome" ,
307+ "q.created_at" ,
308+ "q.org_id" ,
309+ "extract(year from q.created_at)::int as year" ,
310+ "extract(month from q.created_at)::int as month" ,
311+ "q.trigger_object_type" ,
312+ ).
313+ From ("q" ).
314+ PrefixExpr (cte ).
315+ InnerJoin (dbmodels .TABLE_DECISIONS + " d on d.case_id = q.case_id" ).
316+ Where ("q.rnk = 1" )
317+
318+ for _ , f := range req .TriggerObjectFields {
319+ inner = analyticsAddTriggerObjectField (inner , f , false )
320+ }
321+ for _ , f := range req .ExtraDbFields {
322+ inner = analyticsAddExtraField (inner , f , false )
323+ }
324+
325+ innerSql , args , err := inner .ToSql ()
326+ if err != nil {
327+ return 0 , err
328+ }
329+
330+ unsafeQuery , err := unsafeBuildSqlQuery (innerSql , args )
331+ if err != nil {
332+ return 0 , err
333+ }
334+
335+ query := fmt .Sprintf (`copy ( select * from postgres_query(?, ?) ) to '%s' (format parquet, compression zstd, partition_by (org_id, year, month, trigger_object_type), append)` , req .Table )
336+
337+ result , err := exec .ExecContext (ctx , query , "pg" , unsafeQuery )
338+ if err != nil {
339+ return 0 , err
340+ }
341+ nRows , err := result .RowsAffected ()
342+ if err != nil {
343+ return 0 , err
344+ }
345+
346+ return int (nRows ), nil
347+ }
348+
266349func analyticsAddTriggerObjectField (b squirrel.SelectBuilder , field models.Field , anyValue bool ) squirrel.SelectBuilder {
267350 sqlType := "text"
268351
0 commit comments