@@ -38,6 +38,7 @@ CREATE OR REPLACE TABLE logs31
3838 INDEX idx_attributes_str_values mapValues(attributes_map_str) TYPE bloom_filter(0 .001 ) GRANULARITY 1 ,
3939 INDEX idx_mat_body_ipv4_matches mat_body_ipv4_matches TYPE bloom_filter(0 .01 ) GRANULARITY 1 ,
4040 INDEX idx_body_ngram3 body TYPE ngrambf_v1(3 , 25000 , 2 , 0 ) GRANULARITY 1 ,
41+ INDEX idx_observed_minmax observed_timestamp TYPE minmax GRANULARITY 1 ,
4142 PROJECTION projection_aggregate_counts
4243 (
4344 SELECT
@@ -180,6 +181,33 @@ mapSort(mapFilter((k, v) -> isNotNull(v), mapApply((k,v) -> (concat(k, '__float'
180181mapSort(mapFilter((k, v) - > isNotNull(v), mapApply((k,v) - > (concat(k, ' __datetime' ), parseDateTimeBestEffortOrNull(JSONExtract(v, ' String' ), 6 )), attributes))) as attributes_map_datetime,
181182mapSort(resource_attributes) as resource_attributes,
182183toInt32OrZero(_headers .value [indexOf(_headers .name , ' team_id' )]) as team_id
183- FROM kafka_logs_avro;
184+ FROM kafka_logs_avro settings min_insert_block_size_rows= 0 , min_insert_block_size_bytes= 0 ;
185+
186+ create or replace table logs_kafka_metrics
187+ (
188+ ` _partition` UInt32,
189+ ` _topic` String,
190+ ` max_offset` SimpleAggregateFunction(max, UInt64),
191+ ` max_observed_timestamp` SimpleAggregateFunction(max, DateTime64(9 )),
192+ ` max_timestamp` SimpleAggregateFunction(max, DateTime64(9 )),
193+ ` max_created_at` SimpleAggregateFunction(max, DateTime64(9 )),
194+ ` max_lag` SimpleAggregateFunction(max, UInt64)
195+ )
196+ ENGINE = MergeTree
197+ ORDER BY (_topic, _partition);
198+
199+ drop view if exists kafka_logs_avro_kafka_metrics_mv;
200+ CREATE MATERIALIZED VIEW kafka_logs_avro_kafka_metrics_mv TO logs_kafka_metrics
201+ AS
202+ SELECT
203+ _partition,
204+ _topic,
205+ maxSimpleState(_offset) as max_offset,
206+ maxSimpleState(observed_timestamp) as max_observed_timestamp,
207+ maxSimpleState(timestamp ) as max_timestamp,
208+ maxSimpleState(now()) as max_created_at,
209+ maxSimpleState(now() - observed_timestamp) as max_lag
210+ FROM kafka_logs_avro
211+ group by _partition, _topic;
184212
185213select ' clickhouse logs tables initialised successfully!' ;
0 commit comments