-
Notifications
You must be signed in to change notification settings - Fork 3
feat: add workerStop handler, graceful exits #53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughAdds worker-stop lifecycle hooks and signal handling across adapters and servers, replaces a manual consume loop with event-driven consumption in Server, calls AMQP channel stopConsume() on close, standardizes test Docker mounts, and installs PCNTL in test images. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Server
participant Adapter
participant Consumer
participant Broker
Client->>Server: start()
Server->>Adapter: register workerStart(handler)
Server->>Adapter: register workerStop(() => Consumer.close())
Note over Server,Consumer: Event-driven consumption (consume with callbacks)
Adapter->>Consumer: consume(onMessage)
Consumer->>Broker: fetch message
Broker-->>Consumer: message
Consumer->>Server: onMessage(message)
rect #dff0d8
Server->>Server: initHooks -> execute job action -> success log
end
rect #f8d7da
alt on error
Server->>Server: errorHooks -> log error
end
end
Server->>Server: finally: record processDuration
Note over Adapter: Worker stop flow
Adapter-->>Server: invoke workerStop callback
Server->>Consumer: close()
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Pre-merge checks (2 passed, 1 warning)❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Poem
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal). Please share your feedback with us on this Discord post. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/Queue/Adapter/Swoole.php (1)
50-58
: Same fixes for workerStop handlerAlign types and PHPMD naming.
- $this->onStop = $callback; - $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) { + $this->onStop = $callback; + $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $_pool, int $workerId) use ($callback) { call_user_func($callback, $workerId); });src/Queue/Server.php (1)
54-55
: Avoid uninitialized typed property access for workerStartHookHook may not be set before start(); accessing a non-initialized typed property can fatal. Make it nullable and lazily init in getter.
- protected Hook $workerStartHook; + protected ?Hook $workerStartHook = null;- if (!is_null($this->workerStartHook)) { + if ($this->workerStartHook) { call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); }- public function getWorkerStart(): Hook - { - return $this->workerStartHook; - } + public function getWorkerStart(): Hook + { + if (!$this->workerStartHook) { + $this->workerStartHook = new Hook(); + $this->workerStartHook->groups(['*']); + } + return $this->workerStartHook; + }Also applies to: 216-219, 316-319
🧹 Nitpick comments (2)
src/Queue/Broker/AMQP.php (1)
132-136
: Graceful shutdown: null out channel and guard for lib differencesCalling stopConsume() before closing is good. Two tweaks:
- Set $this->channel to null after closing to avoid reusing a dead channel.
- If older php-amqplib versions are in play, consider guarding stopConsume().
public function close(): void { - $this->channel?->stopConsume(); - $this->channel?->getConnection()?->close(); + try { + // php-amqplib >= 3.x + $this->channel?->stopConsume(); + } catch (\Throwable) { + // ignore – not all versions expose stopConsume() + } + $this->channel?->getConnection()?->close(); + $this->channel = null; }src/Queue/Adapter/Swoole.php (1)
14-16
: Make onStop nullable-typedDoc says callable but it’s null until set. Use a proper nullable type.
-/** @var callable */ -private $onStop; +private ?callable $onStop = null;
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
src/Queue/Adapter/Swoole.php
(4 hunks)src/Queue/Broker/AMQP.php
(1 hunks)src/Queue/Server.php
(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
src/Queue/Adapter/Swoole.php (3)
src/Queue/Server.php (2)
shutdown
(168-174)workerStart
(304-310)src/Queue/Adapter.php (1)
workerStart
(35-35)src/Queue/Adapter/Workerman.php (1)
workerStart
(34-41)
src/Queue/Server.php (8)
src/Queue/Broker/AMQP.php (2)
consume
(75-130)close
(132-136)src/Queue/Consumer.php (2)
consume
(17-22)close
(27-27)src/Queue/Broker/Redis.php (2)
consume
(19-90)close
(92-95)src/Queue/Message.php (4)
Message
(5-81)getPid
(52-55)getTimestamp
(62-65)getPayload
(67-70)src/Queue/Job.php (2)
getHook
(36-39)hook
(24-29)src/Queue/Adapter/Swoole.php (1)
workerStop
(50-58)src/Queue/Adapter.php (1)
workerStop
(42-42)src/Queue/Adapter/Workerman.php (1)
workerStop
(42-49)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
43-43: Avoid unused parameters such as '$pool'. (Unused Code Rules)
(UnusedFormalParameter)
53-53: Avoid unused parameters such as '$pool'. (Unused Code Rules)
(UnusedFormalParameter)
🔇 Additional comments (2)
src/Queue/Server.php (2)
220-253
: LGTM: cleaner consume flow with metricsSwitching to the adapter’s consume loop and recording wait/process durations in seconds is solid. Returning the job action result to drive ack semantics is correct.
288-289
: LGTM: worker-stop cleanupRegistering workerStop to close the consumer matches the AMQP stopConsume() addition and ensures graceful shutdown.
public function stop(): self | ||
{ | ||
if ($this->onStop) { | ||
call_user_func($this->onStop); | ||
} | ||
$this->pool->shutdown(); | ||
return $this; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don’t invoke onStop() from stop(); it can break callback signatures and double-fire
stop() runs in the master process without a workerId; the registered callback expects one (as per workerStop). Let EVENT_WORKER_STOP handle it per worker.
public function stop(): self
{
- if ($this->onStop) {
- call_user_func($this->onStop);
- }
$this->pool->shutdown();
return $this;
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
public function stop(): self | |
{ | |
if ($this->onStop) { | |
call_user_func($this->onStop); | |
} | |
$this->pool->shutdown(); | |
return $this; | |
} | |
public function stop(): self | |
{ | |
$this->pool->shutdown(); | |
return $this; | |
} |
🤖 Prompt for AI Agents
In src/Queue/Adapter/Swoole.php around lines 32 to 39, remove the direct
invocation of $this->onStop() from stop() because stop() runs in the master
process without a workerId and calling the callback here can break expected
callback signatures and cause it to fire twice; instead only perform pool
shutdown and return $this, leaving invocation of the onStop callback to the
per-worker EVENT_WORKER_STOP handler so it receives the correct workerId and is
not double-fired.
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) { | ||
call_user_func($callback, $workerId); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix workerId type and silence PHPMD unused $pool
Swoole passes int $workerId. Also rename $pool to $_pool.
- $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
+ $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $_pool, int $workerId) use ($callback) {
call_user_func($callback, $workerId);
});
🧰 Tools
🪛 PHPMD (2.15.0)
43-43: Avoid unused parameters such as '$pool'. (Unused Code Rules)
(UnusedFormalParameter)
🤖 Prompt for AI Agents
In src/Queue/Adapter/Swoole.php around lines 43 to 45, update the Pool::on
callback signature to accept an int $workerId and rename the unused $pool
parameter to $_pool to silence PHPMD; specifically change the anonymous function
parameters from (Pool $pool, string $workerId) to (Pool $_pool, int $workerId)
and keep the call_user_func($callback, $workerId) body unchanged.
foreach ($this->initHooks as $hook) { // Group init hooks | ||
if (in_array($group, $hook->getGroups())) { | ||
$arguments = $this->getArguments($hook, $message->getPayload()); | ||
\call_user_func_array($hook->getAction(), $arguments); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Honor Job::getHook() for group hooks as well
Global hooks are gated by getHook(), but group hooks always run. Gate both for consistency.
- foreach ($this->job->getGroups() as $group) {
- foreach ($this->initHooks as $hook) { // Group init hooks
+ if ($this->job->getHook()) {
+ foreach ($this->job->getGroups() as $group) {
+ foreach ($this->initHooks as $hook) { // Group init hooks
if (in_array($group, $hook->getGroups())) {
$arguments = $this->getArguments($hook, $message->getPayload());
\call_user_func_array($hook->getAction(), $arguments);
}
- }
- }
+ }
+ }
+ }
- foreach ($this->job->getGroups() as $group) {
- foreach ($this->shutdownHooks as $hook) { // Group init hooks
+ if ($this->job->getHook()) {
+ foreach ($this->job->getGroups() as $group) {
+ foreach ($this->shutdownHooks as $hook) { // Group shutdown hooks
if (in_array($group, $hook->getGroups())) {
$arguments = $this->getArguments($hook, $message->getPayload());
\call_user_func_array($hook->getAction(), $arguments);
}
- }
- }
+ }
+ }
+ }
Also applies to: 265-272
🤖 Prompt for AI Agents
In src/Queue/Server.php around lines 241-247, group init hooks are invoked
unconditionally when the group matches; modify the condition to also respect the
hook selector by only running the hook if its getHook() is null (applies to all)
or matches the current job/message hook (e.g. $message->getHook()), i.e. add a
check like ($hook->getHook() === null || $hook->getHook() ===
$message->getHook()) combined with the existing group check; apply the same
change to the corresponding group hook block at lines 265-272.
a8a21e0
to
f51092a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/Queue/Server.php (1)
100-106
: Undefined index risk in getResource()Reading self::$resourcesCallbacks[$name]['reset'] before verifying the key exists can emit notices/fatals. Check existence first.
- if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) { - if (!\array_key_exists($name, self::$resourcesCallbacks)) { - throw new Exception("Failed to find resource: $name"); - } + $cb = self::$resourcesCallbacks[$name] ?? null; + if (!\array_key_exists($name, $this->resources) || $fresh || (($cb['reset'] ?? false) === true)) { + if ($cb === null) { + throw new Exception("Failed to find resource: $name"); + } - $this->resources[$name] = \call_user_func_array( - self::$resourcesCallbacks[$name]['callback'], - $this->getResources(self::$resourcesCallbacks[$name]['injections']) - ); + $this->resources[$name] = \call_user_func_array( + $cb['callback'], + $this->getResources($cb['injections']) + );
♻️ Duplicate comments (4)
src/Queue/Adapter/Swoole.php (2)
52-55
: Don’t invoke workerStop callback from stop(); let EVENT_WORKER_STOP handle itCalling $this->onStop() here fires the hook in the master process without a workerId and risks double-firing. Remove it and rely on the registered EVENT_WORKER_STOP handler.
Apply this diff:
public function stop(): self { - if ($this->onStop) { - call_user_func($this->onStop); - } - Console::info("[Swoole] Shutting down process pool..."); $this->pool->shutdown(); Console::success("[Swoole] Process pool stopped."); return $this; }
64-79
: Fix workerId type, silence PHPMD, and address PSR-12 spacing in closuresSwoole passes int $workerId. Also rename the unused $pool param to $_pool. PSR-12 requires a space after function. This also resolves the linter failure.
- $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $_pool, int $workerId) use ($callback) { // Register signal handlers in each worker process for graceful shutdown if (extension_loaded('pcntl')) { - pcntl_signal(SIGTERM, function() use ($workerId) { + pcntl_signal(SIGTERM, function () use ($workerId) { Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer..."); $this->consumer->close(); }); - pcntl_signal(SIGINT, function() use ($workerId) { + pcntl_signal(SIGINT, function () use ($workerId) { Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer..."); $this->consumer->close(); }); pcntl_async_signals(true); }src/Queue/Server.php (2)
236-244
: Honor Job::getHook() for group init hooks as wellGroup init hooks run unconditionally; align with global behavior by gating on getHook().
- } - - foreach ($this->job->getGroups() as $group) { + } + if ($this->job->getHook()) { + foreach ($this->job->getGroups() as $group) { foreach ($this->initHooks as $hook) { // Group init hooks if (in_array($group, $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); } } - } + } + }Also applies to: 245-252
270-277
: Honor Job::getHook() for group shutdown hooks as wellSame gating needed for shutdown group hooks.
- foreach ($this->job->getGroups() as $group) { - foreach ($this->shutdownHooks as $hook) { // Group init hooks + if ($this->job->getHook()) { + foreach ($this->job->getGroups() as $group) { + foreach ($this->shutdownHooks as $hook) { // Group shutdown hooks if (in_array($group, $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); } - } - } + } + } + }
🧹 Nitpick comments (10)
tests/Queue/servers/SwooleRedisCluster/Dockerfile (1)
5-5
: LGTM — pcntl install addedThis enables signal handling needed by workerStop. Consider squashing RUNs to reduce layers, but fine for tests.
tests/Queue/servers/Swoole/Dockerfile (1)
5-5
: LGTM — pcntl install addedMatches the new signal handling in the adapter. Optional: combine with apk add in a single RUN to shrink layers.
src/Queue/Adapter/Swoole.php (2)
15-17
: Type the onStop propertyMinor: make the property nullable-typed for clarity and static analysis.
- /** @var callable */ - private $onStop; + private ?callable $onStop = null;
30-44
: Optional: Prefer Swoole signal APIs for tighter integrationUsing pcntl_* works (with async signals), but Swoole\Process::signal integrates with Swoole’s event loop and avoids edge cases under coroutines. Consider switching in a follow-up.
docker-compose.yml (1)
6-8
: Volume path refactor looks consistent; consider setting working_dirMounts now target /usr/src/code across services and commands use absolute script paths—good. Optionally set working_dir: /usr/src/code on services running php commands to simplify relative includes and developer ergonomics.
Example:
swoole: container_name: swoole build: ./tests/Queue/servers/Swoole/. + working_dir: /usr/src/code command: php /usr/src/code/tests/Queue/servers/Swoole/worker.php
Also applies to: 20-22, 31-33, 43-45, 55-57
tests/Queue/servers/Workerman/worker.php (1)
32-36
: Include workerId in stop logs for observabilityInject and print the worker ID to aid debugging and correlate stop events.
$server - ->workerStop() - ->action(function () { - echo "Worker Stopped" . PHP_EOL; - }); + ->workerStop() + ->inject('workerId') + ->action(function (string $workerId) { + echo "Worker {$workerId} Stopped" . PHP_EOL; + });tests/Queue/servers/Swoole/worker.php (1)
33-37
: Include workerId in stop logs for observabilitySame improvement as other servers: surface the worker ID.
$server - ->workerStop() - ->action(function () { - echo "Worker Stopped" . PHP_EOL; - }); + ->workerStop() + ->inject('workerId') + ->action(function (string $workerId) { + echo "Worker {$workerId} Stopped" . PHP_EOL; + });tests/Queue/servers/SwooleRedisCluster/worker.php (1)
33-37
: Include workerId in stop logs for observabilityMirror the same enhancement here.
$server - ->workerStop() - ->action(function () { - echo "Worker Stopped" . PHP_EOL; - }); + ->workerStop() + ->inject('workerId') + ->action(function (string $workerId) { + echo "Worker {$workerId} Stopped" . PHP_EOL; + });tests/Queue/servers/AMQP/worker.php (1)
32-36
: Include workerId in stop logs for observabilityConsistent with other servers.
$server - ->workerStop() - ->action(function () { - echo "Worker Stopped" . PHP_EOL; - }); + ->workerStop() + ->inject('workerId') + ->action(function (string $workerId) { + echo "Worker {$workerId} Stopped" . PHP_EOL; + });src/Queue/Server.php (1)
271-271
: Comment typo: “Group init hooks” → “Group shutdown hooks”Fix misleading comment to match the block.
- foreach ($this->shutdownHooks as $hook) { // Group init hooks + foreach ($this->shutdownHooks as $hook) { // Group shutdown hooks
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
docker-compose.yml
(5 hunks)src/Queue/Adapter/Swoole.php
(3 hunks)src/Queue/Broker/AMQP.php
(1 hunks)src/Queue/Server.php
(8 hunks)tests/Queue/servers/AMQP/Dockerfile
(1 hunks)tests/Queue/servers/AMQP/worker.php
(1 hunks)tests/Queue/servers/Swoole/Dockerfile
(1 hunks)tests/Queue/servers/Swoole/worker.php
(1 hunks)tests/Queue/servers/SwooleRedisCluster/Dockerfile
(1 hunks)tests/Queue/servers/SwooleRedisCluster/worker.php
(1 hunks)tests/Queue/servers/Workerman/worker.php
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/Queue/Broker/AMQP.php
🧰 Additional context used
🧬 Code graph analysis (6)
tests/Queue/servers/Swoole/worker.php (4)
src/Queue/Adapter/Swoole.php (1)
workerStop
(86-94)src/Queue/Server.php (1)
workerStop
(342-348)src/Queue/Adapter.php (1)
workerStop
(42-42)src/Queue/Adapter/Workerman.php (1)
workerStop
(42-49)
tests/Queue/servers/Workerman/worker.php (4)
src/Queue/Adapter/Swoole.php (1)
workerStop
(86-94)src/Queue/Server.php (1)
workerStop
(342-348)src/Queue/Adapter.php (1)
workerStop
(42-42)src/Queue/Adapter/Workerman.php (1)
workerStop
(42-49)
tests/Queue/servers/SwooleRedisCluster/worker.php (4)
src/Queue/Adapter/Swoole.php (1)
workerStop
(86-94)src/Queue/Server.php (1)
workerStop
(342-348)src/Queue/Adapter.php (1)
workerStop
(42-42)src/Queue/Adapter/Workerman.php (1)
workerStop
(42-49)
tests/Queue/servers/AMQP/worker.php (4)
src/Queue/Adapter/Swoole.php (1)
workerStop
(86-94)src/Queue/Server.php (1)
workerStop
(342-348)src/Queue/Adapter.php (1)
workerStop
(42-42)src/Queue/Adapter/Workerman.php (1)
workerStop
(42-49)
src/Queue/Adapter/Swoole.php (4)
src/Queue/Broker/Pool.php (1)
Pool
(10-56)src/Queue/Adapter.php (4)
Adapter
(5-49)stop
(28-28)start
(22-22)workerStart
(35-35)src/Queue/Server.php (4)
stop
(185-196)start
(215-315)shutdown
(173-179)workerStart
(321-327)src/Queue/Consumer.php (1)
close
(27-27)
src/Queue/Server.php (8)
src/Queue/Broker/AMQP.php (2)
consume
(75-130)close
(132-136)src/Queue/Broker/Redis.php (2)
consume
(19-90)close
(92-95)src/Queue/Consumer.php (2)
consume
(17-22)close
(27-27)src/Queue/Message.php (4)
Message
(5-81)getPid
(52-55)getTimestamp
(62-65)getPayload
(67-70)src/Queue/Job.php (2)
getHook
(36-39)hook
(24-29)src/Queue/Adapter/Swoole.php (1)
workerStop
(86-94)src/Queue/Adapter.php (1)
workerStop
(42-42)src/Queue/Adapter/Workerman.php (1)
workerStop
(42-49)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
64-64: Avoid unused parameters such as '$pool'. (Unused Code Rules)
(UnusedFormalParameter)
89-89: Avoid unused parameters such as '$pool'. (Unused Code Rules)
(UnusedFormalParameter)
🪛 GitHub Actions: Linter
src/Queue/Adapter/Swoole.php
[error] 1-1: PSR 12: function_declaration violation detected by Pint. 29 files, 1 style issue. Failing file: src/Queue/Adapter/Swoole.php. Run 'vendor/bin/pint --test' to reproduce.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Adapter Tests (Workerman)
- GitHub Check: Adapter Tests (SwooleRedisCluster)
- GitHub Check: Adapter Tests (Pool)
🔇 Additional comments (2)
tests/Queue/servers/AMQP/Dockerfile (1)
3-5
: LGTM — pcntl install for AMQP test imageConsistent with other servers; enables graceful shutdown paths.
src/Queue/Server.php (1)
231-233
: Timestamp units are consistent:enqueue()
usestime()
(seconds) andgetTimestamp()
returns that value, somicrotime(true) – $message->getTimestamp()
yields the correct wait time in seconds.
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) { | ||
call_user_func($callback, $workerId); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Mirror the same workerId typing and PHPMD fix in workerStop
Align with Swoole’s signature and silence PHPMD. Also adheres to PSR-12 by spacing after function.
- $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
+ $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $_pool, int $workerId) use ($callback) {
call_user_func($callback, $workerId);
});
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) { | |
call_user_func($callback, $workerId); | |
}); | |
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $_pool, int $workerId) use ($callback) { | |
call_user_func($callback, $workerId); | |
}); |
🧰 Tools
🪛 PHPMD (2.15.0)
89-89: Avoid unused parameters such as '$pool'. (Unused Code Rules)
(UnusedFormalParameter)
🤖 Prompt for AI Agents
In src/Queue/Adapter/Swoole.php around lines 89 to 91, update the anonymous
handler to mirror the same workerId typing and PHPMD fix used elsewhere: change
the second parameter type to int (e.g. Pool $_pool, int $workerId), rename the
unused Pool parameter to-prefixed $_pool to silence PHPMD about an unused
variable, and ensure there is a single space after the function keyword to
satisfy PSR-12; then call the callback with the typed $workerId as before.
if ($this->workerStartHook !== null) { | ||
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guard uninitialized typed property access (workerStartHook)
Accessing a not-yet-initialized typed property throws. Use isset() and ensure an action exists.
- if ($this->workerStartHook !== null) {
- call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
- }
+ if (isset($this->workerStartHook) && $this->workerStartHook->getAction()) {
+ call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if ($this->workerStartHook !== null) { | |
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); | |
} | |
if (isset($this->workerStartHook) && $this->workerStartHook->getAction()) { | |
call_user_func_array( | |
$this->workerStartHook->getAction(), | |
$this->getArguments($this->workerStartHook) | |
); | |
} |
🤖 Prompt for AI Agents
In src/Queue/Server.php around lines 221 to 223, the typed property
$this->workerStartHook may be uninitialized and accessing it directly will
throw; guard the access with an isset() (or property_exists) check and also
verify that getAction() returns a callable before calling call_user_func_array.
Replace the direct access with a conditional that ensures $this->workerStartHook
is set and that $this->workerStartHook->getAction() is a callable, then call
call_user_func_array only when both checks pass (use $this->getArguments(...) as
before).
$this->adapter->workerStop(function ($workerId) { | ||
Console::info("[Worker] Worker {$workerId} stopping..."); | ||
self::setResource('workerId', fn () => $workerId); | ||
|
||
// Call user-defined workerStop hook if set | ||
if ($this->workerStopHook !== null) { | ||
call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook)); | ||
} | ||
|
||
// Close consumer connection | ||
$this->adapter->consumer->close(); | ||
Console::success("[Worker] Worker {$workerId} stopped gracefully."); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guard uninitialized typed property access (workerStopHook)
Same issue as workerStartHook; also verify action exists before calling.
- $this->adapter->workerStop(function ($workerId) {
+ $this->adapter->workerStop(function ($workerId) {
Console::info("[Worker] Worker {$workerId} stopping...");
self::setResource('workerId', fn () => $workerId);
// Call user-defined workerStop hook if set
- if ($this->workerStopHook !== null) {
- call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook));
- }
+ if (isset($this->workerStopHook) && $this->workerStopHook->getAction()) {
+ call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook));
+ }
// Close consumer connection
$this->adapter->consumer->close();
Console::success("[Worker] Worker {$workerId} stopped gracefully.");
});
🤖 Prompt for AI Agents
In src/Queue/Server.php around lines 293 to 305, avoid accessing the possibly
uninitialized typed property workerStopHook and ensure the hook action is
callable before invoking it: first check that the property is set/non-null (e.g.
isset($this->workerStopHook) or $this->workerStopHook !== null) to prevent
uninitialized property access, then retrieve the action and verify
is_callable(...) (or use callable checks) before calling call_user_func_array;
only call the consumer-close and success log after safely handling the optional
hook.
public function getWorkerStop(): ?Hook | ||
{ | ||
return $this->workerStopHook; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make getWorkerStop() safe with uninitialized property
Return null when unset to match the declared return type.
- public function getWorkerStop(): ?Hook
- {
- return $this->workerStopHook;
- }
+ public function getWorkerStop(): ?Hook
+ {
+ return isset($this->workerStopHook) ? $this->workerStopHook : null;
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
public function getWorkerStop(): ?Hook | |
{ | |
return $this->workerStopHook; | |
} | |
public function getWorkerStop(): ?Hook | |
{ | |
return isset($this->workerStopHook) | |
? $this->workerStopHook | |
: null; | |
} |
🤖 Prompt for AI Agents
In src/Queue/Server.php around lines 354-357, the getWorkerStop() method may
access an uninitialized $workerStopHook property; update the method to return
null when the property is unset (e.g. check isset($this->workerStopHook) and
return it only if set, otherwise return null) so the returned value always
matches the declared ?Hook type without triggering uninitialized property
notices.
f51092a
to
6a7d673
Compare
6a7d673
to
e078822
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/Queue/Adapter/Swoole.php (1)
90-95
: Mirror the same signature fix for EVENT_WORKER_STOPAlign types and PHPMD suppression.
- $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $_pool, int $workerId) use ($callback) {
♻️ Duplicate comments (6)
src/Queue/Adapter/Swoole.php (2)
54-57
: Don’t invoke onStop() from stop(); let EVENT_WORKER_STOP fire it per workerCalling it here runs in the master process, breaks expected signatures, and risks double-firing.
- if ($this->onStop) { - call_user_func($this->onStop); - }
66-83
: Fix Swoole callback signature: workerId is int; silence PHPMD unused $poolSwoole passes (Pool, int). Current string type can raise a TypeError; also rename the unused param.
- $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $_pool, int $workerId) use ($callback) {src/Queue/Server.php (4)
245-252
: Honor Job::getHook() for group hooks as wellGroup hooks should be gated the same as global hooks for consistency.
- foreach ($this->job->getGroups() as $group) { - foreach ($this->initHooks as $hook) { // Group init hooks + if ($this->job->getHook()) { + foreach ($this->job->getGroups() as $group) { + foreach ($this->initHooks as $hook) { // Group init hooks if (in_array($group, $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); } - } - } + } + } + }- foreach ($this->job->getGroups() as $group) { - foreach ($this->shutdownHooks as $hook) { // Group init hooks + if ($this->job->getHook()) { + foreach ($this->job->getGroups() as $group) { + foreach ($this->shutdownHooks as $hook) { // Group shutdown hooks if (in_array($group, $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); } - } - } + } + } + }Also applies to: 270-277
221-223
: Guard access to uninitialized typed property: workerStartHookAccessing an uninitialized typed property throws. Check isset() and action before calling.
- if ($this->workerStartHook !== null) { - call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); - } + if (isset($this->workerStartHook) && $this->workerStartHook->getAction()) { + call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); + }
293-305
: Guard access to uninitialized typed property: workerStopHookSame issue as workerStartHook; also verify an action exists.
- if ($this->workerStopHook !== null) { - call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook)); - } + if (isset($this->workerStopHook) && $this->workerStopHook->getAction()) { + call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook)); + }
350-357
: Make getWorkerStop() safe when unsetReturn null if the hook wasn’t configured to match the ?Hook signature.
- public function getWorkerStop(): ?Hook - { - return $this->workerStopHook; - } + public function getWorkerStop(): ?Hook + { + return isset($this->workerStopHook) ? $this->workerStopHook : null; + }
🧹 Nitpick comments (2)
tests/Queue/servers/SwooleRedisCluster/Dockerfile (1)
5-5
: Slim the image and avoid cache while installing pcntlUse --no-cache, parallel build, and remove build deps in the same layer. Keeps images smaller and reproducible.
Apply:
-FROM phpswoole/swoole:php8.3-alpine - -RUN apk add autoconf build-base - -RUN docker-php-ext-install pcntl -RUN docker-php-ext-enable redis +FROM phpswoole/swoole:php8.3-alpine + +RUN apk add --no-cache --virtual .build-deps autoconf build-base \ + && docker-php-ext-install -j"$(nproc)" pcntl \ + && docker-php-ext-enable redis \ + && apk del .build-depssrc/Queue/Server.php (1)
302-304
: Avoid double-closing the consumerSignal handlers in workers already close the consumer; workerStop also closes it. It’s mostly harmless but noisy. Consider making close() idempotent (no-op if already closed), or dropping one call.
Also applies to: 69-77
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
docker-compose.yml
(5 hunks)src/Queue/Adapter/Swoole.php
(3 hunks)src/Queue/Broker/AMQP.php
(1 hunks)src/Queue/Server.php
(8 hunks)tests/Queue/servers/AMQP/Dockerfile
(1 hunks)tests/Queue/servers/AMQP/worker.php
(1 hunks)tests/Queue/servers/Swoole/Dockerfile
(1 hunks)tests/Queue/servers/Swoole/worker.php
(1 hunks)tests/Queue/servers/SwooleRedisCluster/Dockerfile
(1 hunks)tests/Queue/servers/SwooleRedisCluster/worker.php
(1 hunks)tests/Queue/servers/Workerman/worker.php
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (8)
- tests/Queue/servers/AMQP/Dockerfile
- src/Queue/Broker/AMQP.php
- tests/Queue/servers/Swoole/Dockerfile
- tests/Queue/servers/Workerman/worker.php
- tests/Queue/servers/AMQP/worker.php
- docker-compose.yml
- tests/Queue/servers/Swoole/worker.php
- tests/Queue/servers/SwooleRedisCluster/worker.php
🧰 Additional context used
🧬 Code graph analysis (2)
src/Queue/Server.php (8)
src/Queue/Broker/AMQP.php (2)
consume
(75-130)close
(132-136)src/Queue/Consumer.php (2)
consume
(17-22)close
(27-27)src/Queue/Broker/Redis.php (2)
consume
(19-90)close
(92-95)src/Queue/Message.php (4)
Message
(5-81)getPid
(52-55)getTimestamp
(62-65)getPayload
(67-70)src/Queue/Job.php (2)
getHook
(36-39)hook
(24-29)src/Queue/Adapter/Swoole.php (1)
workerStop
(88-96)src/Queue/Adapter.php (1)
workerStop
(42-42)src/Queue/Adapter/Workerman.php (1)
workerStop
(42-49)
src/Queue/Adapter/Swoole.php (6)
src/Queue/Adapter.php (4)
Adapter
(5-49)stop
(28-28)start
(22-22)workerStart
(35-35)src/Queue/Server.php (3)
stop
(185-196)start
(215-315)workerStart
(321-327)src/Queue/Adapter/Workerman.php (3)
stop
(28-32)start
(22-26)workerStart
(34-41)src/Queue/Broker/AMQP.php (1)
close
(132-136)src/Queue/Consumer.php (1)
close
(27-27)src/Queue/Broker/Redis.php (1)
close
(92-95)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
66-66: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
91-91: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
🔇 Additional comments (3)
tests/Queue/servers/SwooleRedisCluster/Dockerfile (1)
5-5
: Verify ifpcntl
is pre-enabled in the base image
Manually run:docker run --rm phpswoole/swoole:php8.3-alpine php -m | grep -i pcntl
If
pcntl
appears, remove theRUN docker-php-ext-install pcntl
line.src/Queue/Adapter/Swoole.php (1)
30-46
: Good: main-process signal handlers for graceful shutdownPSR-12 spacing is correct; async signals enabled; warning path covered when pcntl missing.
src/Queue/Server.php (1)
254-258
: Good: always record process duration in finallyEnsures telemetry completeness even on failures.
/** @var callable */ | ||
private $onStop; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Type the callback property and default it to null
Prevents accidental non-callable values and clarifies intent.
- /** @var callable */
- private $onStop;
+ private ?callable $onStop = null;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
/** @var callable */ | |
private $onStop; | |
// Replace the untyped property and docblock with a nullable typed property | |
private ?callable $onStop = null; |
🤖 Prompt for AI Agents
In src/Queue/Adapter/Swoole.php around lines 15 to 17, the private $onStop
callback is currently untyped and unset; change it to a nullable callable typed
property and default it to null (e.g., private ?callable $onStop = null) so that
non-callable assignments are prevented and the intent is explicit; update or
remove the redundant docblock accordingly and ensure any code that sets or
invokes $onStop accounts for a null value.
if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) { | ||
if (!\array_key_exists($name, self::$resourcesCallbacks)) { | ||
throw new Exception('Failed to find resource: "' . $name . '"'); | ||
throw new Exception("Failed to find resource: $name"); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid undefined index on ‘reset’ for unset resources
If a resource exists in $this->resources but has no callback (e.g., 'error' before setResource), accessing ['reset'] throws a notice. Guard the access.
- if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) {
+ if (
+ !\array_key_exists($name, $this->resources)
+ || $fresh
+ || (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset'])
+ ) {
if (!\array_key_exists($name, self::$resourcesCallbacks)) {
throw new Exception("Failed to find resource: $name");
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) { | |
if (!\array_key_exists($name, self::$resourcesCallbacks)) { | |
throw new Exception('Failed to find resource: "' . $name . '"'); | |
throw new Exception("Failed to find resource: $name"); | |
} | |
if ( | |
!\array_key_exists($name, $this->resources) | |
|| $fresh | |
|| (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset']) | |
) { | |
if (!\array_key_exists($name, self::$resourcesCallbacks)) { | |
throw new Exception("Failed to find resource: $name"); | |
} |
🤖 Prompt for AI Agents
In src/Queue/Server.php around lines 102 to 106, the condition accesses
self::$resourcesCallbacks[$name]['reset'] without ensuring
self::$resourcesCallbacks[$name] exists, which can trigger an undefined index
notice; update the condition to first verify the callback entry exists (using
isset() or array_key_exists on self::$resourcesCallbacks[$name]) before reading
the ['reset'] value—e.g., reorder or combine checks so the ['reset'] access is
guarded (e.g., check callback existence and then check its 'reset' flag or use
null-coalescing/isset to default to false).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Graceful shutdown and lifecycle hooks for queue workers across adapters, plus test container updates for signal support.
- Add per-worker stop hooks and worker stop event handling.
- Introduce signal-based graceful shutdown for Swoole and ensure AMQP consumers stop before closing.
- Standardize test container volumes and enable pcntl in images.
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
Show a summary per file
File | Description |
---|---|
tests/Queue/servers/Workerman/worker.php | Adds workerStop hook usage for test worker lifecycle. |
tests/Queue/servers/SwooleRedisCluster/worker.php | Adds workerStop hook usage for cluster worker lifecycle. |
tests/Queue/servers/Swoole/worker.php | Adds workerStop hook usage for Swoole worker lifecycle. |
tests/Queue/servers/AMQP/worker.php | Adds workerStop hook usage for AMQP worker lifecycle. |
tests/Queue/servers/SwooleRedisCluster/Dockerfile | Enables pcntl for proper signal handling in tests. |
tests/Queue/servers/Swoole/Dockerfile | Enables pcntl for proper signal handling in tests. |
tests/Queue/servers/AMQP/Dockerfile | Enables pcntl for proper signal handling in tests. |
src/Queue/Server.php | Adds workerStop Hook API and integrates stop handling into worker lifecycle. |
src/Queue/Broker/AMQP.php | Attempts to stop consumption before closing channel/connection. |
src/Queue/Adapter/Swoole.php | Adds signal handlers and proper EVENT_* usage; introduces main-process stop path. |
docker-compose.yml | Standardizes container code paths and volumes for tests. |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
/** | ||
* Hook that is called when worker stops | ||
*/ | ||
protected Hook $workerStopHook; |
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The typed property $workerStopHook is declared as non-nullable Hook but is accessed with a null check before it is guaranteed to be initialized, which will trigger 'Typed property ... must not be accessed before initialization'. Make it nullable and initialize to null: protected ?Hook $workerStopHook = null;. The current usage (e.g., line 298) will then work as intended.
protected Hook $workerStopHook; | |
protected ?Hook $workerStopHook = null; |
Copilot uses AI. Check for mistakes.
|
||
public function close(): void | ||
{ | ||
$this->channel?->stopConsume(); |
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AMQPChannel in php-amqplib does not provide stopConsume(); this will result in a fatal error. To stop consumers, cancel each active consumer tag via basic_cancel (e.g., iterating $this->channel->callbacks) before closing the channel/connection, or close the channel directly if you don't track consumer tags.
$this->channel?->stopConsume(); |
Copilot uses AI. Check for mistakes.
if ($this->onStop) { | ||
call_user_func($this->onStop); | ||
} | ||
|
||
Console::info("[Swoole] Shutting down process pool..."); | ||
$this->pool->shutdown(); | ||
Console::success("[Swoole] Process pool stopped."); | ||
return $this; |
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call_user_func($this->onStop) is invoked with no arguments, but the callback you store in workerStop expects a $workerId. This will throw an ArgumentCountError. Either remove this invocation (rely on EVENT_WORKER_STOP to invoke the per-worker callback) or store a zero-argument wrapper for main-process shutdown.
Copilot uses AI. Check for mistakes.
public function workerStop(): Hook | ||
{ | ||
try { | ||
$this->adapter->workerStop(function (string $workerId) use ($callback) { | ||
Console::success("[Worker] Worker {$workerId} is ready!"); | ||
if (!is_null($callback)) { | ||
call_user_func($callback); | ||
} | ||
}); | ||
} catch (Throwable $error) { | ||
self::setResource('error', fn () => $error); | ||
foreach ($this->errorHooks as $hook) { | ||
call_user_func_array($hook->getAction(), $this->getArguments($hook)); | ||
} | ||
} | ||
$hook = new Hook(); | ||
$hook->groups(['*']); |
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
workerStop() changed from accepting an optional callable and returning self to returning Hook, which is a breaking change. To ease migration, consider accepting an optional callable and applying it to the returned Hook: public function workerStop(?callable $callback = null): Hook { $hook = new Hook(); ... if ($callback) { $hook->action($callback); } return $hook; }
Copilot uses AI. Check for mistakes.
if (extension_loaded('pcntl')) { | ||
pcntl_signal(SIGTERM, function () { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use Swoole specifics, Process::signal etc, pcntl doesn't play nice
Summary by CodeRabbit
New Features
Bug Fixes
Chores