Skip to content

Commit 0e94580

Browse files
committed
Add CONTROL_MSG_HEARTBEAT to inject a heartbeat event into the engine every X seconds as configured. Rules can use this for updating things, checking for missing data (including batch jobs detecting missing data) and for logging. Defaults to Off (0 seconds)
1 parent d7f3651 commit 0e94580

File tree

1 file changed

+29
-2
lines changed

1 file changed

+29
-2
lines changed

src/Scheduler.php

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,11 @@ class Scheduler implements LoggerAwareInterface {
173173
*/
174174
const CONTROL_MSG_STOP = 'PHP-EC:Engine:Stop';
175175

176+
/**
177+
* Event type when the heartbeat time goes off
178+
*/
179+
const CONTROL_MSG_HEARTBEAT = 'PHP-EC:Engine:Heartbeat';
180+
176181
/** @var int Level at which the memory pressure is considered resolved */
177182
const MEMORY_PRESSURE_LOW_WATERMARK = 35;
178183

@@ -235,6 +240,11 @@ class Scheduler implements LoggerAwareInterface {
235240
/** @var null|Server */
236241
protected ?Server $managementServer = null;
237242

243+
/**
244+
* @var int Number of seconds between heartbeats or 0 to disable
245+
*/
246+
protected int $heartbeat_seconds = 0;
247+
238248
/**
239249
* @param array<class-string<IEventMatcher>> $rules An array of Rules defined by classNames
240250
*/
@@ -588,7 +598,7 @@ public function setup_save_state() : void
588598
}
589599
});
590600

591-
$this->scheduledTasks[] = $this->loop->addPeriodicTimer($this->saveStateSeconds, function() {
601+
$this->scheduledTasks['scheduledSaveState'] = $this->loop->addPeriodicTimer($this->saveStateSeconds, function() {
592602
if (($this->engine->isDirty() || $this->dirty) && false === $this->saveStateHandler->asyncSaveInProgress())
593603
{
594604
/** Clear the dirty flags before calling the async save process.
@@ -602,8 +612,10 @@ public function setup_save_state() : void
602612
});
603613

604614
/** Set up an hourly time to save state (or skip if we are already saving state when this timer fires) */
605-
$this->scheduledTasks[] = $this->loop->addPeriodicTimer(3600, function() {
615+
$this->scheduledTasks['hourlySaveState'] = $this->loop->addPeriodicTimer(3600, function() {
606616
if (false === $this->saveStateHandler->asyncSaveInProgress()) {
617+
$this->engine->clearDirtyFlag();
618+
$this->dirty = false;
607619
$this->saveStateHandler->saveStateAsync($this->buildState());
608620
}
609621
});
@@ -627,6 +639,14 @@ public function setSaveStateInterval(float $seconds) : void {
627639
$this->saveStateSeconds = $seconds;
628640
}
629641

642+
/**
643+
* @param int $seconds
644+
* @return void
645+
*/
646+
public function setHeartbeatInterval(int $seconds) : void {
647+
$this->heartbeat_seconds = $seconds;
648+
}
649+
630650
/**
631651
* Scheduling timeouts is only supported when the engine is running in live mode. The Correlation engine will check timeouts for batch mode within the handle() function
632652
* @throws Exception
@@ -795,6 +815,13 @@ public function run() : void
795815
/** Initialise the state saving task */
796816
$this->setup_save_state();
797817

818+
/** Initialise Heartbeat timer */
819+
if ($this->heartbeat_seconds > 0) {
820+
$this->scheduledTasks['heartbeat'] = $this->loop->addPeriodicTimer($this->heartbeat_seconds, function() {
821+
$this->engine->handle(new Event(['event' => static::CONTROL_MSG_HEARTBEAT]));
822+
});
823+
}
824+
798825
/** Monitor memory usage */
799826
$sysInfo = new SysInfo();
800827
$this->memoryLimit = $sysInfo->getMemoryLimit();

0 commit comments

Comments
 (0)