Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion containers/api-proxy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ COPY package*.json ./
RUN npm ci --omit=dev

# Copy application files
COPY server.js logging.js metrics.js rate-limiter.js ./
COPY server.js logging.js metrics.js rate-limiter.js token-extractor.js ./

# Create non-root user
RUN addgroup -S apiproxy && adduser -S apiproxy -G apiproxy
Expand Down
83 changes: 77 additions & 6 deletions containers/api-proxy/rate-limiter.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/**
* Sliding Window Counter Rate Limiter for AWF API Proxy.
*
* Provides per-provider rate limiting with three limit types:
* Provides per-provider rate limiting with four limit types:
* - RPM: requests per minute (1-second granularity, 60 slots)
* - RPH: requests per hour (1-minute granularity, 60 slots)
* - Bytes/min: request bytes per minute (1-second granularity, 60 slots)
* - TPM: tokens per minute (1-second granularity, 60 slots)
*
* Algorithm: sliding window counter — counts in the current window plus a
* weighted portion of the previous window based on elapsed time.
Expand All @@ -20,6 +21,7 @@
const DEFAULT_RPM = 600;
const DEFAULT_RPH = 1000;
const DEFAULT_BYTES_PM = 50 * 1024 * 1024; // 50 MB
const DEFAULT_TPM = 0; // 0 means disabled/unlimited — opt-in like other limits

// ── Window sizes ────────────────────────────────────────────────────────
const MINUTE_SLOTS = 60; // 1-second granularity for per-minute windows
Expand Down Expand Up @@ -143,6 +145,8 @@ class ProviderState {
this.rphWindow = createWindow(HOUR_SLOTS);
// Bytes per minute: 1-second granularity
this.bytesWindow = createWindow(MINUTE_SLOTS);
// Tokens per minute: 1-second granularity
this.tpmWindow = createWindow(MINUTE_SLOTS);
}
}

Expand All @@ -152,12 +156,14 @@ class RateLimiter {
* @param {number} [config.rpm=600] - Max requests per minute
* @param {number} [config.rph=1000] - Max requests per hour
* @param {number} [config.bytesPm=52428800] - Max bytes per minute (50 MB)
* @param {number} [config.tpm=0] - Max tokens per minute (0 = unlimited)
* @param {boolean} [config.enabled=true] - Whether rate limiting is active
*/
constructor(config = {}) {
this.rpm = config.rpm ?? DEFAULT_RPM;
this.rph = config.rph ?? DEFAULT_RPH;
this.bytesPm = config.bytesPm ?? DEFAULT_BYTES_PM;
this.tpm = config.tpm ?? DEFAULT_TPM;
this.enabled = config.enabled !== false;

/** @type {Map<string, ProviderState>} */
Expand All @@ -184,6 +190,10 @@ class RateLimiter {
* If allowed, the request is counted (recorded in windows).
* If denied, no recording happens — the caller should return 429.
*
* Note: TPM check uses previously recorded token consumption. Tokens
* are recorded post-response via recordTokens(), so the check here
* decides if the NEXT request is allowed based on PREVIOUS consumption.
*
* @param {string} provider - e.g. "openai", "anthropic", "copilot"
* @param {number} [requestBytes=0] - Size of the request body in bytes
* @returns {{
Expand Down Expand Up @@ -253,6 +263,23 @@ class RateLimiter {
};
}

// Check TPM (tokens per minute) — only if configured
if (this.tpm > 0) {
const tpmCount = getWindowCount(state.tpmWindow, nowSec, MINUTE_SLOTS);
if (tpmCount >= this.tpm) {
const retryAfter = estimateRetryAfter(state.tpmWindow, nowSec, MINUTE_SLOTS, this.tpm);
const resetAt = nowSec + retryAfter;
return {
allowed: false,
limitType: 'tpm',
limit: this.tpm,
remaining: 0,
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check() can now return limitType: 'tpm'. The API proxy’s 429 builder in containers/api-proxy/server.js currently only maps rpm/rph/bytes_pm for the human label and window field, so a TPM rejection will produce a confusing label and an incorrect window value (it will fall through to the bytes-per-minute case). Please update the server-side mapping to include 'tpm' (and a distinct window like per_minute_tokens) so clients can interpret the error correctly.

Suggested change
remaining: 0,
remaining: Math.max(0, this.tpm - tpmCount),

Copilot uses AI. Check for mistakes.
retryAfter,
resetAt,
};
}
}

// All checks passed — record the request
recordInWindow(state.rpmWindow, nowSec, MINUTE_SLOTS, 1);
recordInWindow(state.rphWindow, nowMin, HOUR_SLOTS, 1);
Expand All @@ -275,10 +302,32 @@ class RateLimiter {
}
}

/**
* Record token usage for a provider after a response completes.
*
* This is separate from check() because token counts are only known
* after the response is received and parsed. The recorded tokens are
* used by subsequent check() calls to enforce the TPM limit.
*
* @param {string} provider - e.g. "openai", "anthropic", "copilot"
* @param {number} tokenCount - Number of tokens consumed by the response
*/
recordTokens(provider, tokenCount) {
if (!this.enabled || this.tpm <= 0 || !tokenCount || tokenCount <= 0) return;

try {
const state = this._getState(provider);
const nowSec = Math.floor(Date.now() / 1000);
recordInWindow(state.tpmWindow, nowSec, MINUTE_SLOTS, tokenCount);
} catch (_err) {
// Fail-open: ignore recording errors
}
}

/**
* Get rate limit status for a single provider.
* @param {string} provider
* @returns {object} Status with rpm, rph windows
* @returns {object} Status with rpm, rph, tpm windows
*/
getStatus(provider) {
if (!this.enabled) {
Expand All @@ -288,11 +337,15 @@ class RateLimiter {
try {
const state = this.providers.get(provider);
if (!state) {
return {
const status = {
enabled: true,
rpm: { limit: this.rpm, remaining: this.rpm, reset: 0 },
rph: { limit: this.rph, remaining: this.rph, reset: 0 },
};
if (this.tpm > 0) {
status.tpm = { limit: this.tpm, remaining: this.tpm, reset: 0 };
}
return status;
}

const nowMs = Date.now();
Expand All @@ -309,7 +362,7 @@ class RateLimiter {
? estimateRetryAfter(state.rphWindow, nowMin, HOUR_SLOTS, this.rph) * 60
: 0;

return {
const status = {
enabled: true,
rpm: {
limit: this.rpm,
Expand All @@ -322,6 +375,21 @@ class RateLimiter {
reset: rphRetry > 0 ? Math.floor(nowMs / 1000) + rphRetry : 0,
},
};

// Include TPM status if configured
if (this.tpm > 0) {
const tpmCount = getWindowCount(state.tpmWindow, nowSec, MINUTE_SLOTS);
const tpmRetry = tpmCount >= this.tpm
? estimateRetryAfter(state.tpmWindow, nowSec, MINUTE_SLOTS, this.tpm)
: 0;
status.tpm = {
limit: this.tpm,
remaining: Math.max(0, this.tpm - tpmCount),
reset: tpmRetry > 0 ? nowSec + tpmRetry : 0,
};
}

return status;
} catch (_err) {
return { enabled: true, error: 'internal_error' };
}
Expand All @@ -344,9 +412,10 @@ class RateLimiter {
* Create a RateLimiter from environment variables.
*
* Reads:
* - AWF_RATE_LIMIT_RPM (default: 60)
* - AWF_RATE_LIMIT_RPM (default: 600)
* - AWF_RATE_LIMIT_RPH (default: 1000)
* - AWF_RATE_LIMIT_BYTES_PM (default: 52428800)
* - AWF_RATE_LIMIT_TPM (default: 0 — disabled)
* - AWF_RATE_LIMIT_ENABLED (default: "false" — rate limiting is opt-in)
*
* @returns {RateLimiter}
Expand All @@ -355,13 +424,15 @@ function create() {
const rawRpm = parseInt(process.env.AWF_RATE_LIMIT_RPM, 10);
const rawRph = parseInt(process.env.AWF_RATE_LIMIT_RPH, 10);
const rawBytesPm = parseInt(process.env.AWF_RATE_LIMIT_BYTES_PM, 10);
const rawTpm = parseInt(process.env.AWF_RATE_LIMIT_TPM, 10);

const rpm = (Number.isFinite(rawRpm) && rawRpm > 0) ? rawRpm : DEFAULT_RPM;
const rph = (Number.isFinite(rawRph) && rawRph > 0) ? rawRph : DEFAULT_RPH;
const bytesPm = (Number.isFinite(rawBytesPm) && rawBytesPm > 0) ? rawBytesPm : DEFAULT_BYTES_PM;
const tpm = (Number.isFinite(rawTpm) && rawTpm > 0) ? rawTpm : DEFAULT_TPM;
const enabled = process.env.AWF_RATE_LIMIT_ENABLED === 'true';

return new RateLimiter({ rpm, rph, bytesPm, enabled });
return new RateLimiter({ rpm, rph, bytesPm, tpm, enabled });
}

module.exports = { RateLimiter, create };
32 changes: 31 additions & 1 deletion containers/api-proxy/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const { HttpsProxyAgent } = require('https-proxy-agent');
const { generateRequestId, sanitizeForLog, logRequest } = require('./logging');
const metrics = require('./metrics');
const rateLimiter = require('./rate-limiter');
const { createTokenExtractor } = require('./token-extractor');

// Create rate limiter from environment variables
const limiter = rateLimiter.create();
Expand Down Expand Up @@ -319,7 +320,36 @@ function proxyRequest(req, res, targetHost, injectHeaders, provider) {
// Copy response headers and add X-Request-ID
const resHeaders = { ...proxyRes.headers, 'x-request-id': requestId };
res.writeHead(proxyRes.statusCode, resHeaders);
proxyRes.pipe(res);

// Extract token counts from response (best-effort, fail-open)
const resContentType = proxyRes.headers['content-type'] || '';
const resContentEncoding = proxyRes.headers['content-encoding'] || '';
const tokenExtractor = createTokenExtractor({
provider,
contentType: resContentType,
contentEncoding: resContentEncoding,
});

tokenExtractor.on('tokens', (tokens) => {
if (tokens.input > 0) {
metrics.increment('tokens_input_total', { provider }, tokens.input);
}
if (tokens.output > 0) {
metrics.increment('tokens_output_total', { provider }, tokens.output);
}
if (tokens.total > 0 && typeof limiter.recordTokens === 'function') {
limiter.recordTokens(provider, tokens.total);
}
logRequest('info', 'tokens_recorded', {
request_id: requestId,
provider,
input_tokens: tokens.input,
output_tokens: tokens.output,
total_tokens: tokens.total,
});
});

proxyRes.pipe(tokenExtractor).pipe(res);
});

proxyReq.on('error', (err) => {
Expand Down
Loading
Loading