Skip to content

Commit a24f3ba

Browse files
committed
Processed timed-out Rules before handled Rules, ensuring that we don't call fire() twice for a timed-out Rule. Was noticeable with Cron shutdown onSchedule firing twice.
1 parent 95ddbfa commit a24f3ba

File tree

2 files changed

+119
-14
lines changed

2 files changed

+119
-14
lines changed

src/CorrelationEngine.php

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public function handle(Event $event) : void
173173
$expecting = $matcher->nextAcceptedEvents();
174174
$result = $matcher->handle($event);
175175
if ($this->isFlagSet($result, $matcher::EVENT_HANDLED)) {
176-
$handledMatchers[] = $matcher;
176+
$handledMatchers[spl_object_id($matcher)] = $matcher;
177177
$skipMatchers[] = get_class($matcher);
178178
/** Update which events we are expecting next **/
179179
$this->removeWatchForEvents($matcher, $expecting);
@@ -186,7 +186,7 @@ public function handle(Event $event) : void
186186

187187
// If we are timed out then flag the matcher for timeout processing.
188188
if ($this->isFlagSet($result, $matcher::EVENT_TIMEOUT)) {
189-
$timedOutMatchers[] = $matcher;
189+
$timedOutMatchers[spl_object_id($matcher)] = $matcher;
190190
}
191191

192192
//Matcher has told us to suppress further processing of this event.
@@ -214,7 +214,7 @@ public function handle(Event $event) : void
214214
$matcher = $this->constructMatcher($class);
215215
$result = $matcher->handle($event);
216216
if ($this->isFlagSet($result, $matcher::EVENT_HANDLED)) {
217-
$handledMatchers[] = $matcher;
217+
$handledMatchers[spl_object_id($matcher)] = $matcher;
218218
$this->eventProcessors[spl_object_id($matcher)] = $matcher;
219219
$this->incrStat('init_matcher', $class);
220220
$this->incrStat('handled', (string)$event->event . "|" . get_class($matcher));
@@ -241,8 +241,21 @@ public function handle(Event $event) : void
241241
$this->incrStat('unhandled', (string)$event->event);
242242
}
243243

244-
/** For any matchers that processed this event fire any actions, then update timeout or destroy if complete **/
245244
$stateChanged = !(empty($handledMatchers) && empty($timedOutMatchers));
245+
246+
/** Fire any action and destroy any timed out matchers **/
247+
foreach ($timedOutMatchers as $objectId => $matcher) {
248+
$this->removeTimeout($matcher);
249+
$matcher->fire();
250+
/** Record stat of matcher timeout */
251+
$this->incrStat('completed_matcher_timeout', get_class($matcher));
252+
$this->removeMatcher($matcher);
253+
unset($handledMatchers[$objectId]);
254+
unset($matcher);
255+
}
256+
unset($timedOutMatchers);
257+
258+
/** For any matchers that processed this event fire any actions, then update timeout or destroy if complete **/
246259
foreach ($handledMatchers as $matcher) {
247260
$matcher->fire();
248261
$this->addTimeout($matcher);
@@ -254,16 +267,7 @@ public function handle(Event $event) : void
254267
unset($matcher);
255268
}
256269
}
257-
258-
/** Fire any action and destroy any timed out matchers **/
259-
foreach ($timedOutMatchers as $matcher) {
260-
$this->removeTimeout($matcher);
261-
$matcher->fire();
262-
/** Record stat of matcher timeout */
263-
$this->incrStat('completed_matcher_timeout', get_class($matcher));
264-
$this->removeMatcher($matcher);
265-
unset($matcher);
266-
}
270+
unset($handledMatchers);
267271

268272
/** Flag as dirty except when we have a control message and state doesn't change **/
269273
if ($stateChanged || !in_array($event->event, Scheduler::CONTROL_MESSAGES, true)) {

tests/cron_timer.php

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
<?php declare(strict_types=1);
2+
3+
use EdgeTelemetrics\EventCorrelation\Event;
4+
use EdgeTelemetrics\EventCorrelation\Rule;
5+
use Bref\Logger\StderrLogger;
6+
use EdgeTelemetrics\EventCorrelation\Scheduler;
7+
8+
use Psr\Log\LogLevel;
9+
use function EdgeTelemetrics\EventCorrelation\php_cmd;
10+
use function EdgeTelemetrics\EventCorrelation\wrap_source_php_cmd;
11+
12+
error_reporting(E_ALL);
13+
ini_set('display_errors', "on");
14+
15+
include __DIR__ . "/../vendor/autoload.php";
16+
17+
class EveryMinuteCron extends Rule\Cron {
18+
19+
const TIMEZONE = 'Australia/Sydney';
20+
const CRON_SCHEDULE = '* * * * *';
21+
22+
public function onSchedule(): void
23+
{
24+
error_log('MINUTE CRON: Minute Cron schedule reached ' . (new DateTimeImmutable())->format('Y-m-d H:i:s'));
25+
}
26+
}
27+
28+
class InitCron extends Rule\Cron {
29+
const CRON_SCHEDULE = Rule\Cron::ON_INITIALISATION;
30+
31+
public function onSchedule(): void
32+
{
33+
error_log('INIT CRON: System has initialised');
34+
}
35+
}
36+
37+
class ShutdownCron extends Rule\Cron {
38+
const CRON_SCHEDULE = Rule\Cron::ON_SHUTDOWN;
39+
40+
public function onSchedule(): void
41+
{
42+
error_log('SHUTDOWN CRON: System shutting down');
43+
}
44+
}
45+
46+
$numberGenClass = new class() extends Scheduler\SourceFunction {
47+
protected \React\EventLoop\TimerInterface $timer;
48+
49+
function functionStart(): void
50+
{
51+
$this->timer = $this->loop->addPeriodicTimer(5.0, function () {
52+
static $count = 1;
53+
try {
54+
$event = new Event(['event' => 'SampleValueEvent', 'value' => $count++]);
55+
$this->emit('data', [$event]);
56+
} catch (Throwable $exception) {
57+
$this->emit('error', [$exception]);
58+
$this->exit(255);
59+
return;
60+
}
61+
});
62+
}
63+
64+
function exit(int $code = 0): void
65+
{
66+
$this->running = false;
67+
$this->loop->cancelTimer($this->timer);
68+
unset($this->timer);
69+
$this->emit('exit', [$code]);
70+
}
71+
72+
function functionStop(): void
73+
{
74+
$this->exit();
75+
}
76+
};
77+
78+
$scheduler = new class([EveryMinuteCron::class, InitCron::class, ShutdownCron::class]) extends Scheduler {
79+
public function __construct(array $rules)
80+
{
81+
parent::__construct($rules);
82+
set_exception_handler([$this, "handle_exception"]);
83+
$this->setLogger(new StderrLogger(LogLevel::DEBUG));
84+
85+
//$this->engine->setEventStreamLive();
86+
87+
$this->setSavefileName("/tmp/php_ec-scheduler_test.state");
88+
$this->setSaveStateInterval(1);
89+
$this->enableManagementServer(true);
90+
$this->setHeartbeatInterval(10);
91+
}
92+
93+
function handle_exception($exception): void
94+
{
95+
$this->logger->emergency("Fatal", ['exception' => $exception,]);
96+
}
97+
};
98+
99+
$scheduler->register_input_process('generator', $numberGenClass, null, [], false);
100+
101+
$scheduler->run();

0 commit comments

Comments
 (0)