Skip to content

Commit f372824

Browse files
Fix Kafka SASL_SSL listener inside Docker network (#588)
1 parent ee0a4e1 commit f372824

File tree

2 files changed

+77
-6
lines changed

2 files changed

+77
-6
lines changed

src/modules/kafka/kafka-container.test.ts

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ describe("KafkaContainer", () => {
4646
const zooKeeperPort = 2181;
4747
const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:5.5.4")
4848
.withNetwork(network)
49-
.withNetworkAliases("zookeeper")
49+
.withNetworkAliases(zooKeeperHost)
5050
.withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() })
5151
.withExposedPorts(zooKeeperPort)
5252
.start();
@@ -74,11 +74,11 @@ describe("KafkaContainer", () => {
7474
await originalKafkaContainer.stop();
7575
});
7676

77-
describe("when a set of certificates is provided", () => {
77+
describe("when SASL SSL config listener provided", () => {
7878
const certificatesDir = path.resolve(__dirname, ".", "test-certs");
7979

8080
// ssl {
81-
it(`should expose SASL_SSL listener if configured`, async () => {
81+
it(`should connect locally`, async () => {
8282
const kafkaContainer = await new KafkaContainer()
8383
.withSaslSslListener({
8484
port: 9094,
@@ -114,6 +114,72 @@ describe("KafkaContainer", () => {
114114
await kafkaContainer.stop();
115115
});
116116
// }
117+
118+
it(`should connect within Docker network`, async () => {
119+
const network = await new Network().start();
120+
121+
const kafkaContainer = await new KafkaContainer()
122+
.withNetwork(network)
123+
.withNetworkAliases("kafka")
124+
.withSaslSslListener({
125+
port: 9094,
126+
sasl: {
127+
mechanism: "SCRAM-SHA-512",
128+
user: {
129+
name: "app-user",
130+
password: "userPassword",
131+
},
132+
},
133+
keystore: {
134+
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
135+
passphrase: "serverKeystorePassword",
136+
},
137+
truststore: {
138+
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
139+
passphrase: "serverTruststorePassword",
140+
},
141+
})
142+
.start();
143+
144+
const kafkaCliContainer = await new GenericContainer(KAFKA_IMAGE)
145+
.withNetwork(network)
146+
.withCommand(["bash", "-c", "echo 'START'; sleep infinity"])
147+
.withCopyFilesToContainer([
148+
{
149+
source: path.resolve(certificatesDir, "kafka.client.truststore.pem"),
150+
target: "/truststore.pem",
151+
},
152+
])
153+
.withCopyContentToContainer([
154+
{
155+
content: `
156+
security.protocol=SASL_SSL
157+
ssl.truststore.location=/truststore.pem
158+
ssl.truststore.type=PEM
159+
ssl.endpoint.identification.algorithm=
160+
sasl.mechanism=SCRAM-SHA-512
161+
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\
162+
username="app-user" \\
163+
password="userPassword";
164+
`,
165+
target: "/etc/kafka/consumer.properties",
166+
},
167+
])
168+
.start();
169+
170+
await kafkaCliContainer.exec(
171+
"kafka-topics --create --topic test-topic --bootstrap-server kafka:9094 --command-config /etc/kafka/consumer.properties"
172+
);
173+
const { output, exitCode } = await kafkaCliContainer.exec(
174+
"kafka-topics --list --bootstrap-server kafka:9094 --command-config /etc/kafka/consumer.properties"
175+
);
176+
177+
expect(exitCode).toBe(0);
178+
expect(output).toContain("test-topic");
179+
180+
await kafkaCliContainer.stop();
181+
await kafkaContainer.stop();
182+
});
117183
});
118184

119185
const testPubSub = async (kafkaContainer: StartedTestContainer, additionalConfig: Partial<KafkaConfig> = {}) => {

src/modules/kafka/kafka-container.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,16 @@ export class KafkaContainer extends GenericContainer {
147147

148148
private async updateAdvertisedListeners(container: StartedTestContainer, inspectResult: InspectResult) {
149149
const brokerAdvertisedListener = `BROKER://${inspectResult.hostname}:${KAFKA_BROKER_PORT}`;
150+
150151
let bootstrapServers = `PLAINTEXT://${container.getHost()}:${container.getMappedPort(KAFKA_PORT)}`;
151152
if (this.saslSslConfig) {
152-
bootstrapServers = `${bootstrapServers},SECURE://${container.getHost()}:${container.getMappedPort(
153-
this.saslSslConfig.port
154-
)}`;
153+
if (this.networkMode) {
154+
bootstrapServers = `${bootstrapServers},SECURE://${inspectResult.hostname}:${this.saslSslConfig.port}`;
155+
} else {
156+
bootstrapServers = `${bootstrapServers},SECURE://${container.getHost()}:${container.getMappedPort(
157+
this.saslSslConfig.port
158+
)}`;
159+
}
155160
}
156161

157162
const { output, exitCode } = await container.exec([

0 commit comments

Comments
 (0)