Skip to content

Conversation

loks0n
Copy link
Contributor

@loks0n loks0n commented Sep 5, 2025

Summary by CodeRabbit

  • New Features

    • Graceful worker shutdown with per-worker stop hooks and observable "Worker Stopped" events.
    • Event-driven message consumption replacing perpetual loops.
    • New lifecycle logging for worker start/stop, job receipt, success, and errors.
  • Bug Fixes

    • Ensure AMQP consumption is stopped before closing connections to avoid hanging consumers.
  • Chores

    • Test container images add PCNTL support; docker-compose volumes standardized for container code.

Copy link

coderabbitai bot commented Sep 5, 2025

Walkthrough

Adds worker-stop lifecycle hooks and signal handling across adapters and servers, replaces a manual consume loop with event-driven consumption in Server, calls AMQP channel stopConsume() on close, standardizes test Docker mounts, and installs PCNTL in test images.

Changes

Cohort / File(s) Summary
Swoole adapter
src/Queue/Adapter/Swoole.php
Adds use Swoole\Constant and use Utopia\CLI\Console; introduces private $onStop; registers main/per-worker SIGTERM/SIGINT handlers (async); replaces string event names with Constant::EVENT_*; stop() invokes $onStop and logs; workerStop() stores the stop callback.
Server control flow & hooks
src/Queue/Server.php
Replaces infinite worker loop with event-driven consume callbacks; adds protected Hook $workerStopHook, workerStop(): Hook, and getWorkerStop(): ?Hook; wires workerStop to close consumer and invoke configured stop hook; refactors init/shutdown hook wiring, error handling, and duration capture.
AMQP broker shutdown
src/Queue/Broker/AMQP.php
close() now calls $this->channel?->stopConsume() (nullsafe) before closing the connection.
Docker compose & test images
docker-compose.yml, tests/.../Dockerfile
Standardizes mounted code path to /usr/src/code and adds ./vendor:/usr/src/code/vendor mounts; test server Dockerfiles install pcntl (docker-php-ext-install pcntl).
Test servers — worker-stop usage
tests/Queue/servers/*/worker.php
Adds workerStop()->action(...) handlers in Swoole, SwooleRedisCluster, AMQP, and Workerman test worker scripts that echo "Worker Stopped".

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant Server
  participant Adapter
  participant Consumer
  participant Broker

  Client->>Server: start()
  Server->>Adapter: register workerStart(handler)
  Server->>Adapter: register workerStop(() => Consumer.close())

  Note over Server,Consumer: Event-driven consumption (consume with callbacks)
  Adapter->>Consumer: consume(onMessage)

  Consumer->>Broker: fetch message
  Broker-->>Consumer: message
  Consumer->>Server: onMessage(message)

  rect #dff0d8
    Server->>Server: initHooks -> execute job action -> success log
  end

  rect #f8d7da
    alt on error
      Server->>Server: errorHooks -> log error
    end
  end

  Server->>Server: finally: record processDuration

  Note over Adapter: Worker stop flow
  Adapter-->>Server: invoke workerStop callback
  Server->>Consumer: close()
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • abnegate
  • christyjacob4

Pre-merge checks (2 passed, 1 warning)

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 50.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title clearly highlights the main new feature—adding the workerStop handler—and conveys the associated improvement of graceful exit behavior, directly summarizing the primary intent and scope of the changes.

Poem

I twitch my whiskers, ears alert,
Replaced the loop with gentler spurt.
When workers sleep, I softly call—
AMQP pauses, hooks stand tall.
The burrow hums; all tasks befall. 🐇✨

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat-workerstop

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/Queue/Adapter/Swoole.php (1)

50-58: Same fixes for workerStop handler

Align types and PHPMD naming.

- $this->onStop = $callback;
- $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
+ $this->onStop = $callback;
+ $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $_pool, int $workerId) use ($callback) {
     call_user_func($callback, $workerId);
 });
src/Queue/Server.php (1)

54-55: Avoid uninitialized typed property access for workerStartHook

Hook may not be set before start(); accessing a non-initialized typed property can fatal. Make it nullable and lazily init in getter.

- protected Hook $workerStartHook;
+ protected ?Hook $workerStartHook = null;
- if (!is_null($this->workerStartHook)) {
+ if ($this->workerStartHook) {
     call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
 }
- public function getWorkerStart(): Hook
- {
-     return $this->workerStartHook;
- }
+ public function getWorkerStart(): Hook
+ {
+     if (!$this->workerStartHook) {
+         $this->workerStartHook = new Hook();
+         $this->workerStartHook->groups(['*']);
+     }
+     return $this->workerStartHook;
+ }

Also applies to: 216-219, 316-319

🧹 Nitpick comments (2)
src/Queue/Broker/AMQP.php (1)

132-136: Graceful shutdown: null out channel and guard for lib differences

Calling stopConsume() before closing is good. Two tweaks:

  • Set $this->channel to null after closing to avoid reusing a dead channel.
  • If older php-amqplib versions are in play, consider guarding stopConsume().
 public function close(): void
 {
-    $this->channel?->stopConsume();
-    $this->channel?->getConnection()?->close();
+    try {
+        // php-amqplib >= 3.x
+        $this->channel?->stopConsume();
+    } catch (\Throwable) {
+        // ignore – not all versions expose stopConsume()
+    }
+    $this->channel?->getConnection()?->close();
+    $this->channel = null;
 }
src/Queue/Adapter/Swoole.php (1)

14-16: Make onStop nullable-typed

Doc says callable but it’s null until set. Use a proper nullable type.

-/** @var callable */
-private $onStop;
+private ?callable $onStop = null;
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 9f74bfc and a8a21e0.

📒 Files selected for processing (3)
  • src/Queue/Adapter/Swoole.php (4 hunks)
  • src/Queue/Broker/AMQP.php (1 hunks)
  • src/Queue/Server.php (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
src/Queue/Adapter/Swoole.php (3)
src/Queue/Server.php (2)
  • shutdown (168-174)
  • workerStart (304-310)
src/Queue/Adapter.php (1)
  • workerStart (35-35)
src/Queue/Adapter/Workerman.php (1)
  • workerStart (34-41)
src/Queue/Server.php (8)
src/Queue/Broker/AMQP.php (2)
  • consume (75-130)
  • close (132-136)
src/Queue/Consumer.php (2)
  • consume (17-22)
  • close (27-27)
src/Queue/Broker/Redis.php (2)
  • consume (19-90)
  • close (92-95)
src/Queue/Message.php (4)
  • Message (5-81)
  • getPid (52-55)
  • getTimestamp (62-65)
  • getPayload (67-70)
src/Queue/Job.php (2)
  • getHook (36-39)
  • hook (24-29)
src/Queue/Adapter/Swoole.php (1)
  • workerStop (50-58)
src/Queue/Adapter.php (1)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (1)
  • workerStop (42-49)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

43-43: Avoid unused parameters such as '$pool'. (Unused Code Rules)

(UnusedFormalParameter)


53-53: Avoid unused parameters such as '$pool'. (Unused Code Rules)

(UnusedFormalParameter)

🔇 Additional comments (2)
src/Queue/Server.php (2)

220-253: LGTM: cleaner consume flow with metrics

Switching to the adapter’s consume loop and recording wait/process durations in seconds is solid. Returning the job action result to drive ack semantics is correct.


288-289: LGTM: worker-stop cleanup

Registering workerStop to close the consumer matches the AMQP stopConsume() addition and ensures graceful shutdown.

Comment on lines 32 to 62
public function stop(): self
{
if ($this->onStop) {
call_user_func($this->onStop);
}
$this->pool->shutdown();
return $this;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Don’t invoke onStop() from stop(); it can break callback signatures and double-fire

stop() runs in the master process without a workerId; the registered callback expects one (as per workerStop). Let EVENT_WORKER_STOP handle it per worker.

 public function stop(): self
 {
-    if ($this->onStop) {
-        call_user_func($this->onStop);
-    }
     $this->pool->shutdown();
     return $this;
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public function stop(): self
{
if ($this->onStop) {
call_user_func($this->onStop);
}
$this->pool->shutdown();
return $this;
}
public function stop(): self
{
$this->pool->shutdown();
return $this;
}
🤖 Prompt for AI Agents
In src/Queue/Adapter/Swoole.php around lines 32 to 39, remove the direct
invocation of $this->onStop() from stop() because stop() runs in the master
process without a workerId and calling the callback here can break expected
callback signatures and cause it to fire twice; instead only perform pool
shutdown and return $this, leaving invocation of the onStop callback to the
per-worker EVENT_WORKER_STOP handler so it receives the correct workerId and is
not double-fired.

Comment on lines 43 to 83
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
call_user_func($callback, $workerId);
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix workerId type and silence PHPMD unused $pool

Swoole passes int $workerId. Also rename $pool to $_pool.

- $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
+ $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $_pool, int $workerId) use ($callback) {
     call_user_func($callback, $workerId);
 });
🧰 Tools
🪛 PHPMD (2.15.0)

43-43: Avoid unused parameters such as '$pool'. (Unused Code Rules)

(UnusedFormalParameter)

🤖 Prompt for AI Agents
In src/Queue/Adapter/Swoole.php around lines 43 to 45, update the Pool::on
callback signature to accept an int $workerId and rename the unused $pool
parameter to $_pool to silence PHPMD; specifically change the anonymous function
parameters from (Pool $pool, string $workerId) to (Pool $_pool, int $workerId)
and keep the call_user_func($callback, $workerId) body unchanged.

Comment on lines +241 to 252
foreach ($this->initHooks as $hook) { // Group init hooks
if (in_array($group, $hook->getGroups())) {
$arguments = $this->getArguments($hook, $message->getPayload());
\call_user_func_array($hook->getAction(), $arguments);
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Honor Job::getHook() for group hooks as well

Global hooks are gated by getHook(), but group hooks always run. Gate both for consistency.

- foreach ($this->job->getGroups() as $group) {
-     foreach ($this->initHooks as $hook) { // Group init hooks
+ if ($this->job->getHook()) {
+     foreach ($this->job->getGroups() as $group) {
+         foreach ($this->initHooks as $hook) { // Group init hooks
              if (in_array($group, $hook->getGroups())) {
                  $arguments = $this->getArguments($hook, $message->getPayload());
                  \call_user_func_array($hook->getAction(), $arguments);
              }
-         }
-     }
+         }
+     }
+ }
- foreach ($this->job->getGroups() as $group) {
-     foreach ($this->shutdownHooks as $hook) { // Group init hooks
+ if ($this->job->getHook()) {
+     foreach ($this->job->getGroups() as $group) {
+         foreach ($this->shutdownHooks as $hook) { // Group shutdown hooks
              if (in_array($group, $hook->getGroups())) {
                  $arguments = $this->getArguments($hook, $message->getPayload());
                  \call_user_func_array($hook->getAction(), $arguments);
              }
-     }
- }
+         }
+     }
+ }

Also applies to: 265-272

🤖 Prompt for AI Agents
In src/Queue/Server.php around lines 241-247, group init hooks are invoked
unconditionally when the group matches; modify the condition to also respect the
hook selector by only running the hook if its getHook() is null (applies to all)
or matches the current job/message hook (e.g. $message->getHook()), i.e. add a
check like ($hook->getHook() === null || $hook->getHook() ===
$message->getHook()) combined with the existing group check; apply the same
change to the corresponding group hook block at lines 265-272.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/Queue/Server.php (1)

100-106: Undefined index risk in getResource()

Reading self::$resourcesCallbacks[$name]['reset'] before verifying the key exists can emit notices/fatals. Check existence first.

-        if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) {
-            if (!\array_key_exists($name, self::$resourcesCallbacks)) {
-                throw new Exception("Failed to find resource: $name");
-            }
+        $cb = self::$resourcesCallbacks[$name] ?? null;
+        if (!\array_key_exists($name, $this->resources) || $fresh || (($cb['reset'] ?? false) === true)) {
+            if ($cb === null) {
+                throw new Exception("Failed to find resource: $name");
+            }
 
-            $this->resources[$name] = \call_user_func_array(
-                self::$resourcesCallbacks[$name]['callback'],
-                $this->getResources(self::$resourcesCallbacks[$name]['injections'])
-            );
+            $this->resources[$name] = \call_user_func_array(
+                $cb['callback'],
+                $this->getResources($cb['injections'])
+            );
♻️ Duplicate comments (4)
src/Queue/Adapter/Swoole.php (2)

52-55: Don’t invoke workerStop callback from stop(); let EVENT_WORKER_STOP handle it

Calling $this->onStop() here fires the hook in the master process without a workerId and risks double-firing. Remove it and rely on the registered EVENT_WORKER_STOP handler.

Apply this diff:

     public function stop(): self
     {
-        if ($this->onStop) {
-            call_user_func($this->onStop);
-        }
-
         Console::info("[Swoole] Shutting down process pool...");
         $this->pool->shutdown();
         Console::success("[Swoole] Process pool stopped.");
         return $this;
     }

64-79: Fix workerId type, silence PHPMD, and address PSR-12 spacing in closures

Swoole passes int $workerId. Also rename the unused $pool param to $_pool. PSR-12 requires a space after function. This also resolves the linter failure.

-        $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $_pool, int $workerId) use ($callback) {
             // Register signal handlers in each worker process for graceful shutdown
             if (extension_loaded('pcntl')) {
-                pcntl_signal(SIGTERM, function() use ($workerId) {
+                pcntl_signal(SIGTERM, function () use ($workerId) {
                     Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer...");
                     $this->consumer->close();
                 });
 
-                pcntl_signal(SIGINT, function() use ($workerId) {
+                pcntl_signal(SIGINT, function () use ($workerId) {
                     Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer...");
                     $this->consumer->close();
                 });
 
                 pcntl_async_signals(true);
             }
src/Queue/Server.php (2)

236-244: Honor Job::getHook() for group init hooks as well

Group init hooks run unconditionally; align with global behavior by gating on getHook().

-                            }
-
-                            foreach ($this->job->getGroups() as $group) {
+                            }
+                            if ($this->job->getHook()) {
+                                foreach ($this->job->getGroups() as $group) {
                                     foreach ($this->initHooks as $hook) { // Group init hooks
                                         if (in_array($group, $hook->getGroups())) {
                                             $arguments = $this->getArguments($hook, $message->getPayload());
                                             \call_user_func_array($hook->getAction(), $arguments);
                                         }
                                     }
-                            }
+                                }
+                            }

Also applies to: 245-252


270-277: Honor Job::getHook() for group shutdown hooks as well

Same gating needed for shutdown group hooks.

-                        foreach ($this->job->getGroups() as $group) {
-                            foreach ($this->shutdownHooks as $hook) { // Group init hooks
+                        if ($this->job->getHook()) {
+                            foreach ($this->job->getGroups() as $group) {
+                                foreach ($this->shutdownHooks as $hook) { // Group shutdown hooks
                                     if (in_array($group, $hook->getGroups())) {
                                         $arguments = $this->getArguments($hook, $message->getPayload());
                                         \call_user_func_array($hook->getAction(), $arguments);
                                     }
-                            }
-                        }
+                                }
+                            }
+                        }
🧹 Nitpick comments (10)
tests/Queue/servers/SwooleRedisCluster/Dockerfile (1)

5-5: LGTM — pcntl install added

This enables signal handling needed by workerStop. Consider squashing RUNs to reduce layers, but fine for tests.

tests/Queue/servers/Swoole/Dockerfile (1)

5-5: LGTM — pcntl install added

Matches the new signal handling in the adapter. Optional: combine with apk add in a single RUN to shrink layers.

src/Queue/Adapter/Swoole.php (2)

15-17: Type the onStop property

Minor: make the property nullable-typed for clarity and static analysis.

-    /** @var callable */
-    private $onStop;
+    private ?callable $onStop = null;

30-44: Optional: Prefer Swoole signal APIs for tighter integration

Using pcntl_* works (with async signals), but Swoole\Process::signal integrates with Swoole’s event loop and avoids edge cases under coroutines. Consider switching in a follow-up.

docker-compose.yml (1)

6-8: Volume path refactor looks consistent; consider setting working_dir

Mounts now target /usr/src/code across services and commands use absolute script paths—good. Optionally set working_dir: /usr/src/code on services running php commands to simplify relative includes and developer ergonomics.

Example:

   swoole:
     container_name: swoole
     build: ./tests/Queue/servers/Swoole/.
+    working_dir: /usr/src/code
     command: php /usr/src/code/tests/Queue/servers/Swoole/worker.php

Also applies to: 20-22, 31-33, 43-45, 55-57

tests/Queue/servers/Workerman/worker.php (1)

32-36: Include workerId in stop logs for observability

Inject and print the worker ID to aid debugging and correlate stop events.

 $server
-    ->workerStop()
-    ->action(function () {
-        echo "Worker Stopped" . PHP_EOL;
-    });
+    ->workerStop()
+    ->inject('workerId')
+    ->action(function (string $workerId) {
+        echo "Worker {$workerId} Stopped" . PHP_EOL;
+    });
tests/Queue/servers/Swoole/worker.php (1)

33-37: Include workerId in stop logs for observability

Same improvement as other servers: surface the worker ID.

 $server
-    ->workerStop()
-    ->action(function () {
-        echo "Worker Stopped" . PHP_EOL;
-    });
+    ->workerStop()
+    ->inject('workerId')
+    ->action(function (string $workerId) {
+        echo "Worker {$workerId} Stopped" . PHP_EOL;
+    });
tests/Queue/servers/SwooleRedisCluster/worker.php (1)

33-37: Include workerId in stop logs for observability

Mirror the same enhancement here.

 $server
-    ->workerStop()
-    ->action(function () {
-        echo "Worker Stopped" . PHP_EOL;
-    });
+    ->workerStop()
+    ->inject('workerId')
+    ->action(function (string $workerId) {
+        echo "Worker {$workerId} Stopped" . PHP_EOL;
+    });
tests/Queue/servers/AMQP/worker.php (1)

32-36: Include workerId in stop logs for observability

Consistent with other servers.

 $server
-    ->workerStop()
-    ->action(function () {
-        echo "Worker Stopped" . PHP_EOL;
-    });
+    ->workerStop()
+    ->inject('workerId')
+    ->action(function (string $workerId) {
+        echo "Worker {$workerId} Stopped" . PHP_EOL;
+    });
src/Queue/Server.php (1)

271-271: Comment typo: “Group init hooks” → “Group shutdown hooks”

Fix misleading comment to match the block.

-                            foreach ($this->shutdownHooks as $hook) { // Group init hooks
+                            foreach ($this->shutdownHooks as $hook) { // Group shutdown hooks
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a8a21e0 and f51092a.

📒 Files selected for processing (11)
  • docker-compose.yml (5 hunks)
  • src/Queue/Adapter/Swoole.php (3 hunks)
  • src/Queue/Broker/AMQP.php (1 hunks)
  • src/Queue/Server.php (8 hunks)
  • tests/Queue/servers/AMQP/Dockerfile (1 hunks)
  • tests/Queue/servers/AMQP/worker.php (1 hunks)
  • tests/Queue/servers/Swoole/Dockerfile (1 hunks)
  • tests/Queue/servers/Swoole/worker.php (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/worker.php (1 hunks)
  • tests/Queue/servers/Workerman/worker.php (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/Queue/Broker/AMQP.php
🧰 Additional context used
🧬 Code graph analysis (6)
tests/Queue/servers/Swoole/worker.php (4)
src/Queue/Adapter/Swoole.php (1)
  • workerStop (86-94)
src/Queue/Server.php (1)
  • workerStop (342-348)
src/Queue/Adapter.php (1)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (1)
  • workerStop (42-49)
tests/Queue/servers/Workerman/worker.php (4)
src/Queue/Adapter/Swoole.php (1)
  • workerStop (86-94)
src/Queue/Server.php (1)
  • workerStop (342-348)
src/Queue/Adapter.php (1)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (1)
  • workerStop (42-49)
tests/Queue/servers/SwooleRedisCluster/worker.php (4)
src/Queue/Adapter/Swoole.php (1)
  • workerStop (86-94)
src/Queue/Server.php (1)
  • workerStop (342-348)
src/Queue/Adapter.php (1)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (1)
  • workerStop (42-49)
tests/Queue/servers/AMQP/worker.php (4)
src/Queue/Adapter/Swoole.php (1)
  • workerStop (86-94)
src/Queue/Server.php (1)
  • workerStop (342-348)
src/Queue/Adapter.php (1)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (1)
  • workerStop (42-49)
src/Queue/Adapter/Swoole.php (4)
src/Queue/Broker/Pool.php (1)
  • Pool (10-56)
src/Queue/Adapter.php (4)
  • Adapter (5-49)
  • stop (28-28)
  • start (22-22)
  • workerStart (35-35)
src/Queue/Server.php (4)
  • stop (185-196)
  • start (215-315)
  • shutdown (173-179)
  • workerStart (321-327)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Server.php (8)
src/Queue/Broker/AMQP.php (2)
  • consume (75-130)
  • close (132-136)
src/Queue/Broker/Redis.php (2)
  • consume (19-90)
  • close (92-95)
src/Queue/Consumer.php (2)
  • consume (17-22)
  • close (27-27)
src/Queue/Message.php (4)
  • Message (5-81)
  • getPid (52-55)
  • getTimestamp (62-65)
  • getPayload (67-70)
src/Queue/Job.php (2)
  • getHook (36-39)
  • hook (24-29)
src/Queue/Adapter/Swoole.php (1)
  • workerStop (86-94)
src/Queue/Adapter.php (1)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (1)
  • workerStop (42-49)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

64-64: Avoid unused parameters such as '$pool'. (Unused Code Rules)

(UnusedFormalParameter)


89-89: Avoid unused parameters such as '$pool'. (Unused Code Rules)

(UnusedFormalParameter)

🪛 GitHub Actions: Linter
src/Queue/Adapter/Swoole.php

[error] 1-1: PSR 12: function_declaration violation detected by Pint. 29 files, 1 style issue. Failing file: src/Queue/Adapter/Swoole.php. Run 'vendor/bin/pint --test' to reproduce.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Adapter Tests (Workerman)
  • GitHub Check: Adapter Tests (SwooleRedisCluster)
  • GitHub Check: Adapter Tests (Pool)
🔇 Additional comments (2)
tests/Queue/servers/AMQP/Dockerfile (1)

3-5: LGTM — pcntl install for AMQP test image

Consistent with other servers; enables graceful shutdown paths.

src/Queue/Server.php (1)

231-233: Timestamp units are consistent: enqueue() uses time() (seconds) and getTimestamp() returns that value, so microtime(true) – $message->getTimestamp() yields the correct wait time in seconds.

Comment on lines +89 to 93
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
call_user_func($callback, $workerId);
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Mirror the same workerId typing and PHPMD fix in workerStop

Align with Swoole’s signature and silence PHPMD. Also adheres to PSR-12 by spacing after function.

-        $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $_pool, int $workerId) use ($callback) {
             call_user_func($callback, $workerId);
         });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
call_user_func($callback, $workerId);
});
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $_pool, int $workerId) use ($callback) {
call_user_func($callback, $workerId);
});
🧰 Tools
🪛 PHPMD (2.15.0)

89-89: Avoid unused parameters such as '$pool'. (Unused Code Rules)

(UnusedFormalParameter)

🤖 Prompt for AI Agents
In src/Queue/Adapter/Swoole.php around lines 89 to 91, update the anonymous
handler to mirror the same workerId typing and PHPMD fix used elsewhere: change
the second parameter type to int (e.g. Pool $_pool, int $workerId), rename the
unused Pool parameter to-prefixed $_pool to silence PHPMD about an unused
variable, and ensure there is a single space after the function keyword to
satisfy PSR-12; then call the callback with the typed $workerId as before.

Comment on lines +221 to 223
if ($this->workerStartHook !== null) {
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Guard uninitialized typed property access (workerStartHook)

Accessing a not-yet-initialized typed property throws. Use isset() and ensure an action exists.

-                if ($this->workerStartHook !== null) {
-                    call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
-                }
+                if (isset($this->workerStartHook) && $this->workerStartHook->getAction()) {
+                    call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
+                }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if ($this->workerStartHook !== null) {
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
}
if (isset($this->workerStartHook) && $this->workerStartHook->getAction()) {
call_user_func_array(
$this->workerStartHook->getAction(),
$this->getArguments($this->workerStartHook)
);
}
🤖 Prompt for AI Agents
In src/Queue/Server.php around lines 221 to 223, the typed property
$this->workerStartHook may be uninitialized and accessing it directly will
throw; guard the access with an isset() (or property_exists) check and also
verify that getAction() returns a callable before calling call_user_func_array.
Replace the direct access with a conditional that ensures $this->workerStartHook
is set and that $this->workerStartHook->getAction() is a callable, then call
call_user_func_array only when both checks pass (use $this->getArguments(...) as
before).

Comment on lines +293 to 305
$this->adapter->workerStop(function ($workerId) {
Console::info("[Worker] Worker {$workerId} stopping...");
self::setResource('workerId', fn () => $workerId);

// Call user-defined workerStop hook if set
if ($this->workerStopHook !== null) {
call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook));
}

// Close consumer connection
$this->adapter->consumer->close();
Console::success("[Worker] Worker {$workerId} stopped gracefully.");
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Guard uninitialized typed property access (workerStopHook)

Same issue as workerStartHook; also verify action exists before calling.

-            $this->adapter->workerStop(function ($workerId) {
+            $this->adapter->workerStop(function ($workerId) {
                 Console::info("[Worker] Worker {$workerId} stopping...");
                 self::setResource('workerId', fn () => $workerId);
 
                 // Call user-defined workerStop hook if set
-                if ($this->workerStopHook !== null) {
-                    call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook));
-                }
+                if (isset($this->workerStopHook) && $this->workerStopHook->getAction()) {
+                    call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook));
+                }
 
                 // Close consumer connection
                 $this->adapter->consumer->close();
                 Console::success("[Worker] Worker {$workerId} stopped gracefully.");
             });
🤖 Prompt for AI Agents
In src/Queue/Server.php around lines 293 to 305, avoid accessing the possibly
uninitialized typed property workerStopHook and ensure the hook action is
callable before invoking it: first check that the property is set/non-null (e.g.
isset($this->workerStopHook) or $this->workerStopHook !== null) to prevent
uninitialized property access, then retrieve the action and verify
is_callable(...) (or use callable checks) before calling call_user_func_array;
only call the consumer-close and success log after safely handling the optional
hook.

Comment on lines +354 to 357
public function getWorkerStop(): ?Hook
{
return $this->workerStopHook;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Make getWorkerStop() safe with uninitialized property

Return null when unset to match the declared return type.

-    public function getWorkerStop(): ?Hook
-    {
-        return $this->workerStopHook;
-    }
+    public function getWorkerStop(): ?Hook
+    {
+        return isset($this->workerStopHook) ? $this->workerStopHook : null;
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public function getWorkerStop(): ?Hook
{
return $this->workerStopHook;
}
public function getWorkerStop(): ?Hook
{
return isset($this->workerStopHook)
? $this->workerStopHook
: null;
}
🤖 Prompt for AI Agents
In src/Queue/Server.php around lines 354-357, the getWorkerStop() method may
access an uninitialized $workerStopHook property; update the method to return
null when the property is unset (e.g. check isset($this->workerStopHook) and
return it only if set, otherwise return null) so the returned value always
matches the declared ?Hook type without triggering uninitialized property
notices.

@loks0n loks0n changed the title feat: add workerStop handling feat: add workerStop handler, graceful exits Sep 10, 2025
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/Queue/Adapter/Swoole.php (1)

90-95: Mirror the same signature fix for EVENT_WORKER_STOP

Align types and PHPMD suppression.

-        $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $_pool, int $workerId) use ($callback) {
♻️ Duplicate comments (6)
src/Queue/Adapter/Swoole.php (2)

54-57: Don’t invoke onStop() from stop(); let EVENT_WORKER_STOP fire it per worker

Calling it here runs in the master process, breaks expected signatures, and risks double-firing.

-        if ($this->onStop) {
-            call_user_func($this->onStop);
-        }

66-83: Fix Swoole callback signature: workerId is int; silence PHPMD unused $pool

Swoole passes (Pool, int). Current string type can raise a TypeError; also rename the unused param.

-        $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $_pool, int $workerId) use ($callback) {
src/Queue/Server.php (4)

245-252: Honor Job::getHook() for group hooks as well

Group hooks should be gated the same as global hooks for consistency.

-            foreach ($this->job->getGroups() as $group) {
-                foreach ($this->initHooks as $hook) { // Group init hooks
+            if ($this->job->getHook()) {
+                foreach ($this->job->getGroups() as $group) {
+                    foreach ($this->initHooks as $hook) { // Group init hooks
                         if (in_array($group, $hook->getGroups())) {
                             $arguments = $this->getArguments($hook, $message->getPayload());
                             \call_user_func_array($hook->getAction(), $arguments);
                         }
-                }
-            }
+                    }
+                }
+            }
-                        foreach ($this->job->getGroups() as $group) {
-                            foreach ($this->shutdownHooks as $hook) { // Group init hooks
+                        if ($this->job->getHook()) {
+                            foreach ($this->job->getGroups() as $group) {
+                                foreach ($this->shutdownHooks as $hook) { // Group shutdown hooks
                                     if (in_array($group, $hook->getGroups())) {
                                         $arguments = $this->getArguments($hook, $message->getPayload());
                                         \call_user_func_array($hook->getAction(), $arguments);
                                     }
-                            }
-                        }
+                                }
+                            }
+                        }

Also applies to: 270-277


221-223: Guard access to uninitialized typed property: workerStartHook

Accessing an uninitialized typed property throws. Check isset() and action before calling.

-                if ($this->workerStartHook !== null) {
-                    call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
-                }
+                if (isset($this->workerStartHook) && $this->workerStartHook->getAction()) {
+                    call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
+                }

293-305: Guard access to uninitialized typed property: workerStopHook

Same issue as workerStartHook; also verify an action exists.

-                if ($this->workerStopHook !== null) {
-                    call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook));
-                }
+                if (isset($this->workerStopHook) && $this->workerStopHook->getAction()) {
+                    call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook));
+                }

350-357: Make getWorkerStop() safe when unset

Return null if the hook wasn’t configured to match the ?Hook signature.

-    public function getWorkerStop(): ?Hook
-    {
-        return $this->workerStopHook;
-    }
+    public function getWorkerStop(): ?Hook
+    {
+        return isset($this->workerStopHook) ? $this->workerStopHook : null;
+    }
🧹 Nitpick comments (2)
tests/Queue/servers/SwooleRedisCluster/Dockerfile (1)

5-5: Slim the image and avoid cache while installing pcntl

Use --no-cache, parallel build, and remove build deps in the same layer. Keeps images smaller and reproducible.

Apply:

-FROM phpswoole/swoole:php8.3-alpine
-
-RUN apk add autoconf build-base
-
-RUN docker-php-ext-install pcntl
-RUN docker-php-ext-enable redis
+FROM phpswoole/swoole:php8.3-alpine
+
+RUN apk add --no-cache --virtual .build-deps autoconf build-base \
+ && docker-php-ext-install -j"$(nproc)" pcntl \
+ && docker-php-ext-enable redis \
+ && apk del .build-deps
src/Queue/Server.php (1)

302-304: Avoid double-closing the consumer

Signal handlers in workers already close the consumer; workerStop also closes it. It’s mostly harmless but noisy. Consider making close() idempotent (no-op if already closed), or dropping one call.

Also applies to: 69-77

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f51092a and e078822.

📒 Files selected for processing (11)
  • docker-compose.yml (5 hunks)
  • src/Queue/Adapter/Swoole.php (3 hunks)
  • src/Queue/Broker/AMQP.php (1 hunks)
  • src/Queue/Server.php (8 hunks)
  • tests/Queue/servers/AMQP/Dockerfile (1 hunks)
  • tests/Queue/servers/AMQP/worker.php (1 hunks)
  • tests/Queue/servers/Swoole/Dockerfile (1 hunks)
  • tests/Queue/servers/Swoole/worker.php (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/worker.php (1 hunks)
  • tests/Queue/servers/Workerman/worker.php (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (8)
  • tests/Queue/servers/AMQP/Dockerfile
  • src/Queue/Broker/AMQP.php
  • tests/Queue/servers/Swoole/Dockerfile
  • tests/Queue/servers/Workerman/worker.php
  • tests/Queue/servers/AMQP/worker.php
  • docker-compose.yml
  • tests/Queue/servers/Swoole/worker.php
  • tests/Queue/servers/SwooleRedisCluster/worker.php
🧰 Additional context used
🧬 Code graph analysis (2)
src/Queue/Server.php (8)
src/Queue/Broker/AMQP.php (2)
  • consume (75-130)
  • close (132-136)
src/Queue/Consumer.php (2)
  • consume (17-22)
  • close (27-27)
src/Queue/Broker/Redis.php (2)
  • consume (19-90)
  • close (92-95)
src/Queue/Message.php (4)
  • Message (5-81)
  • getPid (52-55)
  • getTimestamp (62-65)
  • getPayload (67-70)
src/Queue/Job.php (2)
  • getHook (36-39)
  • hook (24-29)
src/Queue/Adapter/Swoole.php (1)
  • workerStop (88-96)
src/Queue/Adapter.php (1)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (1)
  • workerStop (42-49)
src/Queue/Adapter/Swoole.php (6)
src/Queue/Adapter.php (4)
  • Adapter (5-49)
  • stop (28-28)
  • start (22-22)
  • workerStart (35-35)
src/Queue/Server.php (3)
  • stop (185-196)
  • start (215-315)
  • workerStart (321-327)
src/Queue/Adapter/Workerman.php (3)
  • stop (28-32)
  • start (22-26)
  • workerStart (34-41)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Broker/Redis.php (1)
  • close (92-95)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

66-66: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)


91-91: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)

🔇 Additional comments (3)
tests/Queue/servers/SwooleRedisCluster/Dockerfile (1)

5-5: Verify if pcntl is pre-enabled in the base image
Manually run:

docker run --rm phpswoole/swoole:php8.3-alpine php -m | grep -i pcntl

If pcntl appears, remove the RUN docker-php-ext-install pcntl line.

src/Queue/Adapter/Swoole.php (1)

30-46: Good: main-process signal handlers for graceful shutdown

PSR-12 spacing is correct; async signals enabled; warning path covered when pcntl missing.

src/Queue/Server.php (1)

254-258: Good: always record process duration in finally

Ensures telemetry completeness even on failures.

Comment on lines +15 to +17
/** @var callable */
private $onStop;

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Type the callback property and default it to null

Prevents accidental non-callable values and clarifies intent.

-    /** @var callable */
-    private $onStop;
+    private ?callable $onStop = null;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/** @var callable */
private $onStop;
// Replace the untyped property and docblock with a nullable typed property
private ?callable $onStop = null;
🤖 Prompt for AI Agents
In src/Queue/Adapter/Swoole.php around lines 15 to 17, the private $onStop
callback is currently untyped and unset; change it to a nullable callable typed
property and default it to null (e.g., private ?callable $onStop = null) so that
non-callable assignments are prevented and the intent is explicit; update or
remove the redundant docblock accordingly and ensure any code that sets or
invokes $onStop accounts for a null value.

Comment on lines 102 to 106
if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) {
if (!\array_key_exists($name, self::$resourcesCallbacks)) {
throw new Exception('Failed to find resource: "' . $name . '"');
throw new Exception("Failed to find resource: $name");
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid undefined index on ‘reset’ for unset resources

If a resource exists in $this->resources but has no callback (e.g., 'error' before setResource), accessing ['reset'] throws a notice. Guard the access.

-        if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) {
+        if (
+            !\array_key_exists($name, $this->resources)
+            || $fresh
+            || (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset'])
+        ) {
             if (!\array_key_exists($name, self::$resourcesCallbacks)) {
                 throw new Exception("Failed to find resource: $name");
             }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) {
if (!\array_key_exists($name, self::$resourcesCallbacks)) {
throw new Exception('Failed to find resource: "' . $name . '"');
throw new Exception("Failed to find resource: $name");
}
if (
!\array_key_exists($name, $this->resources)
|| $fresh
|| (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset'])
) {
if (!\array_key_exists($name, self::$resourcesCallbacks)) {
throw new Exception("Failed to find resource: $name");
}
🤖 Prompt for AI Agents
In src/Queue/Server.php around lines 102 to 106, the condition accesses
self::$resourcesCallbacks[$name]['reset'] without ensuring
self::$resourcesCallbacks[$name] exists, which can trigger an undefined index
notice; update the condition to first verify the callback entry exists (using
isset() or array_key_exists on self::$resourcesCallbacks[$name]) before reading
the ['reset'] value—e.g., reorder or combine checks so the ['reset'] access is
guarded (e.g., check callback existence and then check its 'reset' flag or use
null-coalescing/isset to default to false).

@loks0n loks0n requested review from Meldiron and Copilot October 7, 2025 12:25
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Graceful shutdown and lifecycle hooks for queue workers across adapters, plus test container updates for signal support.

  • Add per-worker stop hooks and worker stop event handling.
  • Introduce signal-based graceful shutdown for Swoole and ensure AMQP consumers stop before closing.
  • Standardize test container volumes and enable pcntl in images.

Reviewed Changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
tests/Queue/servers/Workerman/worker.php Adds workerStop hook usage for test worker lifecycle.
tests/Queue/servers/SwooleRedisCluster/worker.php Adds workerStop hook usage for cluster worker lifecycle.
tests/Queue/servers/Swoole/worker.php Adds workerStop hook usage for Swoole worker lifecycle.
tests/Queue/servers/AMQP/worker.php Adds workerStop hook usage for AMQP worker lifecycle.
tests/Queue/servers/SwooleRedisCluster/Dockerfile Enables pcntl for proper signal handling in tests.
tests/Queue/servers/Swoole/Dockerfile Enables pcntl for proper signal handling in tests.
tests/Queue/servers/AMQP/Dockerfile Enables pcntl for proper signal handling in tests.
src/Queue/Server.php Adds workerStop Hook API and integrates stop handling into worker lifecycle.
src/Queue/Broker/AMQP.php Attempts to stop consumption before closing channel/connection.
src/Queue/Adapter/Swoole.php Adds signal handlers and proper EVENT_* usage; introduces main-process stop path.
docker-compose.yml Standardizes container code paths and volumes for tests.

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

/**
* Hook that is called when worker stops
*/
protected Hook $workerStopHook;
Copy link

Copilot AI Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The typed property $workerStopHook is declared as non-nullable Hook but is accessed with a null check before it is guaranteed to be initialized, which will trigger 'Typed property ... must not be accessed before initialization'. Make it nullable and initialize to null: protected ?Hook $workerStopHook = null;. The current usage (e.g., line 298) will then work as intended.

Suggested change
protected Hook $workerStopHook;
protected ?Hook $workerStopHook = null;

Copilot uses AI. Check for mistakes.


public function close(): void
{
$this->channel?->stopConsume();
Copy link

Copilot AI Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AMQPChannel in php-amqplib does not provide stopConsume(); this will result in a fatal error. To stop consumers, cancel each active consumer tag via basic_cancel (e.g., iterating $this->channel->callbacks) before closing the channel/connection, or close the channel directly if you don't track consumer tags.

Suggested change
$this->channel?->stopConsume();

Copilot uses AI. Check for mistakes.

Comment on lines +54 to 61
if ($this->onStop) {
call_user_func($this->onStop);
}

Console::info("[Swoole] Shutting down process pool...");
$this->pool->shutdown();
Console::success("[Swoole] Process pool stopped.");
return $this;
Copy link

Copilot AI Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call_user_func($this->onStop) is invoked with no arguments, but the callback you store in workerStop expects a $workerId. This will throw an ArgumentCountError. Either remove this invocation (rely on EVENT_WORKER_STOP to invoke the per-worker callback) or store a zero-argument wrapper for main-process shutdown.

Copilot uses AI. Check for mistakes.

Comment on lines +342 to +345
public function workerStop(): Hook
{
try {
$this->adapter->workerStop(function (string $workerId) use ($callback) {
Console::success("[Worker] Worker {$workerId} is ready!");
if (!is_null($callback)) {
call_user_func($callback);
}
});
} catch (Throwable $error) {
self::setResource('error', fn () => $error);
foreach ($this->errorHooks as $hook) {
call_user_func_array($hook->getAction(), $this->getArguments($hook));
}
}
$hook = new Hook();
$hook->groups(['*']);
Copy link

Copilot AI Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workerStop() changed from accepting an optional callable and returning self to returning Hook, which is a breaking change. To ease migration, consider accepting an optional callable and applying it to the returned Hook: public function workerStop(?callable $callback = null): Hook { $hook = new Hook(); ... if ($callback) { $hook->action($callback); } return $hook; }

Copilot uses AI. Check for mistakes.

Comment on lines +31 to +32
if (extension_loaded('pcntl')) {
pcntl_signal(SIGTERM, function () {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use Swoole specifics, Process::signal etc, pcntl doesn't play nice

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants