Skip to content

Commit f95c194

Browse files
feat: add config for kafka consumer partition assignment strategy (#209)
* feat: add config for kafka consumer partition assignment strategy * chore: version bump * chore: bump version to 0.7.1 * chore: version bump
1 parent 01f688d commit f95c194

File tree

4 files changed

+9
-1
lines changed

4 files changed

+9
-1
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ lombok {
3333
}
3434

3535
group 'io.odpf'
36-
version '0.7.1'
36+
version '0.7.2'
3737

3838
def projName = "firehose"
3939

docs/docs/advance/generic.md

316 Bytes
Binary file not shown.

src/main/java/io/odpf/firehose/config/KafkaConsumerConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,9 @@ public interface KafkaConsumerConfig extends AppConfig {
5252
@Key("SOURCE_KAFKA_CONSUMER_CONFIG_MANUAL_COMMIT_MIN_INTERVAL_MS")
5353
@DefaultValue("-1")
5454
long getSourceKafkaConsumerManualCommitMinIntervalMs();
55+
56+
@Key("SOURCE_KAFKA_CONSUMER_CONFIG_PARTITION_ASSIGNMENT_STRATEGY")
57+
@DefaultValue("org.apache.kafka.clients.consumer.CooperativeStickyAssignor")
58+
String getSourceKafkaConsumerConfigPartitionAssignmentStrategy();
59+
5560
}

src/main/java/io/odpf/firehose/utils/KafkaUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public class KafkaUtils {
3030
private static final String METADATA_MAX_AGE_MS = "metadata.max.age.ms";
3131
private static final String MAX_POLL_RECORDS = "max.poll.records";
3232
private static final String SESSION_TIMEOUT_MS = "session.timeout.ms";
33+
private static final String PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy";
34+
3335

3436

3537
/**
@@ -56,6 +58,7 @@ public static Map<String, Object> getConfig(KafkaConsumerConfig config, Map<Stri
5658
put(METADATA_MAX_AGE_MS, config.getSourceKafkaConsumerConfigMetadataMaxAgeMs());
5759
put(MAX_POLL_RECORDS, config.getSourceKafkaConsumerConfigMaxPollRecords());
5860
put(SESSION_TIMEOUT_MS, config.getSourceKafkaConsumerConfigSessionTimeoutMs());
61+
put(PARTITION_ASSIGNMENT_STRATEGY, config.getSourceKafkaConsumerConfigPartitionAssignmentStrategy());
5962
}};
6063

6164
return merge(consumerConfigurationMap, KafkaEnvironmentVariables.parse(extraParameters));

0 commit comments

Comments
 (0)