@@ -26,7 +26,7 @@ use actix_web::http::StatusCode;
2626use actix_web:: http:: header:: ContentType ;
2727use actix_web:: web:: { self , Json } ;
2828use actix_web:: { Either , FromRequest , HttpRequest , HttpResponse , Responder } ;
29- use arrow_array:: RecordBatch ;
29+ use arrow_array:: { ArrayRef , RecordBatch , StringArray , UInt64Array } ;
3030use bytes:: Bytes ;
3131use chrono:: { DateTime , Utc } ;
3232use datafusion:: error:: DataFusionError ;
@@ -50,7 +50,7 @@ use crate::metrics::{QUERY_EXECUTE_TIME, increment_query_calls_by_date};
5050use crate :: parseable:: { DEFAULT_TENANT , PARSEABLE , StreamNotFound } ;
5151use crate :: query:: error:: ExecuteError ;
5252use crate :: query:: resolve_stream_names;
53- use crate :: query:: { CountsRequest , QUERY_SESSION , Query as LogicalQuery , execute} ;
53+ use crate :: query:: { CountsRecord , CountsRequest , QUERY_SESSION , Query as LogicalQuery , execute} ;
5454use crate :: rbac:: Users ;
5555use crate :: response:: QueryResponse ;
5656use crate :: storage:: ObjectStorageError ;
@@ -387,6 +387,14 @@ pub async fn get_counts(
387387 // if the user has given a sql query (counts call with filters applied), then use this flow
388388 // this could include filters or group by
389389 if body. conditions . is_some ( ) {
390+ let group_by_cols: Vec < String > = body
391+ . conditions
392+ . as_ref ( )
393+ . and_then ( |c| c. group_by . as_ref ( ) )
394+ . cloned ( )
395+ . unwrap_or_default ( ) ;
396+ let has_groups = !group_by_cols. is_empty ( ) ;
397+
390398 let time_partition = PARSEABLE
391399 . get_stream ( & body. stream , & tenant_id) ?
392400 . get_time_partition ( )
@@ -410,11 +418,66 @@ pub async fn get_counts(
410418
411419 if let Some ( records) = records {
412420 let json_records = record_batches_to_json ( & records) ?;
413- let records = json_records. into_iter ( ) . map ( Value :: Object ) . collect_vec ( ) ;
421+ let raw_records: Vec < Value > = json_records. into_iter ( ) . map ( Value :: Object ) . collect_vec ( ) ;
422+
423+ // Parse into CountsRecord to normalize timestamps
424+ let counts_records: Vec < CountsRecord > =
425+ serde_json:: from_value ( serde_json:: to_value ( & raw_records) ?) ?;
426+
427+ let start_time: ArrayRef = Arc :: new ( StringArray :: from_iter_values (
428+ counts_records
429+ . iter ( )
430+ . map ( |v| format ! ( "{}+00:00" , v. start_time) ) ,
431+ ) ) ;
432+ let end_time: ArrayRef = Arc :: new ( StringArray :: from_iter_values (
433+ counts_records
434+ . iter ( )
435+ . map ( |v| format ! ( "{}+00:00" , v. end_time) ) ,
436+ ) ) ;
437+ let count: ArrayRef = Arc :: new ( UInt64Array :: from_iter_values (
438+ counts_records. iter ( ) . map ( |v| v. count ) ,
439+ ) ) ;
440+
441+ let batch = RecordBatch :: try_from_iter ( vec ! [
442+ ( "start_time" , start_time) ,
443+ ( "end_time" , end_time) ,
444+ ( "count" , count) ,
445+ ] )
446+ . map_err ( |e| QueryError :: CustomError ( e. to_string ( ) ) ) ?;
447+
448+ let json_records = record_batches_to_json ( & [ batch] ) ?;
449+ let counts: Vec < Value > = json_records. into_iter ( ) . map ( Value :: Object ) . collect_vec ( ) ;
450+
451+ // When groupBy is present, enrich counts with group column values
452+ let counts = if has_groups {
453+ counts
454+ . into_iter ( )
455+ . enumerate ( )
456+ . map ( |( i, mut record) | {
457+ if let ( Value :: Object ( rec) , Some ( Value :: Object ( raw_obj) ) ) =
458+ ( & mut record, raw_records. get ( i) )
459+ {
460+ for col in & group_by_cols {
461+ if let Some ( val) = raw_obj. get ( col) {
462+ rec. insert ( col. clone ( ) , val. clone ( ) ) ;
463+ }
464+ }
465+ }
466+ record
467+ } )
468+ . collect ( )
469+ } else {
470+ counts
471+ } ;
472+
473+ let mut fields = vec ! [ "start_time" , "end_time" , "count" ] ;
474+ if has_groups {
475+ fields. extend ( group_by_cols. iter ( ) . map ( |s| s. as_str ( ) ) ) ;
476+ }
414477
415478 let res = json ! ( {
416- "fields" : vec! [ "start_time" , "endTime" , "count" ] ,
417- "records" : records ,
479+ "fields" : fields ,
480+ "records" : counts ,
418481 } ) ;
419482
420483 return Ok ( web:: Json ( res) ) ;
@@ -427,7 +490,7 @@ pub async fn get_counts(
427490
428491 let records = body. get_bin_density ( & tenant_id) . await ?;
429492 let res = json ! ( {
430- "fields" : vec![ "start_time" , "endTime " , "count" ] ,
493+ "fields" : vec![ "start_time" , "end_time " , "count" ] ,
431494 "records" : records,
432495 } ) ;
433496 Ok ( web:: Json ( res) )
0 commit comments