@@ -18,6 +18,7 @@ use pyo3::types::PyBytes;
 
 use crate::config;
 use crate::factory::ConsumerStrategyFactory;
+use crate::factory_v2::ConsumerStrategyFactoryV2;
 use crate::logging::{setup_logging, setup_sentry};
 use crate::metrics::global_tags::set_global_tag;
 use crate::metrics::statsd::StatsDBackend;
@@ -47,29 +48,290 @@ pub fn consumer(
     max_dlq_buffer_length: Option<usize>,
     custom_envoy_request_timeout: Option<u64>,
     join_timeout_ms: Option<u64>,
+    consumer_version: Option<&str>,
 ) -> usize {
-    py.allow_threads(|| {
-        consumer_impl(
-            consumer_group,
-            auto_offset_reset,
-            no_strict_offset_reset,
-            consumer_config_raw,
-            concurrency,
-            clickhouse_concurrency,
-            use_rust_processor,
-            enforce_schema,
-            max_poll_interval_ms,
-            async_inserts,
-            python_max_queue_depth,
-            health_check_file,
-            stop_at_timestamp,
-            batch_write_timeout_ms,
+    if consumer_version == Some("v2") {
+        py.allow_threads(|| {
+            consumer_v2_impl(
+                consumer_group,
+                auto_offset_reset,
+                no_strict_offset_reset,
+                consumer_config_raw,
+                concurrency,
+                clickhouse_concurrency,
+                use_rust_processor,
+                enforce_schema,
+                max_poll_interval_ms,
+                async_inserts,
+                python_max_queue_depth,
+                health_check_file,
+                stop_at_timestamp,
+                batch_write_timeout_ms,
+                max_dlq_buffer_length,
+                custom_envoy_request_timeout,
+                join_timeout_ms,
+                health_check,
+            )
+        })
+    } else {
+        py.allow_threads(|| {
+            consumer_impl(
+                consumer_group,
+                auto_offset_reset,
+                no_strict_offset_reset,
+                consumer_config_raw,
+                concurrency,
+                clickhouse_concurrency,
+                use_rust_processor,
+                enforce_schema,
+                max_poll_interval_ms,
+                async_inserts,
+                python_max_queue_depth,
+                health_check_file,
+                stop_at_timestamp,
+                batch_write_timeout_ms,
+                max_dlq_buffer_length,
+                custom_envoy_request_timeout,
+                join_timeout_ms,
+                health_check,
+            )
+        })
+    }
+}
+
+#[allow(clippy::too_many_arguments)]
+pub fn consumer_v2_impl(
+    consumer_group: &str,
+    auto_offset_reset: &str,
+    no_strict_offset_reset: bool,
+    consumer_config_raw: &str,
+    concurrency: usize,
+    clickhouse_concurrency: usize,
+    use_rust_processor: bool,
+    enforce_schema: bool,
+    max_poll_interval_ms: usize,
+    async_inserts: bool,
+    python_max_queue_depth: Option<usize>,
+    health_check_file: Option<&str>,
+    stop_at_timestamp: Option<i64>,
+    batch_write_timeout_ms: Option<u64>,
+    max_dlq_buffer_length: Option<usize>,
+    custom_envoy_request_timeout: Option<u64>,
+    join_timeout_ms: Option<u64>,
+    health_check: &str,
+) -> usize {
+    setup_logging();
+
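+    // An unparsable consumer config is fatal: the unwrap() below panics and aborts startup.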
+    let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap();
+    let max_batch_size = consumer_config.max_batch_size;
+    let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms);
+
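+    // A caller-supplied batch write timeout only takes effect when it is at least
+    // max_batch_time; smaller values are ignored and the default behavior applies.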
+    let batch_write_timeout = match batch_write_timeout_ms {
+        Some(timeout_ms) => {
+            if timeout_ms >= consumer_config.max_batch_time_ms {
+                Some(Duration::from_millis(timeout_ms))
+            } else {
+                None
+            }
+        }
+        None => None,
+    };
+
+    for storage in &consumer_config.storages {
+        tracing::info!(
+            "Storage: {}, ClickHouse Table Name: {}, Message Processor: {:?}, ClickHouse host: {}, ClickHouse port: {}, ClickHouse HTTP port: {}, ClickHouse database: {}",
+            storage.name,
+            storage.clickhouse_table_name,
+            &storage.message_processor,
+            storage.clickhouse_cluster.host,
+            storage.clickhouse_cluster.port,
+            storage.clickhouse_cluster.http_port,
+            storage.clickhouse_cluster.database,
+        );
+    }
+
+    // TODO: Support multiple storages
+    assert_eq!(consumer_config.storages.len(), 1);
+
+    let mut _sentry_guard = None;
+
+    let env_config = consumer_config.env.clone();
+
+    // setup sentry
+    if let Some(dsn) = consumer_config.env.sentry_dsn {
+        tracing::debug!(sentry_dsn = dsn);
+        // this forces anyhow to record stack traces when capturing an error:
+        std::env::set_var("RUST_BACKTRACE", "1");
+        _sentry_guard = Some(setup_sentry(&dsn));
+    }
+
+    // setup arroyo metrics
+    if let (Some(host), Some(port)) = (
+        consumer_config.env.dogstatsd_host,
+        consumer_config.env.dogstatsd_port,
+    ) {
+        let storage_name = consumer_config
+            .storages
+            .iter()
+            .map(|s| s.name.clone())
+            .collect::<Vec<_>>()
+            .join(",");
+        set_global_tag("storage".to_owned(), storage_name);
+        set_global_tag("consumer_group".to_owned(), consumer_group.to_owned());
+
+        metrics::init(StatsDBackend::new(
+            &host,
+            port,
+            "snuba.consumer",
+            env_config.ddm_metrics_sample_rate,
+        ))
+        .unwrap();
+    }
+
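+    // Non-Rust (Python) message processors run in subprocesses, so the procspawn
+    // runtime must be initialized before any are spawned.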
+    if !use_rust_processor {
+        procspawn::init();
+    }
+
+    let first_storage = consumer_config.storages[0].clone();
+
+    tracing::info!(
+        storage = first_storage.name,
+        "Starting consumer for {:?}",
+        first_storage.name,
+    );
+
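+    // Note the double negative below: strict offset reset is on by default and is
+    // turned off via the no_strict_offset_reset flag.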
+    let config = KafkaConfig::new_consumer_config(
+        vec![],
+        consumer_group.to_owned(),
+        auto_offset_reset.parse().expect(
+            "Invalid value for `auto_offset_reset`. Valid values: `error`, `earliest`, `latest`",
+        ),
+        !no_strict_offset_reset,
+        max_poll_interval_ms,
+        Some(consumer_config.raw_topic.broker_config),
+    );
+
+    let logical_topic_name = consumer_config.raw_topic.logical_topic_name;
+
+    // XXX: this variable must live for the lifetime of the entire consumer. we should do something
+    // to ensure this statically, such as use actual Rust lifetimes or ensuring the runtime stays
+    // alive by storing it inside of the DlqPolicy
+    let dlq_concurrency_config = ConcurrencyConfig::new(10);
+
+    // DLQ policy applies only if we are not skipping writes, otherwise we don't want to be
+    // writing to the DLQ topics in prod.
+    let dlq_policy = consumer_config.dlq_topic.map(|dlq_topic_config| {
+        let producer_config =
+            KafkaConfig::new_producer_config(vec![], Some(dlq_topic_config.broker_config));
+        let producer = KafkaProducer::new(producer_config);
+
+        let kafka_dlq_producer = Box::new(KafkaDlqProducer::new(
+            producer,
+            Topic::new(&dlq_topic_config.physical_topic_name),
+        ));
+
+        let handle = dlq_concurrency_config.handle();
+        DlqPolicy::new(
+            handle,
+            kafka_dlq_producer,
+            DlqLimit {
+                max_invalid_ratio: None,
+                max_consecutive_count: None,
+            },
             max_dlq_buffer_length,
-            custom_envoy_request_timeout,
-            join_timeout_ms,
-            health_check,
         )
-    })
+    });
+
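+    // Producers for the commit log and replacements topics are created only when the
+    // corresponding topics appear in the consumer config.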
+    let commit_log_producer = if let Some(topic_config) = consumer_config.commit_log_topic {
+        let producer_config =
+            KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config));
+        let producer = KafkaProducer::new(producer_config);
+        Some((
+            Arc::new(producer),
+            Topic::new(&topic_config.physical_topic_name),
+        ))
+    } else {
+        None
+    };
+
+    let replacements_config = if let Some(topic_config) = consumer_config.replacements_topic {
+        let producer_config =
+            KafkaConfig::new_producer_config(vec![], Some(topic_config.broker_config));
+        Some((
+            producer_config,
+            Topic::new(&topic_config.physical_topic_name),
+        ))
+    } else {
+        None
+    };
+
+    let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name);
+
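+    // A rebalance delay from runtime config overrides the per-topic quantized setting;
+    // when any delay applies, startup waits so group joins align to the same interval.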
+    let mut rebalance_delay_secs = consumer_config
+        .raw_topic
+        .quantized_rebalance_consumer_group_delay_secs;
+    let config_rebalance_delay_secs = rebalancing::get_rebalance_delay_secs(consumer_group);
+    if let Some(secs) = config_rebalance_delay_secs {
+        rebalance_delay_secs = Some(secs);
+    }
+    if let Some(secs) = rebalance_delay_secs {
+        rebalancing::delay_kafka_rebalance(secs)
+    }
+
+    let factory = ConsumerStrategyFactoryV2 {
+        storage_config: first_storage,
+        env_config,
+        logical_topic_name,
+        max_batch_size,
+        max_batch_time,
+        processing_concurrency: ConcurrencyConfig::new(concurrency),
+        clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
+        commitlog_concurrency: ConcurrencyConfig::new(2),
+        replacements_concurrency: ConcurrencyConfig::new(4),
+        async_inserts,
+        python_max_queue_depth,
+        use_rust_processor,
+        health_check_file: health_check_file.map(ToOwned::to_owned),
+        enforce_schema,
+        commit_log_producer,
+        replacements_config,
+        physical_consumer_group: consumer_group.to_owned(),
+        physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
+        accountant_topic_config: consumer_config.accountant_topic,
+        stop_at_timestamp,
+        batch_write_timeout,
+        custom_envoy_request_timeout,
+        join_timeout_ms,
+        health_check: health_check.to_string(),
+    };
+
+    let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy);
+
+    let mut handle = processor.get_handle();
+
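+    // On Ctrl-C, first apply the same quantized delay (when configured) before
+    // signaling shutdown, so all members of the group leave together.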
+    match rebalance_delay_secs {
+        Some(secs) => {
+            ctrlc::set_handler(move || {
+                rebalancing::delay_kafka_rebalance(secs);
+                handle.signal_shutdown();
+            })
+            .expect("Error setting Ctrl-C handler");
+        }
+        None => {
+            ctrlc::set_handler(move || {
+                handle.signal_shutdown();
+            })
+            .expect("Error setting Ctrl-C handler");
+        }
+    }
+
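+    // run() blocks until shutdown; the usize return value is used as the exit
+    // status (0 on clean shutdown, 1 on error).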
+    if let Err(error) = processor.run() {
+        let error: &dyn std::error::Error = &error;
+        tracing::error!("{:?}", error);
+        1
+    } else {
+        0
+    }
 }
 
 #[allow(clippy::too_many_arguments)]