@@ -44,6 +44,7 @@ def create_kafka_consumer(
4444 "bootstrap.servers" : config ["bootstrap_servers" ],
4545 "group.id" : config ["group_id" ],
4646 "auto.offset.reset" : config ["auto_offset_reset" ],
47+ "arroyo.strict.offset.reset" : config ["arroyo_strict_offset_reset" ],
4748 "enable.auto.commit" : False ,
4849 "enable.auto.offset.store" : False ,
4950 }
@@ -128,6 +129,9 @@ def get_kafka_config() -> Dict[str, Any]:
128129 if not topics_env :
129130 raise ValueError ("KAFKA_TOPICS env var is required" )
130131
132+ # Parse arroyo_strict_offset_reset as boolean, default to None if invalid
133+ arroyo_strict_offset_reset = {"true" : True , "false" : False }.get (os .getenv ("ARROYO_STRICT_OFFSET_RESET" , "" ).lower ())
134+
131135 # Optional configuration with defaults
132136 return {
133137 "bootstrap_servers" : bootstrap_servers ,
@@ -137,4 +141,5 @@ def get_kafka_config() -> Dict[str, Any]:
137141 "max_pending_futures" : int (os .getenv ("KAFKA_MAX_PENDING_FUTURES" , "100" )),
138142 "healthcheck_file" : os .getenv ("KAFKA_HEALTHCHECK_FILE" ),
139143 "auto_offset_reset" : os .getenv ("KAFKA_AUTO_OFFSET_RESET" , "latest" ), # latest = skip old messages
144+ "arroyo_strict_offset_reset" : arroyo_strict_offset_reset ,
140145 }
0 commit comments