Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 85 additions & 17 deletions app/models/emails.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,22 @@ const Emails = new mongoose.Schema(
index: true
},
priority: {
type: Number,
default: 1, // PRIORITY_LEVELS.NORMAL
index: true
},
// abuse score (0-100, higher = more suspicious)
abuse_score: {
type: Number,
default: 0,
min: 0
min: 0,
max: 100,
index: true
},
// temporary throttling (if set, email won't be processed until this date)
throttled_until: {
type: Date,
index: true
},
alias: {
type: mongoose.Schema.ObjectId,
Expand Down Expand Up @@ -344,7 +357,9 @@ Emails.plugin(mongooseCommonPlugin, {
'message',
'locked_by',
'locked_at',
'priority'
'priority',
'abuse_score',
'throttled_until'
]
});

Expand All @@ -354,6 +369,24 @@ Emails.index(
{ partialFilterExpression: { locked_at: { $exists: true } } }
);

// composite index for fair queue processing (most important queries first)
Emails.index({
status: 1,
priority: -1, // higher priority first
domain: 1,
user: 1,
created_at: 1 // FIFO within same priority/domain/user
});

// index for throttled emails cleanup
Emails.index(
{ throttled_until: 1, status: 1 },
{ partialFilterExpression: { throttled_until: { $exists: true } } }
);

// index for abuse score queries
Emails.index({ abuse_score: 1, user: 1 });

// DSN
Emails.pre('validate', function (next) {
if (this.dsn === undefined) return next();
Expand Down Expand Up @@ -701,27 +734,62 @@ Emails.pre('save', function (next) {
}
});

// determine priority
// determine priority using fair queue system
Emails.pre('save', async function (next) {
try {
const domain = await Domains.findById(this.domain);
// we return here instead of erroring
if (!domain) {
// Only calculate priority for new emails or when priority is not set
if (!this.isNew && this.priority !== undefined) {
return next();
}

const { PRIORITY_LEVELS } = require('#config/priority-levels');
const dayjs = require('dayjs-with-plugins');

// Default priority
let priority = PRIORITY_LEVELS.NORMAL;

const [domain, user] = await Promise.all([
Domains.findById(this.domain).populate('members.user', 'id group plan created_at'),
Users.findById(this.user).select('id group plan created_at').lean()
]);

// If domain or user not found, set to pending with low priority
if (!domain || !user) {
this.status = 'pending';
this.priority = 0;
this.priority = PRIORITY_LEVELS.LOW;
return next();
}

// if any of the domain admins are admins then set priority to 1
const adminExists = await Users.exists({
_id: {
$in: domain.members
.filter((m) => m.group === 'admin')
.map((m) => m.user)
},
group: 'admin'
});
this.priority = adminExists ? 1 : 0;
// Check if user is an admin
const isUserAdmin = user.group === 'admin';

// Check if domain has admin users with premium plans
const hasAdminMembers = domain.members.some(
m => m.user &&
m.group === 'admin' &&
m.user.group === 'admin' &&
['enhanced_protection', 'team'].includes(m.user.plan)
);

// Determine priority based on user/domain status
if (isUserAdmin || hasAdminMembers) {
priority = PRIORITY_LEVELS.HIGH;
} else if (user.plan && ['enhanced_protection', 'team'].includes(user.plan)) {
priority = PRIORITY_LEVELS.HIGH;
} else if (dayjs().diff(user.created_at, 'days') < 7) {
// New accounts get lower priority for the first week
priority = PRIORITY_LEVELS.LOW;
} else {
priority = PRIORITY_LEVELS.NORMAL;
}

this.priority = priority;

// Initialize abuse score if not set
if (this.abuse_score === undefined) {
this.abuse_score = 0;
}

next();
} catch (err) {
next(err);
Expand Down
49 changes: 49 additions & 0 deletions config/priority-levels.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright (c) Forward Email LLC
* SPDX-License-Identifier: BUSL-1.1
*/

// Priority levels for fair queue system
const PRIORITY_LEVELS = {
HIGH: 2, // Paid users, good reputation, admin domains
NORMAL: 1, // Regular users (default)
LOW: 0, // Free tier, new accounts
THROTTLED: -1 // Detected abuse/suspicious activity
};

// Human readable names for priority levels
const PRIORITY_NAMES = {
[PRIORITY_LEVELS.HIGH]: 'HIGH',
[PRIORITY_LEVELS.NORMAL]: 'NORMAL',
[PRIORITY_LEVELS.LOW]: 'LOW',
[PRIORITY_LEVELS.THROTTLED]: 'THROTTLED'
};

// Configuration for priority-based processing limits
const PRIORITY_CONFIG = {
// Multipliers for queue allocation by priority
QUEUE_MULTIPLIERS: {
[PRIORITY_LEVELS.HIGH]: 1.5, // 50% more queue allocation
[PRIORITY_LEVELS.NORMAL]: 1.0, // Normal allocation
[PRIORITY_LEVELS.LOW]: 0.5, // Half allocation
[PRIORITY_LEVELS.THROTTLED]: 0.1 // Minimal allocation
},

// Maximum concurrent processing by priority
CONCURRENCY_LIMITS: {
[PRIORITY_LEVELS.HIGH]: 0.4, // 40% of total concurrency
[PRIORITY_LEVELS.NORMAL]: 0.4, // 40% of total concurrency
[PRIORITY_LEVELS.LOW]: 0.15, // 15% of total concurrency
[PRIORITY_LEVELS.THROTTLED]: 0.05 // 5% of total concurrency
},

// Abuse detection thresholds
ABUSE_THRESHOLD: 50, // Score above which user gets throttled
MAX_THROTTLE_DURATION: 24 * 60 * 60 * 1000 // 24 hours max throttle
};

module.exports = {
PRIORITY_LEVELS,
PRIORITY_NAMES,
PRIORITY_CONFIG
};
Loading
Loading