- 
          
- 
                Notifications
    You must be signed in to change notification settings 
- Fork 867
fix(run-queue): Scan for queues using a duplicate redis client instead of the instance version #2451
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…d of the instance version
| 
 | 
| WalkthroughThe change replaces scan operations to use a duplicated Redis connection (redis.duplicate()) instead of the primary client within concurrency-related streams. currentConcurrencyScanStream and processConcurrencySet now create a duplicate, use it to drive scanStream/sscanStream, and return the stream along with the duplicate Redis instance. After scanning completes (or on error), the duplicate connection is closed via redis.quit(). Error handling is added around scan operations, capturing scanError and logging failures in both scanConcurrencySets and processConcurrencySet. These updates are internal; no exported/public API signatures were changed. Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
 🧪 Generate unit tests
 Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit: 
 SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type  Other keywords and placeholders
 CodeRabbit Configuration File ( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️  Outside diff range comments (1)
internal-packages/run-engine/src/run-queue/index.ts (1)
1956-2019: Mirror the same abort-safe completion for per-set SSCAN; fix log message; always quit in finally.Without closing on abort/close, this can also hang. Align handling with scanConcurrencySets.
Apply this diff:
private async processConcurrencySet(concurrencyKey: string) { - const redis = this.redis.duplicate(); - - const stream = redis.sscanStream(concurrencyKey, { + const redis = this.redis.duplicate(); + const stream = redis.sscanStream(concurrencyKey, { count: 100, }); const { promise, resolve, reject } = promiseWithResolvers<void>(); stream.on("end", () => { resolve(); }); stream.on("error", (error) => { this.logger.error("Error in sscanStream for concurrency set", { concurrencyKey, error, }); reject(error); }); + // Resolve if the stream is destroyed/closed or the RunQueue aborts. + stream.on("close", () => resolve()); + const onAbort = () => { + stream.destroy(); + resolve(); + }; + this.abortController.signal.addEventListener("abort", onAbort, { once: true }); + stream.on("data", async (runIds: string[]) => { stream.pause(); @@ - const [scanError] = await tryCatch(promise); - - if (scanError) { - this.logger.error("Error scanning concurrency sets", { - error: scanError, - }); - } - - await redis.quit(); - - return promise; + try { + await promise; + } catch (scanError) { + this.logger.error("Error scanning concurrency set", { concurrencyKey, error: scanError }); + } finally { + this.abortController.signal.removeEventListener("abort", onAbort); + await redis.quit(); + } return promise; }
🧹 Nitpick comments (1)
internal-packages/run-engine/src/run-queue/index.ts (1)
1919-1924: Avoid accidental mid-string prefix removal when normalizing keys.replace() may remove the first occurrence anywhere in the key. Prefer startsWith + slice to strip only a true prefix.
Apply this diff:
- const uniqueKeys = Array.from(new Set<string>(keys)).map((key) => - key.replace(redis.options.keyPrefix ?? "", "") - ); + const prefix = redis.options.keyPrefix ?? ""; + const uniqueKeys = Array.from(new Set<string>(keys)).map((key) => + prefix && key.startsWith(prefix) ? key.slice(prefix.length) : key + );
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- internal-packages/run-engine/src/run-queue/index.ts(4 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
- internal-packages/run-engine/src/run-queue/index.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (22)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: typecheck / typecheck
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
internal-packages/run-engine/src/run-queue/index.ts (1)
912-931: Good switch to a duplicated Redis client for SCAN.Using a dedicated duplicate for scanStream avoids interference with the primary client. Returning it to the caller for explicit lifecycle management is clear.
| const [scanError] = await tryCatch(promise); | ||
|  | ||
| if (scanError) { | ||
| this.logger.error("Error scanning concurrency sets", { | ||
| error: scanError, | ||
| }); | ||
| } | ||
|  | ||
| await redis.quit(); | ||
|  | ||
| return promise; | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure scan always completes on abort and return concrete stats; close client in finally.
If the AbortSignal fires before any data/end/error, the promise can hang. Also, returning the already-settled promise is awkward and drops stats.
Apply this diff:
-    const [scanError] = await tryCatch(promise);
-
-    if (scanError) {
-      this.logger.error("Error scanning concurrency sets", {
-        error: scanError,
-      });
-    }
-
-    await redis.quit();
-
-    return promise;
+    const onAbort = () => {
+      stream.destroy();
+      resolve(stats);
+    };
+    this.abortController.signal.addEventListener("abort", onAbort, { once: true });
+
+    try {
+      const result = await promise;
+      return result;
+    } catch (scanError) {
+      this.logger.error("Error scanning concurrency sets", { error: scanError });
+      throw scanError;
+    } finally {
+      this.abortController.signal.removeEventListener("abort", onAbort);
+      await redis.quit();
+    }Committable suggestion skipped: line range outside the PR's diff.
No description provided.