Commit fb743a9

fix: fix kafka extractor issues (#494)
* chore: fix lint issues
* chore: fix build issues
1 parent 01a148e commit fb743a9

2 files changed (+21 -18 lines)
plugins/extractors/kafka/kafka.go

Lines changed: 16 additions & 13 deletions
@@ -11,6 +11,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/IBM/sarama"
 	"github.com/raystack/meteor/models"
 	v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
 	"github.com/raystack/meteor/plugins"
@@ -110,21 +111,21 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
 	}
 
 	consumerConfig := sarama.NewConfig()
-
 	if e.config.Auth.TLS.Enabled {
 		tlsConfig, err := e.createTLSConfig()
 		if err != nil {
 			return fmt.Errorf("create tls config: %w", err)
 		}
 		consumerConfig.Net.TLS.Enable = true
 		consumerConfig.Net.TLS.Config = tlsConfig
+	}
 
-		if e.config.Auth.SASL.Enabled {
-			consumerConfig.Net.SASL.Enable = true
-			if e.config.Auth.SASL.Mechanism == sarama.SASLTypeOAuth {
-				consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
-				consumerConfig.Net.SASL.TokenProvider = NewKubernetesTokenProvider()
-			}
+	if e.config.Auth.SASL.Enabled {
+		consumerConfig.Net.SASL.Enable = true
+		if e.config.Auth.SASL.Mechanism == sarama.SASLTypeOAuth {
+			consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
+			consumerConfig.Net.SASL.TokenProvider = NewKubernetesTokenProvider()
+		}
 	}
 
 	consumer, err := sarama.NewConsumer([]string{e.config.Broker}, consumerConfig)
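
Note on the hunk above: in the previous revision the SASL settings were nested inside the TLS block, so they only took effect when TLS was enabled (and the braces in that block no longer balanced, which lines up with the "fix build issues" note in the commit message). The fix closes the TLS block first and configures SASL independently. Below is a minimal, self-contained Go sketch of the resulting wiring, using only sarama APIs that appear in this diff; the broker address and the stand-in token provider are hypothetical and replace the plugin's own config fields and NewKubernetesTokenProvider.

// Sketch only: TLS and SASL configured independently with sarama.
package main

import (
	"crypto/tls"
	"log"

	"github.com/IBM/sarama"
)

// staticTokenProvider is a hypothetical stand-in that satisfies
// sarama.AccessTokenProvider for this sketch.
type staticTokenProvider struct{ token string }

func (p staticTokenProvider) Token() (*sarama.AccessToken, error) {
	return &sarama.AccessToken{Token: p.token}, nil
}

func main() {
	tlsEnabled, saslEnabled := true, true

	consumerConfig := sarama.NewConfig()
	if tlsEnabled {
		consumerConfig.Net.TLS.Enable = true
		consumerConfig.Net.TLS.Config = &tls.Config{MinVersion: tls.VersionTLS12}
	}

	// SASL is no longer gated on TLS being enabled.
	if saslEnabled {
		consumerConfig.Net.SASL.Enable = true
		consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
		consumerConfig.Net.SASL.TokenProvider = staticTokenProvider{token: "dummy"}
	}

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, consumerConfig)
	if err != nil {
		log.Fatalf("create consumer: %v", err)
	}
	defer consumer.Close()
}
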
@@ -133,6 +134,7 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
 		return fmt.Errorf("failed to create kafka consumer for brokers %s and config %+v. Error %s", e.config.Broker,
 			consumerConfig, err.Error())
 	}
+
 	e.conn = consumer
 	return nil
 }
@@ -162,6 +164,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
 			ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...),
 		)
 	}(time.Now())
+
 	topics, err := e.conn.Topics()
 	if err != nil {
 		return fmt.Errorf("fetch topics: %w", err)
@@ -200,11 +203,6 @@ func (e *Extractor) createTLSConfig() (*tls.Config, error) {
 		}, nil
 	}
 
-	cert, err := tls.LoadX509KeyPair(authConfig.CertFile, authConfig.KeyFile)
-	if err != nil {
-		return nil, fmt.Errorf("create cert: %w", err)
-	}
-
 	var cert tls.Certificate
 	var err error
 	if authConfig.CertFile != "" && authConfig.KeyFile != "" {
@@ -214,6 +212,11 @@ func (e *Extractor) createTLSConfig() (*tls.Config, error) {
 		}
 	}
 
+	caCert, err := os.ReadFile(authConfig.CAFile)
+	if err != nil {
+		return nil, fmt.Errorf("read ca cert file: %w", err)
+	}
+
 	caCertPool := x509.NewCertPool()
 	caCertPool.AppendCertsFromPEM(caCert)

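Taken together, the two createTLSConfig hunks drop the unconditional client-keypair load in favour of the optional one that follows, and add the previously missing read of the CA file before it is appended to the cert pool. A hedged sketch of how those visible pieces combine is below; the file paths are hypothetical and the MinVersion setting is an addition for the sketch, not part of the original function.

// Sketch, not the plugin's createTLSConfig: optional client keypair plus a
// required CA bundle, mirroring the lines visible in the two hunks above.
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

func buildTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	var cert tls.Certificate
	var err error
	if certFile != "" && keyFile != "" {
		// Client certificate is optional; load it only when both parts are set.
		cert, err = tls.LoadX509KeyPair(certFile, keyFile)
		if err != nil {
			return nil, fmt.Errorf("create cert: %w", err)
		}
	}

	// The CA file is read and trusted for verifying the broker certificate.
	caCert, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("read ca cert file: %w", err)
	}
	caCertPool := x509.NewCertPool()
	caCertPool.AppendCertsFromPEM(caCert)

	tlsCfg := &tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12}
	if certFile != "" && keyFile != "" {
		tlsCfg.Certificates = []tls.Certificate{cert}
	}
	return tlsCfg, nil
}

func main() {
	// Hypothetical paths, for illustration only.
	cfg, err := buildTLSConfig("client.crt", "client.key", "ca.crt")
	if err != nil {
		fmt.Println("tls config:", err)
		return
	}
	fmt.Println("root CAs configured:", cfg.RootCAs != nil)
}
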
@@ -231,7 +234,7 @@ func (e *Extractor) buildAsset(topicName string, numOfPartitions int) (*v1beta2.
 		Profile: &v1beta2.TopicProfile{
 			NumberOfPartitions: int64(numOfPartitions),
 		},
-		Attributes: &structpb.Struct{},
+		Attributes: &structpb.Struct{}, // ensure attributes don't get overwritten if present
 	})
 	if err != nil {
 		e.logger.Warn("error creating Any struct", "error", err)
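
The final kafka.go hunk only adds a comment on the empty Attributes field. As background, here is a small standalone sketch (unrelated to the plugin's own proto types, which are not shown in this diff) of the difference between an empty structpb.Struct and one populated from a map.

// Sketch: empty versus populated structpb.Struct attributes.
package main

import (
	"fmt"

	"google.golang.org/protobuf/types/known/structpb"
)

func main() {
	// Empty attributes: a valid proto message with no fields set.
	empty := &structpb.Struct{}
	fmt.Println("empty fields:", len(empty.GetFields()))

	// Populated attributes built from a plain map.
	attrs, err := structpb.NewStruct(map[string]interface{}{
		"partitions": 3,
		"source":     "kafka",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("populated fields:", len(attrs.GetFields()))
}
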

plugins/extractors/kafka/kafka_test.go

Lines changed: 5 additions & 5 deletions
@@ -35,7 +35,6 @@ var (
 
 func TestMain(m *testing.M) {
 	var broker *kafkaLib.Broker
-
 	// setup test
 	opts := dockertest.RunOptions{
 		Repository: "moeenz/docker-kafka-kraft",
@@ -55,6 +54,7 @@ func TestMain(m *testing.M) {
 		time.Sleep(30 * time.Second)
 		conn, err := kafkaLib.NewClient([]string{brokerHost}, nil)
 		if err != nil {
+			fmt.Printf("error creating client ")
 			return
 		}
 
@@ -70,9 +70,9 @@ func TestMain(m *testing.M) {
 			conn.Close()
 			return
 		}
-
 		return
 	}
+
 	purgeContainer, err := utils.CreateContainer(opts, retryFn)
 	if err != nil {
 		log.Fatal(err)
@@ -86,8 +86,6 @@ func TestMain(m *testing.M) {
 	// run tests
 	code := m.Run()
 
-	conn.Close()
-
 	// purge container
 	if err := purgeContainer(); err != nil {
 		log.Fatal(err)
@@ -179,7 +177,6 @@ func TestExtract(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-
 		emitter := mocks.NewEmitter()
 		err = extr.Extract(ctx, emitter.Push)
 		assert.NoError(t, err)
@@ -226,12 +223,14 @@ func TestExtract(t *testing.T) {
 }
 
 func setup(broker *kafkaLib.Broker) (err error) {
+
 	// create client connection to create topics
 	conn, err := kafkaLib.NewClient([]string{brokerHost}, nil)
 	if err != nil {
 		fmt.Printf("error creating client ")
 		return
 	}
+
 	defer conn.Close()
 
 	// create topics
@@ -241,6 +240,7 @@ func setup(broker *kafkaLib.Broker) (err error) {
 		"meteor-test-topic-3": {NumPartitions: 1, ReplicationFactor: 1},
 		"__consumer_offsets":  {NumPartitions: 1, ReplicationFactor: 1},
 	}
+
 	createTopicRequest := &kafkaLib.CreateTopicsRequest{TopicDetails: topicConfigs}
 	_, err = broker.CreateTopics(createTopicRequest)
 	if err != nil {
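
The setup changes above are whitespace-only, but the surrounding test code is worth a note: it creates the test topics by sending a CreateTopicsRequest directly to a broker. Below is a sketch of that pattern, assuming kafkaLib refers to github.com/IBM/sarama (the only Kafka client visible in kafka.go); the broker address is a placeholder rather than the test's brokerHost.

// Sketch: create test topics by talking to a single broker with sarama.
package main

import (
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	broker := sarama.NewBroker("localhost:9092")
	if err := broker.Open(sarama.NewConfig()); err != nil {
		log.Fatalf("open broker: %v", err)
	}
	defer broker.Close()
	if ok, err := broker.Connected(); err != nil || !ok {
		log.Fatalf("broker not connected: %v", err)
	}

	topicConfigs := map[string]*sarama.TopicDetail{
		"meteor-test-topic-1": {NumPartitions: 1, ReplicationFactor: 1},
		"meteor-test-topic-2": {NumPartitions: 1, ReplicationFactor: 1},
	}
	req := &sarama.CreateTopicsRequest{TopicDetails: topicConfigs, Timeout: 10 * time.Second}

	resp, err := broker.CreateTopics(req)
	if err != nil {
		log.Fatalf("create topics: %v", err)
	}
	for name, topicErr := range resp.TopicErrors {
		if topicErr.Err != sarama.ErrNoError {
			log.Printf("topic %s: %v", name, topicErr.Err)
		}
	}
}
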
