@@ -11,8 +11,8 @@ export class KafkaManager {
1111 private kafka : Kafka ;
1212 private admin : Admin ;
1313 private producer : Producer ;
14- private consumer : Consumer ;
15- private consumer2 : Consumer ;
14+ private consumer_of_question_topic : Consumer ;
15+ private consumer_of_room_created_topic : Consumer ;
1616 private isConnected = false ;
1717
1818 constructor ( ) {
@@ -33,8 +33,8 @@ export class KafkaManager {
3333
3434 this . admin = this . kafka . admin ( ) ;
3535 this . producer = this . kafka . producer ( ) ;
36- this . consumer = this . kafka . consumer ( { groupId : 'matching-service-group' } ) ;
37- this . consumer2 = this . kafka . consumer ( { groupId : 'matching-service-group-2' } ) ;
36+ this . consumer_of_question_topic = this . kafka . consumer ( { groupId : 'matching-service-group' } ) ;
37+ this . consumer_of_room_created_topic = this . kafka . consumer ( { groupId : 'matching-service-group-2' } ) ;
3838 }
3939
4040 async initWithRetry ( maxRetries = 5 , retryDelayMs = 2000 ) : Promise < void > {
@@ -74,8 +74,8 @@ export class KafkaManager {
7474
7575 await this . admin . disconnect ( ) ;
7676 await this . producer . connect ( ) ;
77- await this . consumer . connect ( ) ;
78- await this . consumer2 . connect ( ) ;
77+ await this . consumer_of_question_topic . connect ( ) ;
78+ await this . consumer_of_room_created_topic . connect ( ) ;
7979 this . isConnected = true ;
8080 console . log ( 'Connected to Kafka' ) ;
8181 return ;
@@ -101,9 +101,9 @@ export class KafkaManager {
101101 } ) : Promise < void > {
102102 await this . initWithRetry ( ) ;
103103
104- await this . consumer . subscribe ( { topic : QUESTION_TOPIC , fromBeginning : false } ) ;
104+ await this . consumer_of_question_topic . subscribe ( { topic : QUESTION_TOPIC , fromBeginning : false } ) ;
105105
106- this . consumer . run ( {
106+ this . consumer_of_question_topic . run ( {
107107 eachMessage : async ( { topic, message } : EachMessagePayload ) => {
108108 if ( topic === QUESTION_TOPIC && handlers . onQuestionMessage ) {
109109 await handlers . onQuestionMessage ( {
@@ -114,9 +114,9 @@ export class KafkaManager {
114114 } ,
115115 } ) ;
116116
117- await this . consumer2 . subscribe ( { topic : ROOM_CREATED_TOPIC , fromBeginning : false } ) ;
117+ await this . consumer_of_room_created_topic . subscribe ( { topic : ROOM_CREATED_TOPIC , fromBeginning : false } ) ;
118118
119- this . consumer2 . run ( {
119+ this . consumer_of_room_created_topic . run ( {
120120 eachMessage : async ( { topic, message } : EachMessagePayload ) => {
121121 if ( topic === ROOM_CREATED_TOPIC && handlers . onRoomCreatedMessage ) {
122122 await handlers . onRoomCreatedMessage ( {
@@ -187,8 +187,8 @@ export class KafkaManager {
187187 }
188188
189189 async disconnect ( ) : Promise < void > {
190- try { await this . consumer . disconnect ( ) ; } catch { }
191- try { await this . consumer2 . disconnect ( ) ; } catch { }
190+ try { await this . consumer_of_question_topic . disconnect ( ) ; } catch { }
191+ try { await this . consumer_of_room_created_topic . disconnect ( ) ; } catch { }
192192 try { await this . producer . disconnect ( ) ; } catch { }
193193 try { await this . admin . disconnect ( ) ; } catch { }
194194 }
0 commit comments