Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions app/models/emails.js
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,30 @@ Emails.post('save', async function (email, next) {
}
});

// Update user SMTP counters when email is successfully sent
Emails.post('save', async function (email) {
// Only update counters for new emails that are sent/delivered
if (!email._isNew || !['sent', 'delivered'].includes(email.status)) return;

try {
// Increment the user's SMTP counters
await Users.findByIdAndUpdate(email.user, {
$inc: {
smtp_emails_sent_1h: 1,
smtp_emails_sent_24h: 1,
smtp_emails_sent_72h: 1,
smtp_emails_sent_total: 1
},
$set: {
smtp_last_email_sent_at: new Date()
}
});
} catch (err) {
// Log error but don't fail the email save
logger.error(err, { email_id: email._id, user_id: email.user });
}
});

Emails.statics.getMessage = async function (obj, returnString = false) {
if (Buffer.isBuffer(obj)) {
if (returnString) return obj.toString();
Expand Down
34 changes: 34 additions & 0 deletions app/models/users.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,40 @@ object[config.userFields.smtpLimit] = {
max: 100000
};

// SMTP count tracking for admin dashboard
object.smtp_emails_sent_1h = {
type: Number,
default: 0,
min: 0,
index: true
};

object.smtp_emails_sent_24h = {
type: Number,
default: 0,
min: 0,
index: true
};

object.smtp_emails_sent_72h = {
type: Number,
default: 0,
min: 0,
index: true
};

object.smtp_emails_sent_total = {
type: Number,
default: 0,
min: 0,
index: true
};

object.smtp_last_email_sent_at = {
type: Date,
index: true
};

// Custom receipt email
object[config.userFields.receiptEmail] = {
type: String,
Expand Down
36 changes: 35 additions & 1 deletion app/views/admin/users/_table.pug
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ include ../../_pagination
+sortHeader('max_quota_per_alias', 'Max Qouta', '#table-users')
th(scope="col")
+sortHeader('smtp_limit', 'SMTP Limit', '#table-users')
th(scope="col")
+sortHeader('smtp_emails_sent_total', 'Emails Sent', '#table-users')
th(scope="col")
+sortHeader('smtp_emails_sent_1h', '1hr', '#table-users')
th(scope="col")
+sortHeader('smtp_emails_sent_24h', '24hr', '#table-users')
th(scope="col")
+sortHeader('smtp_emails_sent_72h', '72hr', '#table-users')
th(scope="col")
+sortHeader('smtp_last_email_sent_at', 'Last Email', '#table-users')
th(scope="col")
+sortHeader('created_at', 'Created', '#table-users')
th(scope="col")
Expand All @@ -37,7 +47,7 @@ include ../../_pagination
th.text-center.align-middle(scope="col")= t("Actions")
tbody
if users.length === 0
td.alert.alert-info(colspan=passport && passport.otp ? "14" : "13")= t("No users exist for that search.")
td.alert.alert-info(colspan=passport && passport.otp ? "19" : "18")= t("No users exist for that search.")
else
each user in users
tr
Expand Down Expand Up @@ -116,6 +126,30 @@ include ../../_pagination
data-toggle="tooltip",
data-title=t("Update")
): i.fa.fa-fw.fa-save
td.align-middle.text-right
if user.smtp_emails_sent_total > 0
a(href=l(`/admin/emails?user=${user._id}`), target="_blank")= user.smtp_emails_sent_total.toLocaleString()
else
= user.smtp_emails_sent_total || 0
td.align-middle.text-right
span.badge(
class=user.smtp_emails_sent_1h > 0 ? "badge-warning" : "badge-light"
)= user.smtp_emails_sent_1h || 0
td.align-middle.text-right
span.badge(
class=user.smtp_emails_sent_24h > 0 ? "badge-info" : "badge-light"
)= user.smtp_emails_sent_24h || 0
td.align-middle.text-right
span.badge(
class=user.smtp_emails_sent_72h > 0 ? "badge-primary" : "badge-light"
)= user.smtp_emails_sent_72h || 0
td.align-middle
if user.smtp_last_email_sent_at
.dayjs(
data-time=new Date(user.smtp_last_email_sent_at).getTime()
)= dayjs(user.smtp_last_email_sent_at).tz(user.timezone === 'Etc/Unknown' ? 'UTC' : user.timezone).format("M/D/YY h:mm A z")
else
= "Never"
td.align-middle.dayjs(
data-time=new Date(user.created_at).getTime()
)= dayjs(user.created_at).tz(user.timezone === 'Etc/Unknown' ? 'UTC' : user.timezone).format("M/D/YY h:mm A z")
Expand Down
6 changes: 6 additions & 0 deletions jobs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ let jobs = [
interval: '1d',
timeout: 0
},
// update SMTP counters every hour
{
name: 'update-smtp-counters',
interval: '1h',
timeout: 0
},
// session management
{
name: 'session-management',
Expand Down
128 changes: 128 additions & 0 deletions jobs/update-smtp-counters.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Copyright (c) Forward Email LLC
* SPDX-License-Identifier: BUSL-1.1
*/

// eslint-disable-next-line import/no-unassigned-import
require('#config/env');

const process = require('node:process');
const { parentPort } = require('node:worker_threads');

// eslint-disable-next-line import/no-unassigned-import
require('#config/mongoose');

const Graceful = require('@ladjs/graceful');
const mongoose = require('mongoose');

const Emails = require('#models/emails');
const Users = require('#models/users');
const logger = require('#helpers/logger');
const monitorServer = require('#helpers/monitor-server');
const setupMongoose = require('#helpers/setup-mongoose');

monitorServer();

const graceful = new Graceful({
mongooses: [mongoose],
logger
});

graceful.listen();

(async () => {
await setupMongoose(logger);

try {
const now = new Date();
const oneHourAgo = new Date(now - 60 * 60 * 1000);
const twentyFourHoursAgo = new Date(now - 24 * 60 * 60 * 1000);
const seventyTwoHoursAgo = new Date(now - 72 * 60 * 60 * 1000);

logger.info('Starting SMTP counter update job');

// Get SMTP counts per user with time-based breakdowns
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be a cursor instead of aggregate. You can refer to other jobs as to how cursor is used for iterating in a loop. Basically copy-pasta it over. Otherwise this will timeout and error since aggregate will take too long (there are millions of emails and only growing).

You also might want to keep a track of emails rejected and soft bounced or something, not sure. It would probably be very clear who the abusers are if we saw at a glance how many got rejected or bounced. Or the number with err.bounce_category of "spam" or "virus". Just sharing thoughts.

const emailCounts = await Emails.aggregate([
{
$match: {
// Only count delivered/sent emails (not failed/bounced)
status: { $in: ['delivered', 'deferred', 'sent'] }
}
},
{
$group: {
_id: '$user',
totalEmails: { $sum: 1 },
lastEmailAt: { $max: '$created_at' },
// Count emails within time periods
emailsLast1Hour: {
$sum: {
$cond: [{ $gte: ['$created_at', oneHourAgo] }, 1, 0]
}
},
emailsLast24Hours: {
$sum: {
$cond: [{ $gte: ['$created_at', twentyFourHoursAgo] }, 1, 0]
}
},
emailsLast72Hours: {
$sum: {
$cond: [{ $gte: ['$created_at', seventyTwoHoursAgo] }, 1, 0]
}
}
}
}
]);

logger.info(`Found email counts for ${emailCounts.length} users`);

// Create bulk operations to update all users
const bulkOperations = [];

// First, reset all counters to 0 for all users
bulkOperations.push({
updateMany: {
filter: {},
update: {
$set: {
smtp_emails_sent_1h: 0,
smtp_emails_sent_24h: 0,
smtp_emails_sent_72h: 0,
smtp_emails_sent_total: 0,
smtp_last_email_sent_at: null
}
}
}
});

// Then update users who have email counts
for (const count of emailCounts) {
bulkOperations.push({
updateOne: {
filter: { _id: count._id },
update: {
$set: {
smtp_emails_sent_1h: count.emailsLast1Hour,
smtp_emails_sent_24h: count.emailsLast24Hours,
smtp_emails_sent_72h: count.emailsLast72Hours,
smtp_emails_sent_total: count.totalEmails,
smtp_last_email_sent_at: count.lastEmailAt
}
}
}
});
}

if (bulkOperations.length > 0) {
await Users.bulkWrite(bulkOperations, { ordered: false });
logger.info(`Updated SMTP counters for all users`);
}

logger.info('SMTP counter update job completed successfully');
} catch (err) {
await logger.error(err);
}

if (parentPort) parentPort.postMessage('done');
else process.exit(0);
})();
Loading