Skip to content

Commit 6e5ff98

Browse files
liuzenghLeoyoungxhFlash-LHRYoungFr
authored
Merge pull request #36 from trpc-ecosystem/kafka-changelog
* clickhouse: fix go reference API doc (#18) https://pkg.go.dev/trpc.group/trpc-go/trpc-database/clickhouse * kafka: update sarama dependence (#21) * kafka: update sarama dependence * fix unit test * kafka: release v1.1.0 (#22) * workflows: add cla.yaml (#26) * add localcache (#25) * feat: add localcache plugin * chore: update LICENSE * test: add localcache workflow * chore: yaml version * test: flaky test * add mongodb (#28) * add mongodb * revert cover.out * update LICENSE and fix variable name * update comments * kafka: allow users to set sarama's Metadata parameters (#33) sync from the internal merge request 1442 * kafka: change default metadataRefreshFrequency to 60s and use GroupStrategies instead of deprecated Strategy config (#34) sync from the internal merge request 1442 * kafka: release v1.2.0 --------- Co-authored-by: Leo <[email protected]> Co-authored-by: Flash-LHR <[email protected]> Co-authored-by: MengYinlei <[email protected]>
2 parents 1898adc + 27d1450 commit 6e5ff98

File tree

4 files changed

+114
-19
lines changed

4 files changed

+114
-19
lines changed

kafka/CHANGELOG.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
# Change Log
22

3-
## [1.1.0](https://github.com/trpc-ecosystem/go-database/releases/tag/kafka%2Fv1.1.0) (2023-12-22)
3+
## [kafka/v1.2.0](https://github.com/trpc-ecosystem/go-database/releases/tag/kafka%2Fv1.2.0) (2024-12-11)
4+
5+
### Features
6+
7+
- kafka: change default metadataRefreshFrequency to 60s and use GroupStrategies instead of deprecated Strategy config ( sync from the internal merge request 1379) (#34)
8+
- kafka: allow users to set sarama's Metadata parameters(sync from the internal merge request 1442) (#33)
9+
10+
## [kafka/v1.1.0](https://github.com/trpc-ecosystem/go-database/releases/tag/kafka%2Fv1.1.0) (2023-12-22)
11+
412

513
### Breaking Changes
614

kafka/config.go

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,23 @@ type UserConfig struct {
4545
ScramClient *LSCRAMClient // LSCRAM safety certification
4646
// The maximum number of retries on failure,
4747
// the default is 0: retry all the time <0 means no retry
48-
MaxRetry int
49-
NetMaxOpenRequests int // Maximum number of requests
50-
MaxProcessingTime time.Duration
51-
NetDailTimeout time.Duration
52-
NetReadTimeout time.Duration
53-
NetWriteTimeout time.Duration
54-
GroupSessionTimeout time.Duration
55-
GroupRebalanceTimeout time.Duration
56-
GroupRebalanceRetryMax int
57-
IsolationLevel sarama.IsolationLevel
58-
RetryInterval time.Duration // Retry Interval Works with MaxRetry
59-
ProducerRetry struct {
48+
MaxRetry int
49+
NetMaxOpenRequests int // Maximum number of requests
50+
MaxProcessingTime time.Duration
51+
NetDailTimeout time.Duration
52+
NetReadTimeout time.Duration
53+
NetWriteTimeout time.Duration
54+
GroupSessionTimeout time.Duration
55+
GroupRebalanceTimeout time.Duration
56+
GroupRebalanceRetryMax int
57+
MetadataRetryMax int
58+
MetadataRetryBackoff time.Duration
59+
MetadataRefreshFrequency time.Duration
60+
MetadataFull bool
61+
MetadataAllowAutoTopicCreation bool
62+
IsolationLevel sarama.IsolationLevel
63+
RetryInterval time.Duration // Retry Interval Works with MaxRetry
64+
ProducerRetry struct {
6065
Max int // Maximum number of retries
6166
RetryInterval time.Duration // RetryInterval retry interval
6267
}
@@ -83,9 +88,11 @@ func (uc *UserConfig) getServerConfig() *sarama.Config {
8388
sc.ClientID = uc.ClientID
8489
}
8590

86-
sc.Metadata.Full = false // Disable pulling all metadata
87-
sc.Metadata.Retry.Max = 1 // Metadata Update Repeat Times
88-
sc.Metadata.Retry.Backoff = time.Second // Metadata update wait time
91+
sc.Metadata.Retry.Max = uc.MetadataRetryMax
92+
sc.Metadata.Retry.Backoff = uc.MetadataRetryBackoff
93+
sc.Metadata.RefreshFrequency = uc.MetadataRefreshFrequency
94+
sc.Metadata.Full = uc.MetadataFull
95+
sc.Metadata.AllowAutoTopicCreation = uc.MetadataAllowAutoTopicCreation
8996

9097
sc.Net.MaxOpenRequests = uc.NetMaxOpenRequests
9198
sc.Net.DialTimeout = uc.NetDailTimeout
@@ -97,7 +104,7 @@ func (uc *UserConfig) getServerConfig() *sarama.Config {
97104
sc.Consumer.Fetch.Max = int32(uc.FetchMax)
98105
sc.Consumer.Offsets.Initial = uc.Initial
99106
sc.Consumer.Offsets.AutoCommit.Interval = 3 * time.Second // How often to submit consumption progress
100-
sc.Consumer.Group.Rebalance.Strategy = uc.Strategy
107+
sc.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{uc.Strategy}
101108
sc.Consumer.Group.Rebalance.Timeout = uc.GroupRebalanceTimeout
102109
sc.Consumer.Group.Rebalance.Retry.Max = uc.GroupRebalanceRetryMax
103110
sc.Consumer.Group.Session.Timeout = uc.GroupSessionTimeout
@@ -150,7 +157,13 @@ func GetDefaultConfig() *UserConfig {
150157
GroupSessionTimeout: 10 * time.Second,
151158
GroupRebalanceTimeout: 60 * time.Second,
152159
GroupRebalanceRetryMax: 4,
153-
IsolationLevel: 0,
160+
MetadataRetryMax: 1,
161+
MetadataRetryBackoff: 1000 * time.Millisecond,
162+
// The default time for sarama is 10 minutes, which results in a slow detection of new partitions, so it is shortened to 2 minutes.
163+
MetadataRefreshFrequency: 120 * time.Second,
164+
MetadataFull: false, // disable pull all metadata
165+
MetadataAllowAutoTopicCreation: true,
166+
IsolationLevel: 0,
154167
// Message consumption error retry interval The default is 3s The unit of this parameter is time.Millisecond
155168
RetryInterval: 3000 * time.Millisecond,
156169
// production retries the default configuration to align with the default configuration of sarama.NewConfig

kafka/config_parser.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ func newConfigParsers() map[string]configParseFunc {
2424
parserBasicConfig(m)
2525
parserAdvanceConfig(m)
2626
parserBatchConfig(m)
27+
parserMetadataConfig(m)
2728
parserAuthConfig(m)
2829
parserDiscoverConfig(m)
2930
return m
@@ -218,6 +219,52 @@ func parserBatchConfig(m map[string]configParseFunc) {
218219
}
219220
}
220221

222+
func parserMetadataConfig(m map[string]configParseFunc) {
223+
m["metadataRetryMax"] = func(config *UserConfig, s string) error {
224+
metadataRetryMax, err := strconv.Atoi(s)
225+
if err != nil {
226+
return err
227+
}
228+
if metadataRetryMax < 0 {
229+
return errors.New("param not support: metadataRetryMax expect a value of no less than 0")
230+
}
231+
config.MetadataRetryMax = metadataRetryMax
232+
return nil
233+
}
234+
m["metadataRetryBackoff"] = func(config *UserConfig, s string) error {
235+
metadataRetryBackoff, err := strconv.Atoi(s)
236+
if err != nil {
237+
return err
238+
}
239+
if metadataRetryBackoff < 0 {
240+
return errors.New("param not support: metadataRetryBackoff expect a value of no less than 0")
241+
}
242+
config.MetadataRetryBackoff = time.Duration(metadataRetryBackoff) * time.Millisecond
243+
return nil
244+
}
245+
m["metadataRefreshFrequency"] = func(config *UserConfig, s string) error {
246+
metadataRefreshFrequency, err := strconv.Atoi(s)
247+
if err != nil {
248+
return err
249+
}
250+
if metadataRefreshFrequency < 0 {
251+
return errors.New("param not support: metadataRefreshFrequency expect a value of no less than 0")
252+
}
253+
config.MetadataRefreshFrequency = time.Duration(metadataRefreshFrequency) * time.Second
254+
return nil
255+
}
256+
m["metadataFull"] = func(config *UserConfig, s string) error {
257+
var err error
258+
config.MetadataFull, err = strconv.ParseBool(s)
259+
return err
260+
}
261+
m["metadataAllowAutoTopicCreation"] = func(config *UserConfig, s string) error {
262+
var err error
263+
config.MetadataAllowAutoTopicCreation, err = strconv.ParseBool(s)
264+
return err
265+
}
266+
}
267+
221268
// parserAuthConfig returns configParseFunc for kafka auth,
222269
// These parameter items can assign the user's input configuration to userConfig.
223270
// The parameter items may validate the content of the user's input during execution,

kafka/config_test.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ func TestParseAddress(t *testing.T) {
2424
assert.Equal(t, 54455, cfg.FetchMax)
2525
assert.Equal(t, 35326236, cfg.MaxMessageBytes)
2626
assert.Equal(t, sarama.OffsetOldest, cfg.Initial)
27+
assert.Equal(t, 10, cfg.FlushMessages)
28+
assert.Equal(t, 100, cfg.FlushMaxMessages)
29+
assert.Equal(t, 10000000, cfg.FlushBytes)
30+
assert.Equal(t, 100*time.Millisecond, cfg.FlushFrequency)
31+
assert.Equal(t, true, cfg.Idempotent)
2732

2833
RegisterAddrConfig("test_registered", cfg)
2934

@@ -36,8 +41,10 @@ func TestParseAddress(t *testing.T) {
3641
func Test_parseAddress(t *testing.T) {
3742
address := "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&user=kafka_test&password=cccaaabb&mechanism=SCRAM-SHA-512"
3843
_, err := ParseAddress(address)
44+
assert.Nil(t, err)
3945
address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&protocol=SASL_SSL&user=kafka_test&password=cccaaabb&mechanism=SCRAM-SHA-512"
4046
_, err = ParseAddress(address)
47+
assert.Nil(t, err)
4148
address = "127.0.0.1:9092?topics=Topic1,Topic2,Topic3&clientid=client1&version=0.10.2.0&strategy=sticky&batch=2&batchFlush=3000&group=test&maxRetry=10"
4249
_, err = ParseAddress(address)
4350
assert.Nil(t, err)
@@ -72,7 +79,27 @@ func Test_parseAddressWithmaxWaitTime(t *testing.T) {
7279
assert.Nil(t, err)
7380
assert.Equal(t, 250*time.Millisecond, conf.MaxWaitTime)
7481
address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&maxWaitTime=test"
75-
conf, err = ParseAddress(address)
82+
_, err = ParseAddress(address)
83+
assert.NotNil(t, err)
84+
}
85+
86+
func Test_parseAddressWithMetadata(t *testing.T) {
87+
address := "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=3&metadataRetryBackoff=10&metadataRefreshFrequency=100&metadataFull=true&metadataAllowAutoTopicCreation=true"
88+
conf, err := ParseAddress(address)
89+
assert.Nil(t, err)
90+
assert.Equal(t, 3, conf.MetadataRetryMax)
91+
assert.Equal(t, 10*time.Millisecond, conf.MetadataRetryBackoff)
92+
assert.Equal(t, 100*time.Second, conf.MetadataRefreshFrequency)
93+
assert.Equal(t, true, conf.MetadataFull)
94+
assert.Equal(t, true, conf.MetadataAllowAutoTopicCreation)
95+
address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=-3&metadataRetryBackoff=10&metadataRefreshFrequency=100"
96+
_, err = ParseAddress(address)
97+
assert.NotNil(t, err)
98+
address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=3&metadataRetryBackoff=-10&metadataRefreshFrequency=100"
99+
_, err = ParseAddress(address)
100+
assert.NotNil(t, err)
101+
address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=3&metadataRetryBackoff=10&metadataRefreshFrequency=-100"
102+
_, err = ParseAddress(address)
76103
assert.NotNil(t, err)
77104
}
78105

0 commit comments

Comments
 (0)