4
4
YELLOW=' \033[1;33m'
5
5
RED=' \033[1;31m'
6
6
GREEN=' \033[0;32m'
7
+ CYAN=' \033[0;36m'
7
8
NC=' \033[0m' # No Color
8
9
10
+ MTLS_ON=false
11
+ K_CONNECT_ON=false
12
+
13
+ # ### ARGS AND INPUTS
14
+ while [ ! $# -eq 0 ]
15
+ do
16
+ case " $1 " in
17
+ -t)
18
+ echo " ${CYAN} mTLS/SSL Support enabled.${NC} "
19
+ MTLS_ON=true
20
+ ;;
21
+ -k)
22
+ echo " ${CYAN} Kafka Connect test enabled.${NC} "
23
+ K_CONNECT_ON=true
24
+ ;;
25
+ esac
26
+ shift
27
+ done
28
+
9
29
# ### FUNCTIONS --------------------------------
10
30
create_and_deploy_kafka_test_topic_yaml ()
11
31
{
@@ -62,6 +82,57 @@ cleanup_cluster()
62
82
kubectl delete --recursive -f ./temp/${TESTING_TOPIC}
63
83
}
64
84
85
+ deploy_kafka_connect_connector ()
86
+ {
87
+ # Get MongoDB Credentials from Env Vars.
88
+
89
+ # Create connector payload
90
+ CONNECTOR_FILE=" ./temp/${TESTING_TOPIC} /create-connector.json"
91
+ cp ../examples/kafka_connect/connectors/mongoDBSink/CreateMongoSinkConnector.json $CONNECTOR_FILE
92
+
93
+ # 2 "name"
94
+ KCONNECT_NAME=" ${TESTING_TOPIC} -connector"
95
+ echo " Kafka Connect Sink Name: ${KCONNECT_NAME} "
96
+ sed -i.bak " 2s|.*| \" name\" : \" ${KCONNECT_NAME} \" ,|" $CONNECTOR_FILE
97
+ # 4 "topic"
98
+ sed -i.bak " 4s|.*| \" topics\" : \" ${TESTING_TOPIC} \" ,|" $CONNECTOR_FILE
99
+ # 12 "connection.uri"
100
+ sed -i.bak " 12s|.*| \" connection.uri\" : \" ${MONGODB_CONN_URL} \" ,|" $CONNECTOR_FILE
101
+ # 13 "database"
102
+ sed -i.bak " 13s|.*| \" database\" : \" ${DATABASE} \" ,|" $CONNECTOR_FILE
103
+ # 14 "collection"
104
+ sed -i.bak " 14s|.*| \" collection\" : \" ${COLLECTION} \" ,|" $CONNECTOR_FILE
105
+
106
+ # Portforward to kafkaconnect pod & Create Request
107
+ KCONNECT_POD=` kubectl get pods -n kafka | grep kconnect-cluster-connect | awk ' {print $1}' `
108
+ echo " Kafka Connect Pod: ${KCONNECT_POD} "
109
+ kubectl -n kafka port-forward $KCONNECT_POD 8083:8083 &
110
+ KCONNECT_PORT_FORWARD_PID=$!
111
+
112
+ sleep 5
113
+
114
+ curl -H ' Content-Type: application/json' -X POST -d @$CONNECTOR_FILE http://localhost:8083/connectors
115
+
116
+ sleep 5
117
+ kill $KCONNECT_PORT_FORWARD_PID
118
+ }
119
+
120
+ remove_kafka_connect_connector ()
121
+ {
122
+ # Portforward to kafkaconnect pod & Create Request
123
+ KCONNECT_POD=` kubectl get pods -n kafka | grep kconnect-cluster-connect | awk ' {print $1}' `
124
+ echo " Kafka Connect Pod: ${KCONNECT_POD} "
125
+ kubectl -n kafka port-forward $KCONNECT_POD 8083:8083 &
126
+ KCONNECT_PORT_FORWARD_PID=$!
127
+
128
+ sleep 5
129
+
130
+ curl -X DELETE http://localhost:8083/connectors/$KCONNECT_NAME
131
+
132
+ sleep 5
133
+ kill $KCONNECT_PORT_FORWARD_PID
134
+ }
135
+
65
136
# ### MAIN --------------------------------
66
137
67
138
# Get Broker LoadBalancer Address
@@ -92,7 +163,7 @@ kubectl apply -f temp/${TESTING_TOPIC}/kafka-test-topic.yaml
92
163
sleep 2
93
164
94
165
# Create Kafkacat configuration based if TLS/SSL enforcement is enabled
95
- if [ $1 == " -t " ]; then
166
+ if [ $MTLS_ON == true ]; then
96
167
# TLS Enabled on cluster
97
168
echo " ${YELLOW} Configuring Kafkacat with SSL${NC} "
98
169
# Deploy test user with access to test topic
@@ -119,6 +190,11 @@ if [ $1 == "-t" ]; then
119
190
echo " bootstrap.servers=${BROKER_EXTERNAL_ADDRESS} " > temp/${TESTING_TOPIC} /kafkacat.config
120
191
fi
121
192
193
+ # Deploy Kafka Connect Sink
194
+ if [ $K_CONNECT_ON == true ]; then
195
+ deploy_kafka_connect_connector
196
+ fi
197
+
122
198
# Create random test messages
123
199
MESSAGE_INPUT_FILE=" ./temp/${TESTING_TOPIC} /input-messages.txt"
124
200
@@ -145,6 +221,11 @@ kill $CONSUMER_PID
145
221
# Delete test topic and user
146
222
cleanup_cluster $TESTING_TOPIC
147
223
224
+ # Deploy Kafka Connect Sink
225
+ if [ $K_CONNECT_ON == true ]; then
226
+ remove_kafka_connect_connector
227
+ fi
228
+
148
229
# Compare contents of input and output
149
230
SORTED_INPUT=" ./temp/${TESTING_TOPIC} /sorted-input.txt"
150
231
SORTED_OUTPUT=" ./temp/${TESTING_TOPIC} /sorted-output.txt"
0 commit comments