Skip to content

Commit ec0e11b

Browse files
committed
Misc. cleanup
1 parent f1a30b4 commit ec0e11b

File tree

4 files changed

+179
-349
lines changed

4 files changed

+179
-349
lines changed

src/Consumer.php

Lines changed: 25 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,63 +2,47 @@
22

33
namespace Gos\Component\ReactAMQP;
44

5-
use AMQPQueue;
6-
use BadMethodCallException;
75
use Evenement\EventEmitter;
86
use React\EventLoop\LoopInterface;
97
use React\EventLoop\TimerInterface;
108

119
/**
12-
* Class to listen to an AMQP queue and dispatch listeners when messages are
13-
* received.
10+
* Class to listen to an AMQP queue and dispatch listeners when messages are received.
1411
*
15-
* @author Jeremy Cook <[email protected]>
12+
* @author Jeremy Cook <[email protected]>
1613
*/
17-
class Consumer extends EventEmitter
14+
final class Consumer extends EventEmitter
1815
{
1916
/**
20-
* AMQP message queue to read messages from.
21-
*
22-
* @var AMQPQueue
17+
* @var \AMQPQueue
2318
*/
24-
protected $queue;
19+
private $queue;
2520

2621
/**
27-
* Event loop.
28-
*
2922
* @var LoopInterface
3023
*/
31-
protected $loop;
24+
private $loop;
3225

3326
/**
34-
* Flag to indicate if this listener is closed.
35-
*
3627
* @var bool
3728
*/
38-
protected $closed = false;
29+
private $closed = false;
3930

4031
/**
41-
* Max number of messages to consume in a 'batch'. Should stop the event
42-
* loop stopping on this class for protracted lengths of time.
32+
* Max number of messages to consume in a 'batch'.
33+
*
34+
* Should stop the event loop stopping on this class for protracted lengths of time.
4335
*
4436
* @var int
4537
*/
46-
protected $max;
38+
private $max;
4739

4840
/**
4941
* @var TimerInterface
5042
*/
5143
private $timer;
5244

53-
/**
54-
* Constructor. Stores the message queue and the event loop for use.
55-
*
56-
* @param AMQPQueue $queue Message queue
57-
* @param LoopInterface $loop Event loop
58-
* @param float|null $interval Interval to check for new messages
59-
* @param int|null $max Max number of messages to consume in one go
60-
*/
61-
public function __construct(AMQPQueue $queue, LoopInterface $loop, ?float $interval, ?int $max = null)
45+
public function __construct(\AMQPQueue $queue, LoopInterface $loop, float $interval, ?int $max = null)
6246
{
6347
$this->queue = $queue;
6448
$this->loop = $loop;
@@ -69,31 +53,31 @@ public function __construct(AMQPQueue $queue, LoopInterface $loop, ?float $inter
6953
}
7054

7155
/**
72-
* Method to handle receiving an incoming message.
56+
* Handles receiving an incoming message.
7357
*
74-
* @throws \AMQPChannelException|\AMQPConnectionException
58+
* @throws \AMQPChannelException
59+
* @throws \AMQPConnectionException
60+
* @throws \BadMethodCallException if the consumer connection has been closed
7561
*/
7662
public function __invoke(): void
7763
{
7864
if ($this->closed) {
79-
throw new BadMethodCallException('This consumer object is closed and cannot receive any more messages.');
65+
throw new \BadMethodCallException('This consumer object is closed and cannot receive any more messages.');
8066
}
8167

8268
$counter = 0;
69+
8370
while ($envelope = $this->queue->get()) {
8471
$this->emit('consume', [$envelope, $this->queue]);
72+
8573
if ($this->max && ++$counter >= $this->max) {
8674
return;
8775
}
8876
}
8977
}
9078

9179
/**
92-
* Allows calls to unknown methods to be passed through to the queue
93-
* stored.
94-
*
95-
* @param string $method Method name
96-
* @param mixed $args Args to pass
80+
* Allows calls to unknown methods to be passed through to the queue store.
9781
*
9882
* @return mixed
9983
*/
@@ -102,9 +86,6 @@ public function __call(string $method, $args)
10286
return \call_user_func_array([$this->queue, $method], $args);
10387
}
10488

105-
/**
106-
* Method to call when stopping listening to messages.
107-
*/
10889
public function close(): void
10990
{
11091
if ($this->closed) {
@@ -117,4 +98,9 @@ public function close(): void
11798
$this->queue = null;
11899
$this->closed = true;
119100
}
101+
102+
public function isClosed(): bool
103+
{
104+
return true === $this->closed;
105+
}
120106
}

src/Producer.php

Lines changed: 27 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,7 @@
22

33
namespace Gos\Component\ReactAMQP;
44

5-
use AMQPExchange;
6-
use AMQPExchangeException;
7-
use BadMethodCallException;
8-
use Countable;
95
use Evenement\EventEmitter;
10-
use IteratorAggregate;
116
use React\EventLoop\LoopInterface;
127
use React\EventLoop\TimerInterface;
138

@@ -16,93 +11,63 @@
1611
*
1712
* @author Jeremy Cook <[email protected]>
1813
*/
19-
class Producer extends EventEmitter implements Countable, IteratorAggregate
14+
final class Producer extends EventEmitter implements \Countable, \IteratorAggregate
2015
{
2116
/**
22-
* AMQP message exchange to send messages to.
23-
*
24-
* @var AMQPExchange
17+
* @var \AMQPExchange
2518
*/
26-
protected $exchange;
19+
private $exchange;
2720

2821
/**
29-
* Event loop.
30-
*
3122
* @var LoopInterface
3223
*/
33-
protected $loop;
24+
private $loop;
3425

3526
/**
36-
* Flag to indicate if this listener is closed.
37-
*
3827
* @var bool
3928
*/
40-
protected $closed = false;
29+
private $closed = false;
4130

4231
/**
43-
* Collection of messages waiting to be sent.
44-
*
4532
* @var array
4633
*/
47-
protected $messages = [];
34+
private $messages = [];
4835

4936
/**
5037
* @var TimerInterface
5138
*/
52-
protected $timer;
39+
private $timer;
5340

54-
/**
55-
* Constructor. Stores the message queue and the event loop for use.
56-
*
57-
* @param AMQPExchange $exchange Message queue
58-
* @param LoopInterface $loop Event loop
59-
* @param float $interval Interval to run loop to send messages
60-
*/
61-
public function __construct(AMQPExchange $exchange, LoopInterface $loop, $interval)
41+
public function __construct(\AMQPExchange $exchange, LoopInterface $loop, float $interval)
6242
{
6343
$this->exchange = $exchange;
6444
$this->loop = $loop;
6545
$this->timer = $this->loop->addPeriodicTimer($interval, $this);
6646
}
6747

68-
/**
69-
* Returns the number of messages waiting to be sent. Implements the
70-
* countable interface.
71-
*
72-
* @return int
73-
*/
7448
public function count(): int
7549
{
7650
return \count($this->messages);
7751
}
7852

79-
/**
80-
* Returns the array of messages stored. Completes the implementation of
81-
* the iteratorAggregate interface.
82-
*
83-
* @return array
84-
*/
8553
public function getIterator(): array
8654
{
8755
return $this->messages;
8856
}
8957

9058
/**
91-
* Method to publish a message to an AMQP exchange. Has the same method
92-
* signature as the exchange objects publish method.
59+
* Publishes a message to an AMQP exchange.
9360
*
94-
* @param string $message Message
95-
* @param string $routingKey Routing key
96-
* @param int|null $flags Flags
97-
* @param array $attributes Attributes
61+
* Has the same method signature as the exchange object's publish method.
9862
*
99-
* @throws BadMethodCallException
63+
* @throws \BadMethodCallException if the producer connection has been closed
10064
*/
101-
public function publish(string $message, string $routingKey, ?int $flags = null, $attributes = []): void
65+
public function publish(string $message, string $routingKey, int $flags = 0 /* AMQP_NOPARAM */, array $attributes = []): void
10266
{
10367
if ($this->closed) {
104-
throw new BadMethodCallException('This Producer object is closed and cannot send any more messages.');
68+
throw new \BadMethodCallException('This Producer object is closed and cannot send any more messages.');
10569
}
70+
10671
$this->messages[] = [
10772
'message' => $message,
10873
'routingKey' => $routingKey,
@@ -112,32 +77,31 @@ public function publish(string $message, string $routingKey, ?int $flags = null,
11277
}
11378

11479
/**
115-
* Callback to dispatch on the loop timer.
80+
* Handles publishing outgoing messages.
11681
*
117-
* @throws \AMQPChannelException|\AMQPConnectionException
82+
* @throws \AMQPChannelException
83+
* @throws \AMQPConnectionException
84+
* @throws \BadMethodCallException if the consumer connection has been closed
11885
*/
11986
public function __invoke(): void
12087
{
12188
if ($this->closed) {
122-
throw new BadMethodCallException('This Producer object is closed and cannot send any more messages.');
89+
throw new \BadMethodCallException('This Producer object is closed and cannot send any more messages.');
12390
}
91+
12492
foreach ($this->messages as $key => $message) {
12593
try {
12694
$this->exchange->publish($message['message'], $message['routingKey'], $message['flags'], $message['attributes']);
12795
unset($this->messages[$key]);
12896
$this->emit('produce', array_values($message));
129-
} catch (AMQPExchangeException $e) {
97+
} catch (\AMQPExchangeException $e) {
13098
$this->emit('error', [$e]);
13199
}
132100
}
133101
}
134102

135103
/**
136-
* Allows calls to unknown methods to be passed through to the exchange
137-
* stored.
138-
*
139-
* @param string $method Method name
140-
* @param mixed $args Args to pass
104+
* Allows calls to unknown methods to be passed through to the exchange store.
141105
*
142106
* @return mixed
143107
*/
@@ -146,9 +110,6 @@ public function __call($method, $args)
146110
return \call_user_func_array([$this->exchange, $method], $args);
147111
}
148112

149-
/**
150-
* Method to call when stopping listening to messages.
151-
*/
152113
public function close(): void
153114
{
154115
if ($this->closed) {
@@ -161,4 +122,9 @@ public function close(): void
161122
$this->exchange = null;
162123
$this->closed = true;
163124
}
125+
126+
public function isClosed(): bool
127+
{
128+
return true === $this->closed;
129+
}
164130
}

0 commit comments

Comments
 (0)