diff --git a/package-lock.json b/package-lock.json index c9e9bb8..545a482 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,7 +15,7 @@ "@types/request": "^2.48.1", "axios": "^1.7.7", "body-parser": "^1.20.3", - "express": "^4.21.0", + "express": "^4.21.1", "js-yaml": "^3.13.1", "json-rules-engine": "^2.3.6", "moment-timezone": "^0.5.31", @@ -23,6 +23,7 @@ "nats": "2.10.0", "notifme-sdk": "^1.16.13", "pg": "^8.2.1", + "prom-client": "^15.1.3", "reflect-metadata": "^0.1.13", "typeorm": "0.3.17", "winston": "^3.2.1" @@ -423,7 +424,6 @@ "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.0.tgz", "integrity": "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==", "license": "Apache-2.0", - "optional": true, "engines": { "node": ">=8.0.0" } @@ -975,6 +975,12 @@ "node": ">=8" } }, + "node_modules/bintrees": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.2.tgz", + "integrity": "sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==", + "license": "MIT" + }, "node_modules/bl": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/bl/-/bl-4.1.0.tgz", @@ -1407,9 +1413,10 @@ } }, "node_modules/cookie": { - "version": "0.6.0", - "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.6.0.tgz", - "integrity": "sha512-U71cyTamuh1CRNCfpGY6to28lxvNwPG4Guz/EVjgf3Jmzv0vlDp1atT9eS5dDjMYHucpHbWns6Lwf3BKz6svdw==", + "version": "0.7.1", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.7.1.tgz", + "integrity": "sha512-6DnInpx7SJ2AK3+CTUE/ZM0vWTUboZCegxhC2xiIydHR9jNuTAASBrfEpHhiGOZw/nX51bHt6YQl8jsGo4y/0w==", + "license": "MIT", "engines": { "node": ">= 0.6" } @@ -1716,9 +1723,9 @@ } }, "node_modules/express": { - "version": "4.21.0", - "resolved": "https://registry.npmjs.org/express/-/express-4.21.0.tgz", - "integrity": "sha512-VqcNGcj/Id5ZT1LZ/cfihi3ttTn+NJmkli2eZADigjq29qTlWi/hAQ43t/VLPq8+UX06FCEx3ByOYet6ZFblng==", + "version": "4.21.1", + "resolved": "https://registry.npmjs.org/express/-/express-4.21.1.tgz", + "integrity": "sha512-YSFlK1Ee0/GC8QaO91tHcDxJiE/X4FbpAyQWkxAvG6AXCuR65YzK8ua6D9hvi/TzUfZMpc+BwuM1IPw8fmQBiQ==", "license": "MIT", "dependencies": { "accepts": "~1.3.8", @@ -1726,7 +1733,7 @@ "body-parser": "1.20.3", "content-disposition": "0.5.4", "content-type": "~1.0.4", - "cookie": "0.6.0", + "cookie": "0.7.1", "cookie-signature": "1.0.6", "debug": "2.6.9", "depd": "2.0.0", @@ -3439,6 +3446,19 @@ "node": ">=10" } }, + "node_modules/prom-client": { + "version": "15.1.3", + "resolved": "https://registry.npmjs.org/prom-client/-/prom-client-15.1.3.tgz", + "integrity": "sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==", + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/api": "^1.4.0", + "tdigest": "^0.1.1" + }, + "engines": { + "node": "^16 || ^18 || >=20" + } + }, "node_modules/proto3-json-serializer": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/proto3-json-serializer/-/proto3-json-serializer-2.0.2.tgz", @@ -4049,6 +4069,15 @@ "node": ">=6" } }, + "node_modules/tdigest": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/tdigest/-/tdigest-0.1.2.tgz", + "integrity": "sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==", + "license": "MIT", + "dependencies": { + "bintrees": "1.0.2" + } + }, "node_modules/teeny-request": { "version": "9.0.0", "resolved": "https://registry.npmjs.org/teeny-request/-/teeny-request-9.0.0.tgz", diff --git a/package.json b/package.json index e6da95f..f30bab0 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,7 @@ "@types/request": "^2.48.1", "axios": "^1.7.7", "body-parser": "^1.20.3", - "express": "^4.21.0", + "express": "^4.21.1", "js-yaml": "^3.13.1", "json-rules-engine": "^2.3.6", "moment-timezone": "^0.5.31", @@ -37,6 +37,7 @@ "nats": "2.10.0", "notifme-sdk": "^1.16.13", "pg": "^8.2.1", + "prom-client": "^15.1.3", "reflect-metadata": "^0.1.13", "typeorm": "0.3.17", "winston": "^3.2.1" diff --git a/src/common/metrics.ts b/src/common/metrics.ts new file mode 100644 index 0000000..2201494 --- /dev/null +++ b/src/common/metrics.ts @@ -0,0 +1,28 @@ +import { Counter, Histogram, register } from "prom-client" + +export const successNotificationMetricsCounter = new Counter({ + name: 'successfulNotifications', + help: 'Number of successful notifications', +}) + +export const failedNotificationMetricsCounter = new Counter({ + name: 'failedNotifications', + help: 'Number of failed notifications', +}) + +export const httpRequestMetricsCounter = new Counter({ + name: 'httpRequestsCounter', + help: 'Number of requests on http endpoints', + labelNames: ['method', 'endpoint', 'statusCode'] +}) + +export const natsHistogram = new Histogram({ + name: 'natsConsumerHistogram', + help: 'nats consumer duration histogram', + labelNames: ['streamName', 'consumerName'] +}) + +register.registerMetric(successNotificationMetricsCounter) +register.registerMetric(failedNotificationMetricsCounter) +register.registerMetric(httpRequestMetricsCounter) +register.registerMetric(natsHistogram) diff --git a/src/destination/destinationHandlers/sesHandler.ts b/src/destination/destinationHandlers/sesHandler.ts index b5eb8c7..b180c53 100644 --- a/src/destination/destinationHandlers/sesHandler.ts +++ b/src/destination/destinationHandlers/sesHandler.ts @@ -52,7 +52,7 @@ export class SESService implements Handler { this.mh = mh } - handle(event: Event, templates: NotificationTemplates[], setting: NotificationSettings, configsMap: Map, destinationMap: Map): boolean { + async handle(event: Event, templates: NotificationTemplates[], setting: NotificationSettings, configsMap: Map, destinationMap: Map): Promise { let sesTemplate: NotificationTemplates = templates.find(t => { return 'ses' == t.channel_type }) @@ -65,7 +65,7 @@ export class SESService implements Handler { this.sesConfig = null for (const element of providersSet) { if (element['dest'] === "ses") { - this.getDefaultConfig(providersSet, event, sesTemplate, setting, destinationMap, configsMap) + await this.getDefaultConfig(providersSet, event, sesTemplate, setting, destinationMap, configsMap) break } } @@ -82,7 +82,7 @@ export class SESService implements Handler { from_email: config['from_email'] } if(this.sesConfig && this.sesConfig.from_email){ - providersSet.forEach(p => { + for (const p of providersSet) { if (p['dest'] == "ses") { let userId = p['configId'] let recipient = p['recipient'] @@ -93,11 +93,11 @@ export class SESService implements Handler { configKey = p['dest'] + '-' + userId } if (!configsMap.get(configKey)) { - this.processNotification(userId, recipient, event, sesTemplate, setting, p, emailMap) + await this.processNotification(userId, recipient, event, sesTemplate, setting, p, emailMap) configsMap.set(configKey, true) } } - }); + }; } } catch (error) { this.logger.error('getDefaultConfig', error) @@ -105,7 +105,7 @@ export class SESService implements Handler { } } - private preparePaylodAndSend(event: Event, sesTemplate: NotificationTemplates, setting: NotificationSettings, p: string){ + private async preparePayloadAndSend(event: Event, sesTemplate: NotificationTemplates, setting: NotificationSettings, p: string){ let sdk: NotifmeSdk = new NotifmeSdk({ channels: { email: { @@ -125,49 +125,67 @@ export class SESService implements Handler { // let options = { allowUndefinedFacts: true } let conditions: string = p['rule']['conditions']; if (conditions) { - engine.addRule({conditions: conditions, event: event}); - engine.run(event).then(e => { - this.sendNotification(event, sdk, sesTemplate.template_payload).then(result => { - this.saveNotificationEventSuccessLog(result, event, p, setting); - }).catch((error) => { - this.logger.error(error.message); - this.saveNotificationEventFailureLog(event, p, setting); - }); - }) + engine.addRule({ conditions: conditions, event: event }); + try { + await engine.run(event); + const result = await this.sendNotification( + event, + sdk, + sesTemplate.template_payload + ); + await this.saveNotificationEventSuccessLog( + result, + event, + p, + setting + ); + } catch (error: any) { + this.logger.error(error.message); + await this.saveNotificationEventFailureLog(event, p, setting); + } } else { - this.sendNotification(event, sdk, sesTemplate.template_payload).then(result => { - this.saveNotificationEventSuccessLog(result, event, p, setting); - }).catch((error) => { - this.logger.error(error.message); - this.saveNotificationEventFailureLog(event, p, setting); - }); + try { + const result = await this.sendNotification( + event, + sdk, + sesTemplate.template_payload + ); + await this.saveNotificationEventSuccessLog( + result, + event, + p, + setting + ); + } catch (error: any) { + this.logger.error(error.message); + await this.saveNotificationEventFailureLog(event, p, setting); + } } } - private processNotification(userId: number, recipient: string, event: Event, sesTemplate: NotificationTemplates, setting: NotificationSettings, p: string, emailMap: Map) { + private async processNotification(userId: number, recipient: string, event: Event, sesTemplate: NotificationTemplates, setting: NotificationSettings, p: string, emailMap: Map) { if(userId) { - this.usersRepository.findByUserId(userId).then(user => { - if (!user) { - this.logger.info('no user found for id - ' + userId) - this.logger.info(event.correlationId) - return - } - this.sendEmailIfNotDuplicate(user['email_id'], event, sesTemplate, setting, p, emailMap) - }) + const user = await this.usersRepository.findByUserId(userId) + if (!user) { + this.logger.info('no user found for id - ' + userId) + this.logger.info(event.correlationId) + return + } + await this.sendEmailIfNotDuplicate(user['email_id'], event, sesTemplate, setting, p, emailMap) }else{ if (!recipient) { this.logger.error('recipient is blank') return } - this.sendEmailIfNotDuplicate(recipient, event, sesTemplate, setting, p, emailMap) + await this.sendEmailIfNotDuplicate(recipient, event, sesTemplate, setting, p, emailMap) } } - private sendEmailIfNotDuplicate(recipient : string, event: Event, sesTemplate: NotificationTemplates, setting: NotificationSettings, p: string, emailMap: Map) { + private async sendEmailIfNotDuplicate(recipient : string, event: Event, sesTemplate: NotificationTemplates, setting: NotificationSettings, p: string, emailMap: Map) { if (!emailMap.get(recipient)) { emailMap.set(recipient, true) event.payload['toEmail'] = recipient - this.preparePaylodAndSend(event, sesTemplate, setting, p) + await this.preparePayloadAndSend(event, sesTemplate, setting, p) } else { this.logger.info('duplicate email filtered out') } @@ -205,17 +223,17 @@ export class SESService implements Handler { } } - private saveNotificationEventSuccessLog(result: any, event: Event, p: any, setting: NotificationSettings) { + private async saveNotificationEventSuccessLog(result: any, event: Event, p: any, setting: NotificationSettings) { if (result["status"] == "error") { - this.saveNotificationEventFailureLog(event, p, setting) + await this.saveNotificationEventFailureLog(event, p, setting) } else { let eventLog = this.eventLogBuilder.buildEventLog(event, p.dest, true, setting); - this.eventLogRepository.saveEventLog(eventLog); + await this.eventLogRepository.saveEventLog(eventLog); } } - private saveNotificationEventFailureLog(event: Event, p: any, setting: NotificationSettings) { + private async saveNotificationEventFailureLog(event: Event, p: any, setting: NotificationSettings) { let eventLog = this.eventLogBuilder.buildEventLog(event, p.dest, false, setting); - this.eventLogRepository.saveEventLog(eventLog); + await this.eventLogRepository.saveEventLog(eventLog); } } diff --git a/src/destination/destinationHandlers/slackHandler.ts b/src/destination/destinationHandlers/slackHandler.ts index 7fe7920..0bdf44d 100644 --- a/src/destination/destinationHandlers/slackHandler.ts +++ b/src/destination/destinationHandlers/slackHandler.ts @@ -44,7 +44,7 @@ export class SlackService implements Handler { this.mh = mh; } - handle(event: Event, templates: NotificationTemplates[], setting: NotificationSettings, configsMap: Map, destinationMap: Map): boolean { + async handle(event: Event, templates: NotificationTemplates[], setting: NotificationSettings, configsMap: Map, destinationMap: Map): Promise { let slackTemplate: NotificationTemplates = templates.find(t => { return 'slack' == t.channel_type @@ -57,21 +57,21 @@ export class SlackService implements Handler { const providerObjects = setting.config const providersSet = new Set(providerObjects); - providersSet.forEach(p => { + for (const p of providersSet) { if (p['dest'] == "slack") { let slackConfigId = p['configId'] let configKey = p['dest'] + '-' + slackConfigId if (!configsMap.get(configKey)) { - this.processNotification(slackConfigId, event, slackTemplate, setting, p, destinationMap) + await this.processNotification(slackConfigId, event, slackTemplate, setting, p, destinationMap) configsMap.set(configKey, true) } } - }); + } return true } - private processNotification(slackConfigId: number, event: Event, slackTemplate: NotificationTemplates, setting: NotificationSettings, p: string, webhookMap: Map) { - this.slackConfigRepository.findBySlackConfigId(slackConfigId).then(config => { + private async processNotification(slackConfigId: number, event: Event, slackTemplate: NotificationTemplates, setting: NotificationSettings, p: string, webhookMap: Map) { + const config = await this.slackConfigRepository.findBySlackConfigId(slackConfigId) if (!config) { this.logger.info('no slack config found for event') this.logger.info(event.correlationId) @@ -97,30 +97,45 @@ export class SlackService implements Handler { }); let engine = new Engine(); // let options = { allowUndefinedFacts: true } - let conditions: string = p['rule']['conditions']; + let conditions: string = p["rule"]["conditions"]; if (conditions) { - engine.addRule({conditions: conditions, event: event}); - engine.run(event).then(e => { - this.sendNotification(event, sdk, slackTemplate.template_payload).then(result => { - this.saveNotificationEventSuccessLog(result, event, p, setting); - }).catch((error) => { - this.logger.error(error.message); - this.saveNotificationEventFailureLog(event, p, setting); - }); - }) + engine.addRule({ conditions: conditions, event: event }); + try { + await engine.run(event); + const result = await this.sendNotification( + event, + sdk, + slackTemplate.template_payload + ); + await this.saveNotificationEventSuccessLog( + result, + event, + p, + setting + ); + } catch (error: any) { + this.logger.error(error.message); + await this.saveNotificationEventFailureLog(event, p, setting); + } } else { - this.sendAndLogNotification(event, sdk, setting, p, slackTemplate); + await this.sendAndLogNotification( + event, + sdk, + setting, + p, + slackTemplate + ); } - }) } - public sendAndLogNotification(event: Event, sdk: NotifmeSdk, setting: NotificationSettings, p: any, slackTemplate: NotificationTemplates){ - this.sendNotification(event, sdk, slackTemplate.template_payload).then(result => { - this.saveNotificationEventSuccessLog(result, event, p, setting); - }).catch((error) => { + public async sendAndLogNotification(event: Event, sdk: NotifmeSdk, setting: NotificationSettings, p: any, slackTemplate: NotificationTemplates){ + try { + const result = await this.sendNotification(event, sdk, slackTemplate.template_payload) + await this.saveNotificationEventSuccessLog(result, event, p, setting); + } catch(error: any) { this.logger.error(error.message); this.saveNotificationEventFailureLog(event, p, setting); - }); + }; } public async sendNotification(event: Event, sdk: NotifmeSdk, template: string) { @@ -149,17 +164,17 @@ export class SlackService implements Handler { } } - private saveNotificationEventSuccessLog(result: any, event: Event, p: any, setting: NotificationSettings) { + private async saveNotificationEventSuccessLog(result: any, event: Event, p: any, setting: NotificationSettings) { if (result["status"] == "error") { - this.saveNotificationEventFailureLog(event, p, setting) + await this.saveNotificationEventFailureLog(event, p, setting) } else { let eventLog = this.eventLogBuilder.buildEventLog(event, p.dest, true, setting); - this.eventLogRepository.saveEventLog(eventLog); + await this.eventLogRepository.saveEventLog(eventLog); } } - private saveNotificationEventFailureLog(event: Event, p: any, setting: NotificationSettings) { + private async saveNotificationEventFailureLog(event: Event, p: any, setting: NotificationSettings) { let eventLog = this.eventLogBuilder.buildEventLog(event, p.dest, false, setting); - this.eventLogRepository.saveEventLog(eventLog); + await this.eventLogRepository.saveEventLog(eventLog); } } diff --git a/src/destination/destinationHandlers/smtpHandler.ts b/src/destination/destinationHandlers/smtpHandler.ts index 1b27d51..69ea341 100644 --- a/src/destination/destinationHandlers/smtpHandler.ts +++ b/src/destination/destinationHandlers/smtpHandler.ts @@ -52,7 +52,7 @@ export class SMTPService implements Handler { this.mh = mh } - handle(event: Event, templates: NotificationTemplates[], setting: NotificationSettings, configsMap: Map, destinationMap: Map): boolean { + async handle(event: Event, templates: NotificationTemplates[], setting: NotificationSettings, configsMap: Map, destinationMap: Map): Promise { let sesTemplate: NotificationTemplates = templates.find(t => { return 'ses' == t.channel_type }) @@ -65,7 +65,7 @@ export class SMTPService implements Handler { this.smtpConfig = null for (const element of providersSet) { if (element['dest'] === "smtp") { - this.getDefaultConfig(providersSet, event, sesTemplate, setting, destinationMap, configsMap) + await this.getDefaultConfig(providersSet, event, sesTemplate, setting, destinationMap, configsMap) break } } @@ -83,7 +83,7 @@ export class SMTPService implements Handler { from_email: config['from_email'] } if(this.smtpConfig && this.smtpConfig.from_email){ - providersSet.forEach(p => { + for (const p of providersSet) { if (p['dest'] == "smtp") { let userId = p['configId'] let recipient = p['recipient'] @@ -94,11 +94,11 @@ export class SMTPService implements Handler { configKey = p['dest'] + '-' + userId } if (!configsMap.get(configKey)) { - this.processNotification(userId, recipient, event, sesTemplate, setting, p, emailMap) + await this.processNotification(userId, recipient, event, sesTemplate, setting, p, emailMap) configsMap.set(configKey, true) } } - }); + }; } } catch (error) { this.logger.error('getDefaultConfig', error) @@ -106,7 +106,7 @@ export class SMTPService implements Handler { } } - private preparePaylodAndSend(event: Event, smtpTemplate: NotificationTemplates, setting: NotificationSettings, p: string){ + private async preparePayloadAndSend(event: Event, smtpTemplate: NotificationTemplates, setting: NotificationSettings, p: string){ const smtpConfig = this.smtpConfig; // Create the email provider configuration let emailProviderConfig: any = { @@ -143,48 +143,48 @@ export class SMTPService implements Handler { if (conditions) { engine.addRule({conditions: conditions, event: event}); - engine.run(event).then(e => { - this.sendNotification(event, sdk, smtpTemplate.template_payload).then(result => { - this.saveNotificationEventSuccessLog(result, event, p, setting); - }).catch((error) => { - this.logger.error(error.message); - this.saveNotificationEventFailureLog(event, p, setting); - }); - }) + try { + await engine.run(event) + const result = await this.sendNotification(event, sdk, smtpTemplate.template_payload) + await this.saveNotificationEventSuccessLog(result, event, p, setting);} + catch(error: any) { + this.logger.error(error.message); + await this.saveNotificationEventFailureLog(event, p, setting); + }; } else { - this.sendNotification(event, sdk, smtpTemplate.template_payload).then(result => { - this.saveNotificationEventSuccessLog(result, event, p, setting); - }).catch((error) => { + try { + const result = this.sendNotification(event, sdk, smtpTemplate.template_payload) + await this.saveNotificationEventSuccessLog(result, event, p, setting);} + catch(error: any) { this.logger.error(error.message); - this.saveNotificationEventFailureLog(event, p, setting); - }); + await this.saveNotificationEventFailureLog(event, p, setting); + }; } } - private processNotification(userId: number, recipient: string, event: Event, smtpTemplate: NotificationTemplates, setting: NotificationSettings, p: string, emailMap: Map) { + private async processNotification(userId: number, recipient: string, event: Event, smtpTemplate: NotificationTemplates, setting: NotificationSettings, p: string, emailMap: Map) { if(userId) { - this.usersRepository.findByUserId(userId).then(user => { - if (!user) { - this.logger.info('no user found for id - ' + userId) - this.logger.info(event.correlationId) - return - } - this.sendEmailIfNotDuplicate(user['email_id'], event, smtpTemplate, setting, p, emailMap) - }) + const user = await this.usersRepository.findByUserId(userId) + if (!user) { + this.logger.info('no user found for id - ' + userId) + this.logger.info(event.correlationId) + return + } + await this.sendEmailIfNotDuplicate(user['email_id'], event, smtpTemplate, setting, p, emailMap) }else{ if (!recipient) { this.logger.error('recipient is blank') return } - this.sendEmailIfNotDuplicate(recipient, event, smtpTemplate, setting, p, emailMap) + await this.sendEmailIfNotDuplicate(recipient, event, smtpTemplate, setting, p, emailMap) } } - private sendEmailIfNotDuplicate(recipient : string, event: Event, smtpTemplate: NotificationTemplates, setting: NotificationSettings, p: string, emailMap: Map) { + private async sendEmailIfNotDuplicate(recipient : string, event: Event, smtpTemplate: NotificationTemplates, setting: NotificationSettings, p: string, emailMap: Map) { if (!emailMap.get(recipient)) { emailMap.set(recipient, true) event.payload['toEmail'] = recipient - this.preparePaylodAndSend(event, smtpTemplate, setting, p) + await this.preparePayloadAndSend(event, smtpTemplate, setting, p) } else { this.logger.info('duplicate email filtered out') } @@ -220,17 +220,17 @@ export class SMTPService implements Handler { } } - private saveNotificationEventSuccessLog(result: any, event: Event, p: any, setting: NotificationSettings) { + private async saveNotificationEventSuccessLog(result: any, event: Event, p: any, setting: NotificationSettings) { if (result["status"] == "error") { - this.saveNotificationEventFailureLog(event, p, setting) + await this.saveNotificationEventFailureLog(event, p, setting) } else { let eventLog = this.eventLogBuilder.buildEventLog(event, p.dest, true, setting); - this.eventLogRepository.saveEventLog(eventLog); + await this.eventLogRepository.saveEventLog(eventLog); } } - private saveNotificationEventFailureLog(event: Event, p: any, setting: NotificationSettings) { + private async saveNotificationEventFailureLog(event: Event, p: any, setting: NotificationSettings) { let eventLog = this.eventLogBuilder.buildEventLog(event, p.dest, false, setting); - this.eventLogRepository.saveEventLog(eventLog); + await this.eventLogRepository.saveEventLog(eventLog); } } diff --git a/src/destination/destinationHandlers/webhookHandler.ts b/src/destination/destinationHandlers/webhookHandler.ts index 3210b30..bfa5e90 100644 --- a/src/destination/destinationHandlers/webhookHandler.ts +++ b/src/destination/destinationHandlers/webhookHandler.ts @@ -41,7 +41,7 @@ export class WebhookService implements Handler{ this.logger = logger this.mh = mh; } - handle(event: Event, templates: WebhookConfig[], setting: NotificationSettings, configsMap: Map, destinationMap: Map): boolean{ + async handle(event: Event, templates: WebhookConfig[], setting: NotificationSettings, configsMap: Map, destinationMap: Map): Promise{ let webhookTemplate: WebhookConfig = templates.find(t => { return t }) @@ -52,32 +52,34 @@ export class WebhookService implements Handler{ const providerObjects = setting.config const providersSet = new Set(providerObjects); - providersSet.forEach(p => { + for (const p of providersSet) { if (p['dest'] == "webhook" && p['configId']==webhookTemplate.id) { let webhookConfigId = p['configId'] let configKey = p['dest'] + '-' + webhookConfigId if (!configsMap.get(configKey)) { - this.processNotification(webhookConfigId, event, webhookTemplate, setting, p, destinationMap) + await this.processNotification(webhookConfigId, event, webhookTemplate, setting, p, destinationMap) configsMap.set(configKey, true) } } - }); + }; return true } - public sendAndLogNotification(event: Event, webhookTemplate: WebhookConfig, setting: NotificationSettings, p: any) { + public async sendAndLogNotification(event: Event, webhookTemplate: WebhookConfig, setting: NotificationSettings, p: any) { const payload=typeof webhookTemplate.payload==="object"?JSON.stringify(webhookTemplate.payload) : webhookTemplate.payload; - this.sendNotification(event, webhookTemplate.web_hook_url, payload,webhookTemplate.header).then(result => { - this.saveNotificationEventSuccessLog(result, event, p, setting); - }).catch((error) => { + + try { + const result = await this.sendNotification(event, webhookTemplate.web_hook_url, payload,webhookTemplate.header) + await this.saveNotificationEventSuccessLog(result, event, p, setting); + } catch (error: any) { this.logger.error(error.message); - this.saveNotificationEventFailureLog(event, p, setting); - }); + await this.saveNotificationEventFailureLog(event, p, setting); + } } - private processNotification(webhookConfigId: number, event: Event, webhookTemplate: WebhookConfig, setting: NotificationSettings, p: string, webhookMap: Map) { - this.webhookConfigRepository.findByWebhookConfigId(webhookConfigId).then(config => { + private async processNotification(webhookConfigId: number, event: Event, webhookTemplate: WebhookConfig, setting: NotificationSettings, p: string, webhookMap: Map) { + this.webhookConfigRepository.findByWebhookConfigId(webhookConfigId).then(async (config) => { if (!config) { this.logger.info('no webhook config found for event') this.logger.info(event.correlationId) @@ -94,11 +96,10 @@ export class WebhookService implements Handler{ let conditions: string = p['rule']['conditions']; if (conditions) { engine.addRule({conditions: conditions, event: event}); - engine.run(event).then(e => { - this.sendAndLogNotification(event, webhookTemplate, setting, p); - }) + await engine.run(event) + await this.sendAndLogNotification(event, webhookTemplate, setting, p); } else { - this.sendAndLogNotification(event, webhookTemplate, setting, p); + await this.sendAndLogNotification(event, webhookTemplate, setting, p); } }) } @@ -134,19 +135,18 @@ export class WebhookService implements Handler{ } } - private saveNotificationEventSuccessLog(result: any, event: Event, p: any, setting: NotificationSettings) { - + private async saveNotificationEventSuccessLog(result: any, event: Event, p: any, setting: NotificationSettings) { if (!result || result["status"] == "error") { - this.saveNotificationEventFailureLog(event, p, setting) + await this.saveNotificationEventFailureLog(event, p, setting) } else { let eventLog = this.eventLogBuilder.buildEventLog(event, p.dest, true, setting); - this.eventLogRepository.saveEventLog(eventLog); + await this.eventLogRepository.saveEventLog(eventLog); } } - private saveNotificationEventFailureLog(event: Event, p: any, setting: NotificationSettings) { + private async saveNotificationEventFailureLog(event: Event, p: any, setting: NotificationSettings) { let eventLog = this.eventLogBuilder.buildEventLog(event, p.dest, false, setting); - this.eventLogRepository.saveEventLog(eventLog); + await this.eventLogRepository.saveEventLog(eventLog); } } diff --git a/src/notification/service/notificationService.ts b/src/notification/service/notificationService.ts index 49bd435..31de056 100644 --- a/src/notification/service/notificationService.ts +++ b/src/notification/service/notificationService.ts @@ -29,7 +29,7 @@ import {NotifmeSdk} from 'notifme-sdk' import {CustomError, CustomResponse} from "../../entities/events"; export interface Handler { - handle(event: Event, templates: (NotificationTemplates[] | WebhookConfig[]), setting: NotificationSettings, configMap: Map, destinationMap: Map): boolean + handle(event: Event, templates: (NotificationTemplates[] | WebhookConfig[]), setting: NotificationSettings, configMap: Map, destinationMap: Map): Promise sendNotification(event: Event, sdk: any, template: string) } @@ -48,7 +48,7 @@ class NotificationService { this.templatesRepository = templatesRepository this.logger = logger } - public sendApprovalNotificaton(event:Event){ + public async sendApprovalNotification(event:Event){ try { if (!this.isValidEventForApproval(event)) { throw new CustomError("Event is not valid for approval ", 400) @@ -70,7 +70,8 @@ class NotificationService { }); - this.templatesRepository.findByEventTypeId(event.eventTypeId).then((templateResults: NotificationTemplates[]) => { + try { + const templateResults: NotificationTemplates[] = await this.templatesRepository.findByEventTypeId(event.eventTypeId) if (!templateResults) { this.logger.info("no templates found for event ", event); throw new CustomError("no templates found for event", 404) @@ -81,33 +82,35 @@ class NotificationService { settings.event_type_id = event.eventTypeId for (let h of this.handlers) { if ((h instanceof SESService) || (h instanceof SMTPService)) { - h.handle(event, templateResults, settings, configsMap, destinationMap) + await h.handle(event, templateResults, settings, configsMap, destinationMap) } - } - }).catch(err => this.logger.error("err" + err)) - }catch (e:any){ + }} + catch(err) { + this.logger.error("err" + err) + } + } catch(e:any) { throw e instanceof CustomError?e:new CustomError(e.message,400) } } // this function is used to send webhook notification for scoop notification event type - private sendWebhookNotification(event: Event) { - this.handlers.forEach((h) => { + private async sendWebhookNotification(event: Event) { + for (const h of this.handlers) { if (h instanceof WebhookService){ let setting = new NotificationSettings() setting.event_type_id = event.eventTypeId setting.pipeline_id = 0 setting.config = event.payload - h.sendAndLogNotification(event, event.payload.scoopNotificationConfig.webhookConfig as WebhookConfig, setting, {"dest": "webhook"}) + await h.sendAndLogNotification(event, event.payload.scoopNotificationConfig.webhookConfig as WebhookConfig, setting, {"dest": "webhook"}) } - }) + } } // this function is used to send slack notification for scoop notification event type - private sendSlackNotification(event: Event) { - this.handlers.forEach((h) => { + private async sendSlackNotification(event: Event) { + for (const h of this.handlers) { if (h instanceof SlackService){ - this.templatesRepository.findByEventTypeIdAndChannelType(event.eventTypeId, "slack").then((templateResults:NotificationTemplates[]) => { + const templateResults: NotificationTemplates[] = await this.templatesRepository.findByEventTypeIdAndChannelType(event.eventTypeId, "slack") if (!templateResults) { this.logger.info("no templates found for event ", event); return @@ -129,28 +132,27 @@ class NotificationService { setting.event_type_id = event.eventTypeId setting.pipeline_id = 0 setting.config = event.payload - h.sendAndLogNotification(event, sdk,setting,{"dest": "slack"}, slackTemplateConfig) - }) + await h.sendAndLogNotification(event, sdk,setting,{"dest": "slack"}, slackTemplateConfig) + } } - }) - } + } public async sendNotification(event: Event):Promise { try { if (event.payload.providers && event.payload.providers.length > 0) { - this.sendApprovalNotificaton(event) + await this.sendApprovalNotification(event) return new CustomResponse("notification sent",200) } // check webhook for scoop notification event type if (event.eventTypeId == EVENT_TYPE.ScoopNotification && event.payload.scoopNotificationConfig.webhookConfig) { - this.sendWebhookNotification(event) + await this.sendWebhookNotification(event) return new CustomResponse("notification sent",200) } // check slack for scoop notification event type if (event.eventTypeId == EVENT_TYPE.ScoopNotification && event.payload.scoopNotificationConfig.slackConfig) { - this.sendSlackNotification(event) + await this.sendSlackNotification(event) return new CustomResponse("notification sent",200) } @@ -159,7 +161,7 @@ class NotificationService { throw new CustomError("Event is not valid", 400) } - const settingsResults=await this.notificationSettingsRepository.findByEventSource(event.pipelineType, event.pipelineId, event.eventTypeId, event.appId, event.envId, event.teamId, event.clusterId, event.isProdEnv); + const settingsResults=await this.notificationSettingsRepository.findByEventSource(event.pipelineType, event.pipelineId, event.eventTypeId, event.appId, event.envId, event.teamId, event.clusterId, event.isProdEnv,event.envIdsForCiPipeline); this.logger.info('notificationSettingsRepository.findByEventSource') if (!settingsResults || settingsResults.length == 0) { this.logger.info("no notification settings found for event " + event.correlationId); @@ -178,7 +180,7 @@ class NotificationService { }); }); - settingsResults.forEach((setting) => { + for (const setting of settingsResults) { const configArray = setting.config as any; if (Array.isArray(configArray)) { @@ -186,8 +188,8 @@ class NotificationService { if (webhookConfig.length) { const webhookConfigRepository = new WebhookConfigRepository(); - webhookConfig.forEach(config => { - webhookConfigRepository.getAllWebhookConfigs().then((templateResults: WebhookConfig[]) => { + for (const config of webhookConfig) { + const templateResults: WebhookConfig[] = await webhookConfigRepository.getAllWebhookConfigs() const newTemplateResult = templateResults.filter((t) => t.id === config.configId); if (newTemplateResult.length === 0) { @@ -202,27 +204,25 @@ class NotificationService { for (const h of this.handlers) { if (h instanceof WebhookService) { if (event.eventTypeId === EVENT_TYPE.ImageScan && !!event.payload.imageScanExecutionInfo) { - h.handle(ImageScanEvent, newTemplateResult, setting, configsMap, destinationMap); + await h.handle(ImageScanEvent, newTemplateResult, setting, configsMap, destinationMap); } - h.handle(event, newTemplateResult, setting, configsMap, destinationMap); + await h.handle(event, newTemplateResult, setting, configsMap, destinationMap); } } - }); - }); - } + }; + }; if (configArray.length > webhookConfig.length) { - this.templatesRepository.findByEventTypeIdAndNodeType(event.eventTypeId, event.pipelineType).then((templateResults: NotificationTemplates[]) => { + const templateResults: NotificationTemplates[] = await this.templatesRepository.findByEventTypeIdAndNodeType(event.eventTypeId, event.pipelineType) if (!templateResults) { this.logger.info("no templates found for event ", event); return new CustomResponse("",0,new CustomError("no templates found for event", 404)); } for (let h of this.handlers) { - h.handle(event, templateResults, setting, configsMap, destinationMap) + await h.handle(event, templateResults, setting, configsMap, destinationMap) } - }) - } + } } - }); + }; this.logger.info("notification sent"); return new CustomResponse("notification sent",200) }catch (error:any){ @@ -256,5 +256,6 @@ class Event { clusterId: number isProdEnv: boolean baseUrl?: string + envIdsForCiPipeline?: number[] } export {NotificationService, Event} \ No newline at end of file diff --git a/src/pubSub/pubSub.ts b/src/pubSub/pubSub.ts index 889d139..51b9a7d 100644 --- a/src/pubSub/pubSub.ts +++ b/src/pubSub/pubSub.ts @@ -9,20 +9,18 @@ import { NatsStreamWiseConfigMapping, NatsTopic, NatsTopicMapping, numberOfRetries, - } from "./utils"; import {ConsumerOptsBuilderImpl} from "nats/lib/nats-base-client/jsconsumeropts"; - -import {ConsumerInfo, ConsumerUpdateConfig, JetStreamManager, StreamConfig} from "nats/lib/nats-base-client/types"; +import {ConsumerInfo, JetStreamManager, StreamConfig} from "nats/lib/nats-base-client/types"; +import { natsHistogram } from "../common/metrics"; const consumerNotFoundErrorCode = 10014; const streamNotFoundErrorCode = 10059; export interface PubSubService { - Subscribe(topic: string, callback: (msg: string) => void): void + Subscribe(topic: string, callback: (msg: string) => Promise): void } - export class PubSubServiceImpl implements PubSubService { private nc: NatsConnection private js: JetStreamClient @@ -39,7 +37,7 @@ export class PubSubServiceImpl implements PubSubService { // ********** Subscribe function provided by consumer - async Subscribe(topic: string, callback: (msg: string) => void) { + async Subscribe(topic: string, callback: (msg: string) => Promise) { const natsTopicConfig: NatsTopic = NatsTopicMapping.get(topic) const streamName = natsTopicConfig.streamName const consumerName = natsTopicConfig.consumerName @@ -58,10 +56,12 @@ export class PubSubServiceImpl implements PubSubService { ack_policy:AckPolicy.Explicit, deliver_policy:DeliverPolicy.Last, max_ack_pending:1, - }).bindStream(streamName).callback((err, msg) => { + }).bindStream(streamName).callback(async (err, msg) => { + const timer = natsHistogram.startTimer() try { const msgString = getJsonString(msg.data) - callback(msgString) + await callback(msgString) + timer({streamName, consumerName}) } catch (err) { this.logger.error("msg: "+msg.data+" err: "+err); } diff --git a/src/repository/notificationSettingsRepository.ts b/src/repository/notificationSettingsRepository.ts index 1cdaf40..8bd4a07 100644 --- a/src/repository/notificationSettingsRepository.ts +++ b/src/repository/notificationSettingsRepository.ts @@ -28,7 +28,8 @@ export class NotificationSettingsRepository { envId: number, teamId: number, clusterId: number, - isProdEnv: boolean + isProdEnv: boolean, + envIdsForCiPipeline: number[] ): Promise { if (eventTypeId == 6) { //this is the case when deployment is blocked and pipeline is set to auto trigger @@ -72,16 +73,29 @@ export class NotificationSettingsRepository { .andWhere("ns.team_id is NULL") .andWhere("ns.pipeline_id is NULL"); }) - ) + ); - .orWhere( + if (envIdsForCiPipeline && envIdsForCiPipeline.length > 0) { + qb.orWhere( new Brackets((qb) => { qb.where("ns.app_id is NULL") - .andWhere("ns.env_id is NULL") - .andWhere("ns.team_id = :teamId", { teamId: teamId }) + .andWhere("ns.env_id IN (:...envIds)", { + envIds: envIdsForCiPipeline, + }) + .andWhere("ns.team_id is NULL") .andWhere("ns.pipeline_id is NULL"); }) - ) + ); + } + + qb.orWhere( + new Brackets((qb) => { + qb.where("ns.app_id is NULL") + .andWhere("ns.env_id is NULL") + .andWhere("ns.team_id = :teamId", { teamId: teamId }) + .andWhere("ns.pipeline_id is NULL"); + }) + ) .orWhere( new Brackets((qb) => { qb.where("ns.app_id is NULL") @@ -109,8 +123,21 @@ export class NotificationSettingsRepository { .andWhere("ns.team_id = :teamId", { teamId: teamId }) .andWhere("ns.pipeline_id is NULL"); }) - ) - .orWhere( + ); + + if (envIdsForCiPipeline && envIdsForCiPipeline.length > 0) { + qb.orWhere( + new Brackets((qb) => { + qb.where("ns.app_id is NULL") + .andWhere("ns.env_id IN (:...envIds)", { + envIds: envIdsForCiPipeline, + }) + .andWhere("ns.team_id = :teamId", { teamId: teamId }) + .andWhere("ns.pipeline_id is NULL"); + }) + ); + } + qb.orWhere( new Brackets((qb) => { qb.where("ns.app_id = :appId", { appId: appId }) .andWhere("ns.team_id is NULL") @@ -127,8 +154,21 @@ export class NotificationSettingsRepository { }) .andWhere("ns.pipeline_id is NULL"); }) - ) - .orWhere( + ); + + if (envIdsForCiPipeline && envIdsForCiPipeline.length > 0) { + qb.orWhere( + new Brackets((qb) => { + qb.where("ns.app_id = :appId", { appId: appId }) + .andWhere("ns.team_id is NULL") + .andWhere("ns.env_id IN (:...envIds)", { + envIds: envIdsForCiPipeline, + }) + .andWhere("ns.pipeline_id is NULL"); + }) + ); + } + qb.orWhere( new Brackets((qb) => { qb.where("ns.app_id = :appId", { appId: appId }) .andWhere("ns.env_id = :envId", { envId: envId }) @@ -137,8 +177,22 @@ export class NotificationSettingsRepository { pipelineId: pipelineId, }); }) - ) - .orWhere( + ); + if (envIdsForCiPipeline && envIdsForCiPipeline.length > 0) { + qb.orWhere( + new Brackets((qb) => { + qb.where("ns.app_id = :appId", { appId: appId }) + .andWhere("ns.env_id IN (:...envIds)", { + envIds: envIdsForCiPipeline, + }) + .andWhere("ns.team_id = :teamId", { teamId: teamId }) + .orWhere("ns.pipeline_id = :pipelineId", { + pipelineId: pipelineId, + }); + }) + ); + } + qb.orWhere( new Brackets((qb) => { qb.where("ns.app_id =:appId", { appId: appId }) .andWhere("ns.env_id is NULL") diff --git a/src/server.ts b/src/server.ts index ea47c95..360548f 100644 --- a/src/server.ts +++ b/src/server.ts @@ -17,7 +17,7 @@ import express from 'express'; import { NotificationService, Event, Handler } from './notification/service/notificationService' import "reflect-metadata" -import {ConnectionOptions, createConnection, getConnectionOptions, getManager} from "typeorm" +import { ConnectionOptions, createConnection } from "typeorm" import { NotificationSettingsRepository } from "./repository/notificationSettingsRepository" import { SlackService } from './destination/destinationHandlers/slackHandler' import { SESService } from './destination/destinationHandlers/sesHandler' @@ -46,15 +46,16 @@ import { WebhookConfig } from './entities/webhookconfig'; import * as process from "process"; import bodyParser from 'body-parser'; import {connect, NatsConnection} from "nats"; - +import { register } from 'prom-client' import {NOTIFICATION_EVENT_TOPIC} from "./pubSub/utils"; import {PubSubServiceImpl} from "./pubSub/pubSub"; +import { failedNotificationMetricsCounter, httpRequestMetricsCounter, successNotificationMetricsCounter } from './common/metrics'; + const app = express(); const natsUrl = process.env.NATS_URL app.use(bodyParser.json({ limit: '10mb' })); app.use(express.json()); - let logger = winston.createLogger({ level: 'info', format: winston.format.combine( @@ -124,12 +125,28 @@ createConnection(dbOptions).then(async connection => { logger.error("shutting down notifier due to un-successful database connection...") process.exit(1) }); -const natsEventHandler = (msg: string) => { + +const natsEventHandler = async (msg: string) => { const eventAsString = JSON.parse(msg) const event = JSON.parse(eventAsString) as Event - notificationService.sendNotification(event) + logger.info({natsEventBody: event}) + const response = await notificationService.sendNotification(event) + if (response.status != 0){ + successNotificationMetricsCounter.inc() + } else{ + failedNotificationMetricsCounter.inc() + } } -app.get('/', (req, res) => res.send('Welcome to notifier Notifier!')) + +// Request counter for all endpoints +app.use((req, res, next) => { + httpRequestMetricsCounter.labels({method: req.method, endpoint: req.url, statusCode: res.statusCode}).inc() + next() + }) + +app.get('/', (req, res) => { + res.send('Welcome to notifier Notifier!') +}) app.get('/health', (req, res) => { res.status(200).send("healthy") @@ -142,12 +159,21 @@ app.get('/test', (req, res) => { app.post('/notify', async(req, res) => { logger.info("notifications Received") + console.log('endpoint:', JSON.stringify(req.baseUrl)) + console.log('payload:', JSON.stringify(req.body)) const response=await notificationService.sendNotification(req.body); if (response.status!=0){ res.status(response.status).json({message:response.message}).send() + successNotificationMetricsCounter.inc() }else{ res.status(response.error.statusCode).json({message:response.error.message}).send() + failedNotificationMetricsCounter.inc() } }); +app.get('/metrics', async (req, res) => { + res.setHeader('Content-Type', register.contentType) + res.send(await register.metrics()) +}); + app.listen(3000, () => logger.info('Notifier app listening on port 3000!'))