-
Notifications
You must be signed in to change notification settings - Fork 3
feat: add workerStop handler, graceful exits #53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -2,14 +2,19 @@ | |||||||||||||
|
||||||||||||||
namespace Utopia\Queue\Adapter; | ||||||||||||||
|
||||||||||||||
use Swoole\Constant; | ||||||||||||||
use Swoole\Process\Pool; | ||||||||||||||
use Utopia\CLI\Console; | ||||||||||||||
use Utopia\Queue\Adapter; | ||||||||||||||
use Utopia\Queue\Consumer; | ||||||||||||||
|
||||||||||||||
class Swoole extends Adapter | ||||||||||||||
{ | ||||||||||||||
protected Pool $pool; | ||||||||||||||
|
||||||||||||||
/** @var callable */ | ||||||||||||||
private $onStop; | ||||||||||||||
|
||||||||||||||
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') | ||||||||||||||
{ | ||||||||||||||
parent::__construct($workerNum, $queue, $namespace); | ||||||||||||||
|
@@ -21,19 +26,59 @@ public function __construct(Consumer $consumer, int $workerNum, string $queue, s | |||||||||||||
public function start(): self | ||||||||||||||
{ | ||||||||||||||
$this->pool->set(['enable_coroutine' => true]); | ||||||||||||||
|
||||||||||||||
// Register signal handlers in the main process before starting pool | ||||||||||||||
if (extension_loaded('pcntl')) { | ||||||||||||||
pcntl_signal(SIGTERM, function () { | ||||||||||||||
Comment on lines
+31
to
+32
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use Swoole specifics, Process::signal etc, pcntl doesn't play nice |
||||||||||||||
Console::info("[Swoole] Received SIGTERM, initiating graceful shutdown..."); | ||||||||||||||
$this->stop(); | ||||||||||||||
}); | ||||||||||||||
|
||||||||||||||
pcntl_signal(SIGINT, function () { | ||||||||||||||
Console::info("[Swoole] Received SIGINT, initiating graceful shutdown..."); | ||||||||||||||
$this->stop(); | ||||||||||||||
}); | ||||||||||||||
|
||||||||||||||
// Enable async signals | ||||||||||||||
pcntl_async_signals(true); | ||||||||||||||
} else { | ||||||||||||||
Console::warning("[Swoole] pcntl extension is not loaded, worker will not shutdown gracefully."); | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
$this->pool->start(); | ||||||||||||||
return $this; | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
public function stop(): self | ||||||||||||||
{ | ||||||||||||||
if ($this->onStop) { | ||||||||||||||
call_user_func($this->onStop); | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
Console::info("[Swoole] Shutting down process pool..."); | ||||||||||||||
$this->pool->shutdown(); | ||||||||||||||
Console::success("[Swoole] Process pool stopped."); | ||||||||||||||
return $this; | ||||||||||||||
Comment on lines
+54
to
61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. call_user_func($this->onStop) is invoked with no arguments, but the callback you store in workerStop expects a $workerId. This will throw an ArgumentCountError. Either remove this invocation (rely on EVENT_WORKER_STOP to invoke the per-worker callback) or store a zero-argument wrapper for main-process shutdown. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||
} | ||||||||||||||
|
||||||||||||||
public function workerStart(callable $callback): self | ||||||||||||||
{ | ||||||||||||||
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { | ||||||||||||||
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) { | ||||||||||||||
// Register signal handlers in each worker process for graceful shutdown | ||||||||||||||
if (extension_loaded('pcntl')) { | ||||||||||||||
pcntl_signal(SIGTERM, function () use ($workerId) { | ||||||||||||||
Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer..."); | ||||||||||||||
$this->consumer->close(); | ||||||||||||||
}); | ||||||||||||||
|
||||||||||||||
pcntl_signal(SIGINT, function () use ($workerId) { | ||||||||||||||
Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer..."); | ||||||||||||||
$this->consumer->close(); | ||||||||||||||
}); | ||||||||||||||
|
||||||||||||||
pcntl_async_signals(true); | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
call_user_func($callback, $workerId); | ||||||||||||||
}); | ||||||||||||||
|
||||||||||||||
|
@@ -42,7 +87,8 @@ public function workerStart(callable $callback): self | |||||||||||||
|
||||||||||||||
public function workerStop(callable $callback): self | ||||||||||||||
{ | ||||||||||||||
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { | ||||||||||||||
$this->onStop = $callback; | ||||||||||||||
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) { | ||||||||||||||
call_user_func($callback, $workerId); | ||||||||||||||
}); | ||||||||||||||
Comment on lines
+91
to
93
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Mirror the same workerId typing and PHPMD fix in workerStop Align with Swoole’s signature and silence PHPMD. Also adheres to PSR-12 by spacing after function. - $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
+ $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $_pool, int $workerId) use ($callback) {
call_user_func($callback, $workerId);
}); 📝 Committable suggestion
Suggested change
🧰 Tools🪛 PHPMD (2.15.0)89-89: Avoid unused parameters such as '$pool'. (Unused Code Rules) (UnusedFormalParameter) 🤖 Prompt for AI Agents
|
||||||||||||||
|
||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -131,6 +131,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe | |||
|
||||
public function close(): void | ||||
{ | ||||
$this->channel?->stopConsume(); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AMQPChannel in php-amqplib does not provide stopConsume(); this will result in a fatal error. To stop consumers, cancel each active consumer tag via basic_cancel (e.g., iterating $this->channel->callbacks) before closing the channel/connection, or close the channel directly if you don't track consumer tags.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||
$this->channel?->getConnection()?->close(); | ||||
} | ||||
|
||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -53,6 +53,11 @@ class Server | |||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||
protected Hook $workerStartHook; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||
* Hook that is called when worker stops | ||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||
protected Hook $workerStopHook; | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The typed property $workerStopHook is declared as non-nullable Hook but is accessed with a null check before it is guaranteed to be initialized, which will trigger 'Typed property ... must not be accessed before initialization'. Make it nullable and initialize to null: protected ?Hook $workerStopHook = null;. The current usage (e.g., line 298) will then work as intended.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||
* @var array | ||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||
|
@@ -96,7 +101,7 @@ public function getResource(string $name, bool $fresh = false): mixed | |||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) { | ||||||||||||||||||||||||||||
if (!\array_key_exists($name, self::$resourcesCallbacks)) { | ||||||||||||||||||||||||||||
throw new Exception('Failed to find resource: "' . $name . '"'); | ||||||||||||||||||||||||||||
throw new Exception("Failed to find resource: $name"); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
Comment on lines
102
to
106
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid undefined index on ‘reset’ for unset resources If a resource exists in $this->resources but has no callback (e.g., 'error' before setResource), accessing ['reset'] throws a notice. Guard the access. - if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) {
+ if (
+ !\array_key_exists($name, $this->resources)
+ || $fresh
+ || (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]['reset'])
+ ) {
if (!\array_key_exists($name, self::$resourcesCallbacks)) {
throw new Exception("Failed to find resource: $name");
} 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||
$this->resources[$name] = \call_user_func_array( | ||||||||||||||||||||||||||||
|
@@ -213,49 +218,23 @@ public function start(): self | |||||||||||||||||||||||||||
$this->adapter->workerStart(function (string $workerId) { | ||||||||||||||||||||||||||||
Console::success("[Worker] Worker {$workerId} is ready!"); | ||||||||||||||||||||||||||||
self::setResource('workerId', fn () => $workerId); | ||||||||||||||||||||||||||||
if (!is_null($this->workerStartHook)) { | ||||||||||||||||||||||||||||
if ($this->workerStartHook !== null) { | ||||||||||||||||||||||||||||
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
Comment on lines
+221
to
223
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Guard uninitialized typed property access (workerStartHook) Accessing a not-yet-initialized typed property throws. Use isset() and ensure an action exists. - if ($this->workerStartHook !== null) {
- call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
- }
+ if (isset($this->workerStartHook) && $this->workerStartHook->getAction()) {
+ call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
+ } 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
while (true) { | ||||||||||||||||||||||||||||
$this->adapter->consumer->consume( | ||||||||||||||||||||||||||||
$this->adapter->queue, | ||||||||||||||||||||||||||||
function (Message $message) { | ||||||||||||||||||||||||||||
$receivedAtTimestamp = microtime(true); | ||||||||||||||||||||||||||||
Console::info("[Job] Received Job ({$message->getPid()})."); | ||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||
$waitDuration = microtime(true) - $message->getTimestamp(); | ||||||||||||||||||||||||||||
$this->jobWaitTime->record($waitDuration); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
$this->resources = []; | ||||||||||||||||||||||||||||
self::setResource('message', fn () => $message); | ||||||||||||||||||||||||||||
if ($this->job->getHook()) { | ||||||||||||||||||||||||||||
foreach ($this->initHooks as $hook) { // Global init hooks | ||||||||||||||||||||||||||||
if (in_array('*', $hook->getGroups())) { | ||||||||||||||||||||||||||||
$arguments = $this->getArguments($hook, $message->getPayload()); | ||||||||||||||||||||||||||||
\call_user_func_array($hook->getAction(), $arguments); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
foreach ($this->job->getGroups() as $group) { | ||||||||||||||||||||||||||||
foreach ($this->initHooks as $hook) { // Group init hooks | ||||||||||||||||||||||||||||
if (in_array($group, $hook->getGroups())) { | ||||||||||||||||||||||||||||
$arguments = $this->getArguments($hook, $message->getPayload()); | ||||||||||||||||||||||||||||
\call_user_func_array($hook->getAction(), $arguments); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); | ||||||||||||||||||||||||||||
} finally { | ||||||||||||||||||||||||||||
$processDuration = microtime(true) - $receivedAtTimestamp; | ||||||||||||||||||||||||||||
$this->processDuration->record($processDuration); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||
function (Message $message) { | ||||||||||||||||||||||||||||
$this->adapter->consumer->consume( | ||||||||||||||||||||||||||||
$this->adapter->queue, | ||||||||||||||||||||||||||||
function (Message $message) { | ||||||||||||||||||||||||||||
$receivedAtTimestamp = microtime(true); | ||||||||||||||||||||||||||||
Console::info("[Job] Received Job ({$message->getPid()})."); | ||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||
$waitDuration = microtime(true) - $message->getTimestamp(); | ||||||||||||||||||||||||||||
$this->jobWaitTime->record($waitDuration); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
$this->resources = []; | ||||||||||||||||||||||||||||
self::setResource('message', fn () => $message); | ||||||||||||||||||||||||||||
if ($this->job->getHook()) { | ||||||||||||||||||||||||||||
foreach ($this->shutdownHooks as $hook) { // Global init hooks | ||||||||||||||||||||||||||||
foreach ($this->initHooks as $hook) { // Global init hooks | ||||||||||||||||||||||||||||
if (in_array('*', $hook->getGroups())) { | ||||||||||||||||||||||||||||
$arguments = $this->getArguments($hook, $message->getPayload()); | ||||||||||||||||||||||||||||
\call_user_func_array($hook->getAction(), $arguments); | ||||||||||||||||||||||||||||
|
@@ -264,27 +243,65 @@ function (Message $message) { | |||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
foreach ($this->job->getGroups() as $group) { | ||||||||||||||||||||||||||||
foreach ($this->shutdownHooks as $hook) { // Group init hooks | ||||||||||||||||||||||||||||
foreach ($this->initHooks as $hook) { // Group init hooks | ||||||||||||||||||||||||||||
if (in_array($group, $hook->getGroups())) { | ||||||||||||||||||||||||||||
$arguments = $this->getArguments($hook, $message->getPayload()); | ||||||||||||||||||||||||||||
\call_user_func_array($hook->getAction(), $arguments); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
Comment on lines
+246
to
252
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Honor Job::getHook() for group hooks as well Global hooks are gated by getHook(), but group hooks always run. Gate both for consistency. - foreach ($this->job->getGroups() as $group) {
- foreach ($this->initHooks as $hook) { // Group init hooks
+ if ($this->job->getHook()) {
+ foreach ($this->job->getGroups() as $group) {
+ foreach ($this->initHooks as $hook) { // Group init hooks
if (in_array($group, $hook->getGroups())) {
$arguments = $this->getArguments($hook, $message->getPayload());
\call_user_func_array($hook->getAction(), $arguments);
}
- }
- }
+ }
+ }
+ } - foreach ($this->job->getGroups() as $group) {
- foreach ($this->shutdownHooks as $hook) { // Group init hooks
+ if ($this->job->getHook()) {
+ foreach ($this->job->getGroups() as $group) {
+ foreach ($this->shutdownHooks as $hook) { // Group shutdown hooks
if (in_array($group, $hook->getGroups())) {
$arguments = $this->getArguments($hook, $message->getPayload());
\call_user_func_array($hook->getAction(), $arguments);
}
- }
- }
+ }
+ }
+ } Also applies to: 265-272 🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||
Console::success("[Job] ({$message->getPid()}) successfully run."); | ||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||
function (?Message $message, Throwable $th) { | ||||||||||||||||||||||||||||
Console::error("[Job] ({$message?->getPid()}) failed to run."); | ||||||||||||||||||||||||||||
Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}"); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
self::setResource('error', fn () => $th); | ||||||||||||||||||||||||||||
return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); | ||||||||||||||||||||||||||||
} finally { | ||||||||||||||||||||||||||||
$processDuration = microtime(true) - $receivedAtTimestamp; | ||||||||||||||||||||||||||||
$this->processDuration->record($processDuration); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||
function (Message $message) { | ||||||||||||||||||||||||||||
if ($this->job->getHook()) { | ||||||||||||||||||||||||||||
foreach ($this->shutdownHooks as $hook) { // Global init hooks | ||||||||||||||||||||||||||||
if (in_array('*', $hook->getGroups())) { | ||||||||||||||||||||||||||||
$arguments = $this->getArguments($hook, $message->getPayload()); | ||||||||||||||||||||||||||||
\call_user_func_array($hook->getAction(), $arguments); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
foreach ($this->errorHooks as $hook) { | ||||||||||||||||||||||||||||
($hook->getAction())(...$this->getArguments($hook)); | ||||||||||||||||||||||||||||
foreach ($this->job->getGroups() as $group) { | ||||||||||||||||||||||||||||
foreach ($this->shutdownHooks as $hook) { // Group init hooks | ||||||||||||||||||||||||||||
if (in_array($group, $hook->getGroups())) { | ||||||||||||||||||||||||||||
$arguments = $this->getArguments($hook, $message->getPayload()); | ||||||||||||||||||||||||||||
\call_user_func_array($hook->getAction(), $arguments); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
Console::success("[Job] ({$message->getPid()}) successfully run."); | ||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||
function (?Message $message, Throwable $th) { | ||||||||||||||||||||||||||||
Console::error("[Job] ({$message?->getPid()}) failed to run."); | ||||||||||||||||||||||||||||
Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}"); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
self::setResource('error', fn () => $th); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
foreach ($this->errorHooks as $hook) { | ||||||||||||||||||||||||||||
$hook->getAction()(...$this->getArguments($hook)); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
$this->adapter->workerStop(function ($workerId) { | ||||||||||||||||||||||||||||
Console::info("[Worker] Worker {$workerId} stopping..."); | ||||||||||||||||||||||||||||
self::setResource('workerId', fn () => $workerId); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Call user-defined workerStop hook if set | ||||||||||||||||||||||||||||
if ($this->workerStopHook !== null) { | ||||||||||||||||||||||||||||
call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook)); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Close consumer connection | ||||||||||||||||||||||||||||
$this->adapter->consumer->close(); | ||||||||||||||||||||||||||||
Console::success("[Worker] Worker {$workerId} stopped gracefully."); | ||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||
Comment on lines
+293
to
305
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Guard uninitialized typed property access (workerStopHook) Same issue as workerStartHook; also verify action exists before calling. - $this->adapter->workerStop(function ($workerId) {
+ $this->adapter->workerStop(function ($workerId) {
Console::info("[Worker] Worker {$workerId} stopping...");
self::setResource('workerId', fn () => $workerId);
// Call user-defined workerStop hook if set
- if ($this->workerStopHook !== null) {
- call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook));
- }
+ if (isset($this->workerStopHook) && $this->workerStopHook->getAction()) {
+ call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook));
+ }
// Close consumer connection
$this->adapter->consumer->close();
Console::success("[Worker] Worker {$workerId} stopped gracefully.");
}); 🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
$this->adapter->start(); | ||||||||||||||||||||||||||||
|
@@ -320,27 +337,23 @@ public function getWorkerStart(): Hook | |||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||
* Is called when a Worker stops. | ||||||||||||||||||||||||||||
* @param callable|null $callback | ||||||||||||||||||||||||||||
* @return self | ||||||||||||||||||||||||||||
* @throws Exception | ||||||||||||||||||||||||||||
* @return Hook | ||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||
public function workerStop(?callable $callback = null): self | ||||||||||||||||||||||||||||
public function workerStop(): Hook | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||
$this->adapter->workerStop(function (string $workerId) use ($callback) { | ||||||||||||||||||||||||||||
Console::success("[Worker] Worker {$workerId} is ready!"); | ||||||||||||||||||||||||||||
if (!is_null($callback)) { | ||||||||||||||||||||||||||||
call_user_func($callback); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||
} catch (Throwable $error) { | ||||||||||||||||||||||||||||
self::setResource('error', fn () => $error); | ||||||||||||||||||||||||||||
foreach ($this->errorHooks as $hook) { | ||||||||||||||||||||||||||||
call_user_func_array($hook->getAction(), $this->getArguments($hook)); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
$hook = new Hook(); | ||||||||||||||||||||||||||||
$hook->groups(['*']); | ||||||||||||||||||||||||||||
Comment on lines
+342
to
+345
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. workerStop() changed from accepting an optional callable and returning self to returning Hook, which is a breaking change. To ease migration, consider accepting an optional callable and applying it to the returned Hook: public function workerStop(?callable $callback = null): Hook { $hook = new Hook(); ... if ($callback) { $hook->action($callback); } return $hook; } Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||||||||||||||||
$this->workerStopHook = $hook; | ||||||||||||||||||||||||||||
return $hook; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
return $this; | ||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||
* Returns Worker stops hook. | ||||||||||||||||||||||||||||
* @return ?Hook | ||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||
public function getWorkerStop(): ?Hook | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
return $this->workerStopHook; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
Comment on lines
+354
to
357
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make getWorkerStop() safe with uninitialized property Return null when unset to match the declared return type. - public function getWorkerStop(): ?Hook
- {
- return $this->workerStopHook;
- }
+ public function getWorkerStop(): ?Hook
+ {
+ return isset($this->workerStopHook) ? $this->workerStopHook : null;
+ } 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||
|
@@ -355,7 +368,7 @@ protected function getArguments(Hook $hook, array $payload = []): array | |||||||||||||||||||||||||||
$arguments = []; | ||||||||||||||||||||||||||||
foreach ($hook->getParams() as $key => $param) { // Get value from route or request object | ||||||||||||||||||||||||||||
$value = $payload[$key] ?? $param['default']; | ||||||||||||||||||||||||||||
$value = ($value === '' || is_null($value)) ? $param['default'] : $value; | ||||||||||||||||||||||||||||
$value = ($value === '' || $value === null) ? $param['default'] : $value; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
$this->validate($key, $param, $value); | ||||||||||||||||||||||||||||
$hook->setParamValue($key, $value); | ||||||||||||||||||||||||||||
|
@@ -384,7 +397,7 @@ protected function getArguments(Hook $hook, array $payload = []): array | |||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||
protected function validate(string $key, array $param, mixed $value): void | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
if ('' !== $value && !is_null($value)) { | ||||||||||||||||||||||||||||
if ('' !== $value && $value !== null) { | ||||||||||||||||||||||||||||
$validator = $param['validator']; // checking whether the class exists | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if (\is_callable($validator)) { | ||||||||||||||||||||||||||||
|
@@ -399,7 +412,7 @@ protected function validate(string $key, array $param, mixed $value): void | |||||||||||||||||||||||||||
throw new Exception('Invalid ' .$key . ': ' . $validator->getDescription(), 400); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} elseif (!$param['optional']) { | ||||||||||||||||||||||||||||
throw new Exception('Param "' . $key . '" is not optional.', 400); | ||||||||||||||||||||||||||||
throw new Exception("Param $key is not optional.", 400); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
FROM phpswoole/swoole:php8.3-alpine | ||
|
||
RUN apk add autoconf build-base | ||
RUN apk add autoconf build-base | ||
|
||
RUN docker-php-ext-install pcntl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Type the callback property and default it to null
Prevents accidental non-callable values and clarifies intent.
📝 Committable suggestion
🤖 Prompt for AI Agents