3535
3636func TestMain (m * testing.M ) {
3737 var broker * kafkaLib.Broker
38-
3938 // setup test
4039 opts := dockertest.RunOptions {
4140 Repository : "moeenz/docker-kafka-kraft" ,
@@ -55,6 +54,7 @@ func TestMain(m *testing.M) {
5554 time .Sleep (30 * time .Second )
5655 conn , err := kafkaLib .NewClient ([]string {brokerHost }, nil )
5756 if err != nil {
57+ fmt .Printf ("error creating client " )
5858 return
5959 }
6060
@@ -70,9 +70,9 @@ func TestMain(m *testing.M) {
7070 conn .Close ()
7171 return
7272 }
73-
7473 return
7574 }
75+
7676 purgeContainer , err := utils .CreateContainer (opts , retryFn )
7777 if err != nil {
7878 log .Fatal (err )
@@ -86,8 +86,6 @@ func TestMain(m *testing.M) {
8686 // run tests
8787 code := m .Run ()
8888
89- conn .Close ()
90-
9189 // purge container
9290 if err := purgeContainer (); err != nil {
9391 log .Fatal (err )
@@ -179,7 +177,6 @@ func TestExtract(t *testing.T) {
179177 if err != nil {
180178 t .Fatal (err )
181179 }
182-
183180 emitter := mocks .NewEmitter ()
184181 err = extr .Extract (ctx , emitter .Push )
185182 assert .NoError (t , err )
@@ -226,12 +223,14 @@ func TestExtract(t *testing.T) {
226223}
227224
228225func setup (broker * kafkaLib.Broker ) (err error ) {
226+
229227 // create client connection to create topics
230228 conn , err := kafkaLib .NewClient ([]string {brokerHost }, nil )
231229 if err != nil {
232230 fmt .Printf ("error creating client " )
233231 return
234232 }
233+
235234 defer conn .Close ()
236235
237236 // create topics
@@ -241,6 +240,7 @@ func setup(broker *kafkaLib.Broker) (err error) {
241240 "meteor-test-topic-3" : {NumPartitions : 1 , ReplicationFactor : 1 },
242241 "__consumer_offsets" : {NumPartitions : 1 , ReplicationFactor : 1 },
243242 }
243+
244244 createTopicRequest := & kafkaLib.CreateTopicsRequest {TopicDetails : topicConfigs }
245245 _ , err = broker .CreateTopics (createTopicRequest )
246246 if err != nil {
0 commit comments