Skip to content

Commit c3ac5df

Browse files
[12.x] Extend SQS FIFO and fair queue support (#57187)
* Add a test to ensure the deduplication ID specified by the job is used. * Make the payload and the queue name available to the deduplicationId() method logic. * Add support for setting a deduplicator callback method to generate the deduplication id. * Forward the mailable message group and deduplicator properties to the queued mailable job. * Forward the notification message group and deduplicator properties to the queued notification job. * Forward the event listener message group and deduplicator properties to the queued listener job. * Forward the anonymous queueable event listener message group and deduplicator properties to the queued listener job. * Ignore PHPStan false positive error. * Wrap deduplicator callbacks with a SerializableClosure to allow serializing the job sent to the queue. * Update the event dispatcher to use the withDeduplicator setter method so the closure is properly wrapped. * formatting --------- Co-authored-by: Taylor Otwell <[email protected]>
1 parent 8ef2934 commit c3ac5df

File tree

13 files changed

+829
-17
lines changed

13 files changed

+829
-17
lines changed

src/Illuminate/Bus/Queueable.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Illuminate\Queue\CallQueuedClosure;
77
use Illuminate\Support\Arr;
88
use Illuminate\Support\Collection;
9+
use Laravel\SerializableClosure\SerializableClosure;
910
use PHPUnit\Framework\Assert as PHPUnit;
1011
use RuntimeException;
1112

@@ -34,6 +35,13 @@ trait Queueable
3435
*/
3536
public $messageGroup;
3637

38+
/**
39+
* The job deduplicator callback the job should use to generate the deduplication ID.
40+
*
41+
* @var \Laravel\SerializableClosure\SerializableClosure|null
42+
*/
43+
public $deduplicator;
44+
3745
/**
3846
* The number of seconds before the job should be made available.
3947
*
@@ -124,6 +132,23 @@ public function onGroup($group)
124132
return $this;
125133
}
126134

135+
/**
136+
* Set the desired job deduplicator callback.
137+
*
138+
* This feature is only supported by some queues, such as Amazon SQS FIFO.
139+
*
140+
* @param callable|null $deduplicator
141+
* @return $this
142+
*/
143+
public function withDeduplicator($deduplicator)
144+
{
145+
$this->deduplicator = $deduplicator instanceof Closure
146+
? new SerializableClosure($deduplicator)
147+
: $deduplicator;
148+
149+
return $this;
150+
}
151+
127152
/**
128153
* Set the desired connection for the chain.
129154
*

src/Illuminate/Events/Dispatcher.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,11 @@ protected function propagateListenerOptions($listener, $job)
706706
$job->timeout = $listener->timeout ?? null;
707707
$job->failOnTimeout = $listener->failOnTimeout ?? false;
708708
$job->tries = method_exists($listener, 'tries') ? $listener->tries(...$data) : ($listener->tries ?? null);
709+
$job->messageGroup = method_exists($listener, 'messageGroup') ? $listener->messageGroup(...$data) : ($listener->messageGroup ?? null);
710+
$job->withDeduplicator(method_exists($listener, 'deduplicator')
711+
? $listener->deduplicator(...$data)
712+
: (method_exists($listener, 'deduplicationId') ? $listener->deduplicationId(...) : null)
713+
);
709714

710715
$job->through(array_merge(
711716
method_exists($listener, 'middleware') ? $listener->middleware(...$data) : [],

src/Illuminate/Events/QueuedClosure.php

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,20 @@ class QueuedClosure
3131
*/
3232
public $queue;
3333

34+
/**
35+
* The job "group" the job should be sent to.
36+
*
37+
* @var string|null
38+
*/
39+
public $messageGroup;
40+
41+
/**
42+
* The job deduplicator callback the job should use to generate the deduplication ID.
43+
*
44+
* @var \Laravel\SerializableClosure\SerializableClosure|null
45+
*/
46+
public $deduplicator;
47+
3448
/**
3549
* The number of seconds before the job should be made available.
3650
*
@@ -81,6 +95,38 @@ public function onQueue($queue)
8195
return $this;
8296
}
8397

98+
/**
99+
* Set the desired job "group".
100+
*
101+
* This feature is only supported by some queues, such as Amazon SQS.
102+
*
103+
* @param \UnitEnum|string $group
104+
* @return $this
105+
*/
106+
public function onGroup($group)
107+
{
108+
$this->messageGroup = enum_value($group);
109+
110+
return $this;
111+
}
112+
113+
/**
114+
* Set the desired job deduplicator callback.
115+
*
116+
* This feature is only supported by some queues, such as Amazon SQS FIFO.
117+
*
118+
* @param callable|null $deduplicator
119+
* @return $this
120+
*/
121+
public function withDeduplicator($deduplicator)
122+
{
123+
$this->deduplicator = $deduplicator instanceof Closure
124+
? new SerializableClosure($deduplicator)
125+
: $deduplicator;
126+
127+
return $this;
128+
}
129+
84130
/**
85131
* Set the desired delay in seconds for the job.
86132
*
@@ -121,7 +167,12 @@ public function resolve()
121167
'catch' => (new Collection($this->catchCallbacks))
122168
->map(fn ($callback) => new SerializableClosure($callback))
123169
->all(),
124-
]))->onConnection($this->connection)->onQueue($this->queue)->delay($this->delay);
170+
]))
171+
->onConnection($this->connection)
172+
->onQueue($this->queue)
173+
->delay($this->delay)
174+
->onGroup($this->messageGroup)
175+
->withDeduplicator($this->deduplicator);
125176
};
126177
}
127178
}

src/Illuminate/Foundation/Bus/PendingDispatch.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,21 @@ public function onGroup($group)
7878
return $this;
7979
}
8080

81+
/**
82+
* Set the desired job deduplicator callback.
83+
*
84+
* This feature is only supported by some queues, such as Amazon SQS FIFO.
85+
*
86+
* @param callable|null $deduplicator
87+
* @return $this
88+
*/
89+
public function withDeduplicator($deduplicator)
90+
{
91+
$this->job->withDeduplicator($deduplicator);
92+
93+
return $this;
94+
}
95+
8196
/**
8297
* Set the desired connection for the chain.
8398
*

src/Illuminate/Mail/Mailable.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,14 @@ public function later($delay, Queue $queue)
262262
*/
263263
protected function newQueuedJob()
264264
{
265+
$messageGroup = $this->messageGroup ?? null;
266+
267+
/** @phpstan-ignore callable.nonNativeMethod (false positive since method_exists guard is used) */
268+
$deduplicator = $this->deduplicator ?? (method_exists($this, 'deduplicationId') ? $this->deduplicationId(...) : null);
269+
265270
return Container::getInstance()->make(SendQueuedMailable::class, ['mailable' => $this])
271+
->onGroup($messageGroup)
272+
->withDeduplicator($deduplicator)
266273
->through(array_merge(
267274
method_exists($this, 'middleware') ? $this->middleware() : [],
268275
$this->middleware ?? []

src/Illuminate/Notifications/NotificationSender.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,18 @@ protected function queueNotification($notifiables, $notification)
247247
$delay = $notification->withDelay($notifiable, $channel) ?? null;
248248
}
249249

250+
$messageGroup = $notification->messageGroup ?? null;
251+
252+
if (method_exists($notification, 'withMessageGroups')) {
253+
$messageGroup = $notification->withMessageGroups($notifiable, $channel) ?? null;
254+
}
255+
256+
$deduplicator = $notification->deduplicator ?? (method_exists($notification, 'deduplicationId') ? $notification->deduplicationId(...) : null);
257+
258+
if (method_exists($notification, 'withDeduplicators')) {
259+
$deduplicator = $notification->withDeduplicators($notifiable, $channel) ?? null;
260+
}
261+
250262
$middleware = $notification->middleware ?? [];
251263

252264
if (method_exists($notification, 'middleware')) {
@@ -265,6 +277,8 @@ protected function queueNotification($notifiables, $notification)
265277
->onConnection($connection)
266278
->onQueue($queue)
267279
->delay(is_array($delay) ? ($delay[$channel] ?? null) : $delay)
280+
->onGroup(is_array($messageGroup) ? ($messageGroup[$channel] ?? null) : $messageGroup)
281+
->withDeduplicator(is_array($deduplicator) ? ($deduplicator[$channel] ?? null) : $deduplicator)
268282
->through($middleware)
269283
);
270284
}

src/Illuminate/Queue/SqsQueue.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public function push($job, $data = '', $queue = null)
163163
$queue,
164164
null,
165165
function ($payload, $queue) use ($job) {
166-
return $this->pushRaw($payload, $queue, $this->getQueueableOptions($job, $queue));
166+
return $this->pushRaw($payload, $queue, $this->getQueueableOptions($job, $queue, $payload));
167167
}
168168
);
169169
}
@@ -200,7 +200,7 @@ public function later($delay, $job, $data = '', $queue = null)
200200
$queue,
201201
$delay,
202202
function ($payload, $queue, $delay) use ($job) {
203-
return $this->pushRaw($payload, $queue, $this->getQueueableOptions($job, $queue, $delay));
203+
return $this->pushRaw($payload, $queue, $this->getQueueableOptions($job, $queue, $payload, $delay));
204204
}
205205
);
206206
}
@@ -210,10 +210,11 @@ function ($payload, $queue, $delay) use ($job) {
210210
*
211211
* @param mixed $job
212212
* @param string|null $queue
213+
* @param string $payload
213214
* @param \DateTimeInterface|\DateInterval|int|null $delay
214215
* @return array{DelaySeconds?: int, MessageGroupId?: string, MessageDeduplicationId?: string}
215216
*/
216-
protected function getQueueableOptions($job, $queue, $delay = null): array
217+
protected function getQueueableOptions($job, $queue, $payload, $delay = null): array
217218
{
218219
// Make sure we have a queue name to properly determine if it's a FIFO queue...
219220
$queue ??= $this->default;
@@ -255,7 +256,8 @@ protected function getQueueableOptions($job, $queue, $delay = null): array
255256

256257
if ($isFifo) {
257258
$messageDeduplicationId = match (true) {
258-
$isObject && method_exists($job, 'deduplicationId') => transform($job->deduplicationId(), $transformToString),
259+
$isObject && isset($job->deduplicator) && is_callable($job->deduplicator) => transform(call_user_func($job->deduplicator, $payload, $queue), $transformToString),
260+
$isObject && method_exists($job, 'deduplicationId') => transform($job->deduplicationId($payload, $queue), $transformToString),
259261
default => (string) Str::orderedUuid(),
260262
};
261263
}

tests/Events/QueuedEventsTest.php

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Illuminate\Events\Dispatcher;
1010
use Illuminate\Queue\QueueManager;
1111
use Illuminate\Support\Testing\Fakes\QueueFake;
12+
use Laravel\SerializableClosure\SerializableClosure;
1213
use Mockery as m;
1314
use PHPUnit\Framework\TestCase;
1415

@@ -197,6 +198,82 @@ public function testQueuePropagateTries()
197198
});
198199
}
199200

201+
public function testQueuePropagateMessageGroupProperty()
202+
{
203+
$d = new Dispatcher;
204+
205+
$fakeQueue = new QueueFake(new Container);
206+
207+
$d->setQueueResolver(function () use ($fakeQueue) {
208+
return $fakeQueue;
209+
});
210+
211+
$d->listen('some.event', TestDispatcherWithMessageGroupProperty::class.'@handle');
212+
$d->dispatch('some.event', ['foo', 'bar']);
213+
214+
$fakeQueue->assertPushed(CallQueuedListener::class, function ($job) {
215+
return $job->messageGroup === 'group-property';
216+
});
217+
}
218+
219+
public function testQueuePropagateMessageGroupMethodOverProperty()
220+
{
221+
$d = new Dispatcher;
222+
223+
$fakeQueue = new QueueFake(new Container);
224+
225+
$d->setQueueResolver(function () use ($fakeQueue) {
226+
return $fakeQueue;
227+
});
228+
229+
$d->listen('some.event', TestDispatcherWithMessageGroupMethod::class.'@handle');
230+
$d->dispatch('some.event', ['foo', 'bar']);
231+
232+
$fakeQueue->assertPushed(CallQueuedListener::class, function ($job) {
233+
return $job->messageGroup === 'group-method';
234+
});
235+
}
236+
237+
public function testQueuePropagateDeduplicationIdMethod()
238+
{
239+
$d = new Dispatcher;
240+
241+
$fakeQueue = new QueueFake(new Container);
242+
243+
$d->setQueueResolver(function () use ($fakeQueue) {
244+
return $fakeQueue;
245+
});
246+
247+
$d->listen('some.event', TestDispatcherWithDeduplicationIdMethod::class.'@handle');
248+
$d->dispatch('some.event', ['foo', 'bar']);
249+
250+
$fakeQueue->assertPushed(CallQueuedListener::class, function ($job) {
251+
$this->assertInstanceOf(SerializableClosure::class, $job->deduplicator);
252+
253+
return is_callable($job->deduplicator) && call_user_func($job->deduplicator, '', null) === 'deduplication-id-method';
254+
});
255+
}
256+
257+
public function testQueuePropagateDeduplicatorMethodOverDeduplicationIdMethod()
258+
{
259+
$d = new Dispatcher;
260+
261+
$fakeQueue = new QueueFake(new Container);
262+
263+
$d->setQueueResolver(function () use ($fakeQueue) {
264+
return $fakeQueue;
265+
});
266+
267+
$d->listen('some.event', TestDispatcherWithDeduplicatorMethod::class.'@handle');
268+
$d->dispatch('some.event', ['foo', 'bar']);
269+
270+
$fakeQueue->assertPushed(CallQueuedListener::class, function ($job) {
271+
$this->assertInstanceOf(SerializableClosure::class, $job->deduplicator);
272+
273+
return is_callable($job->deduplicator) && call_user_func($job->deduplicator, '', null) === 'deduplicator-method';
274+
});
275+
}
276+
200277
public function testQueuePropagateMiddleware()
201278
{
202279
$d = new Dispatcher;
@@ -323,6 +400,62 @@ public function handle()
323400
}
324401
}
325402

403+
class TestDispatcherWithMessageGroupProperty implements ShouldQueue
404+
{
405+
public $messageGroup = 'group-property';
406+
407+
public function handle()
408+
{
409+
//
410+
}
411+
}
412+
413+
class TestDispatcherWithMessageGroupMethod implements ShouldQueue
414+
{
415+
public $messageGroup = 'group-property';
416+
417+
public function handle()
418+
{
419+
//
420+
}
421+
422+
public function messageGroup($event)
423+
{
424+
return 'group-method';
425+
}
426+
}
427+
428+
class TestDispatcherWithDeduplicationIdMethod implements ShouldQueue
429+
{
430+
public function handle()
431+
{
432+
//
433+
}
434+
435+
public function deduplicationId($payload, $queue)
436+
{
437+
return 'deduplication-id-method';
438+
}
439+
}
440+
441+
class TestDispatcherWithDeduplicatorMethod implements ShouldQueue
442+
{
443+
public function handle()
444+
{
445+
//
446+
}
447+
448+
public function deduplicationId($payload, $queue)
449+
{
450+
return 'deduplication-id-method';
451+
}
452+
453+
public function deduplicator($event)
454+
{
455+
return fn ($payload, $queue) => 'deduplicator-method';
456+
}
457+
}
458+
326459
class TestDispatcherMiddleware implements ShouldQueue
327460
{
328461
public function middleware($a, $b)

0 commit comments

Comments
 (0)