Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ before_script:
- wget 'https://github.com/xp-framework/xp-runners/releases/download/v5.0.0/setup' -O - | php
- wget 'https://github.com/xp-framework/core/releases/download/v6.0.0alpha7/xp-rt-6.0.0alpha7.xar'
- ls -1 *.xar > boot.pth
- echo "use=." > xp.ini
- echo "[runtime]" >> xp.ini
- echo "date.timezone=Europe/Berlin" >> xp.ini

Expand Down
59 changes: 59 additions & 0 deletions src/main/php/util/data/Distribution.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php namespace util\data;

/**
*
*/
class Distribution extends \lang\Object implements \Iterator {
protected $it;
protected $workers;
protected $inv;

/**
* Creates a new Generator instance
*
* @param php.Iterator $it
* @param util.data.Workers $workers
*/
public function __construct(\Iterator $it, Workers $workers) {
$this->it= $it;
$this->workers= $workers;
$this->inv= 0;
}

protected function enqueue() {
while ($this->valid() && $this->workers->enqueue($this->it->current())) {
$this->it->next();
}
}

/** @return void */
public function rewind() {
$this->it->rewind();
$this->enqueue();
}

/** @return var */
public function current() {
return $this->workers->dequeue();
}

/** @return int */
public function key() {
return $this->inv++;
}

/** @return void */
public function next() {
if ($this->workers->pending()) {
// Nothing
} else {
$this->it->next();
$this->enqueue();
}
}

/** @return bool */
public function valid() {
return $this->it->valid();
}
}
111 changes: 111 additions & 0 deletions src/main/php/util/data/LocalWorkerProcess.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
<?php namespace util\data;

use peer\Socket;
use lang\Runtime;
use lang\IllegalStateException;
use io\IOException;
use util\collections\Queue;

/**
* A worker process that runs on this machine.
*
* @test xyp://util.data.unittest.LocalWorkerProcessTest
*/
class LocalWorkerProcess extends \lang\Object {
protected $queue, $pending, $cmd, $proc, $comm;

/**
* Creates a new locally running worker process
*
* @param string $class
* @param string[] $args
*/
public function __construct($class, $args= []) {
$this->queue= new Queue();
$this->pending= false;
$this->cmd= $class.' ['.implode(', ', $args).']';
$this->proc= Runtime::getInstance()->newInstance(null, 'class', $class, $args);
$this->connect();
}

/**
* Initiates communication with worker
*
* @return void
* @throws lang.IllegalStateException
*/
protected function connect() {
$line= $this->proc->out->readLine();
if ('+' === $line{0}) {
sscanf($line, '+ %s:%d', $host, $port);
$this->comm= new Socket($host, $port);
$this->comm->connect();
} else {
$this->proc->close();
throw new IllegalStateException('Cannot initiate communication with worker: '.$line);
}
}

public function handle() { return $this->comm->getHandle(); }

public function pending() { return $this->pending; }

/**
* Pass in an element for processing
*
* @param var $element
*/
public function pass($element) {
if ($this->pending) {
$this->queue->put($element);
} else {
$this->comm->write(serialize($element)."\n");
$this->pending= true;
}
}

/**
* Returns a processing result
*
* @return var
* @throws lang.IllegalStateException
*/
public function result() {
if (!$this->pending) {
if ($this->queue->isEmpty()) {
throw new IllegalStateException('No pending results');
}

$this->comm->write(serialize($this->queue->get())."\n");
}

$element= unserialize($this->comm->readLine());
$this->pending= false;
return $element;
}

/**
* Shuts down this worker process
*
* @return int
*/
public function shutdown() {
if (-1 === $this->proc->exitValue()) {
try {
$this->comm->write("SHUTDOWN\n");
$this->comm->close();
} catch (IOException $ignored) { }

$this->proc->close();
}
return $this->proc->exitValue();
}

public function __destruct() {
$this->shutdown();
}

public function toString() {
return $this->getClassName().'(pid= '.$this->proc->getProcessId().', cmd= '.$this->cmd.')';
}
}
11 changes: 10 additions & 1 deletion src/main/php/util/data/Sequence.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,16 @@ public function peek($action) {
}));
}

/**
* Returns a new stream which maps the given function to each element
*
* @param util.data.Workers $workers
* @return self
*/
public function distribute(Workers $workers) {
return new self(new Distribution($this->getIterator(), $workers));
}

/**
* Returns a new stream which counts the number of elements as iteration
* proceeeds. A short form of `peek()` with a function incrementing a local
Expand All @@ -406,7 +416,6 @@ public function counting(&$count) {
}));
}


/**
* Returns a stream with distinct elements
*
Expand Down
34 changes: 34 additions & 0 deletions src/main/php/util/data/WorkerFunctions.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php namespace util\data;

/**
*
*/
class WorkerFunctions extends \lang\Object implements Workers {
protected $functions, $offset;

/**
* Creates a new WorkerFunctions instance
*
* @param php.Closure[] $functions
*/
public function __construct(array $functions) {
$this->functions= $functions;
$this->offset= 0;
}

public function enqueue($element) {
$this->result= $this->functions[$this->offset]->__invoke($element);
if (++$this->offset >= sizeof($this->functions)) {
$this->offset= 0;
}
return false;
}

public function pending() {
return false;
}

public function dequeue() {
return $this->result;
}
}
71 changes: 71 additions & 0 deletions src/main/php/util/data/WorkerProcesses.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php namespace util\data;

use io\IOException;

/**
*
*/
class WorkerProcesses extends \lang\Object implements Workers {
protected $processes, $timeout, $offset, $waitHandles;

/**
* Creates a new WorkerProcesses instance
*
* @param util.data.WorkerProcess[] $processes
* @param double $timeout
*/
public function __construct(array $processes, $timeout= null) {
$this->processes= $processes;
$this->timeout= $timeout;
$this->offset= 0;
$this->pending= [];
$this->waitHandles= [];
foreach ($this->processes as $i => $process) {
$this->waitHandles[$i]= $process->handle();
}
}

public function enqueue($element) {
$this->processes[$this->offset]->pass($element);
$this->pending[$this->offset]= true;

if (++$this->offset >= sizeof($this->processes)) {
$this->offset= 0;
return false;
}
return true;
}

public function pending() {
return !empty($this->pending);
}

/**
* Wait for the first worker process to become ready, and return its result
*
* @param var[] $r
* @param double $timeout
* @param util.data.WorkerProcess
*/
protected function waitFor($r, $timeout) {
if (null === $timeout) {
$tv_sec= $tv_usec= null;
} else {
$tv_sec= intval(floor($timeout));
$tv_usec= intval(($timeout - floor($timeout)) * 1000000);
}

$w= $e= null;
if (false === stream_select($r, $w, $e, $tv_sec, $tv_usec) || empty($r)) {
throw new IOException('No results present'.($this->timeout ? ' after '.$this->timeout.' seconds' : ''));
}

$offset= key($r);
unset($this->pending[$offset]);
return $this->processes[$offset];
}

public function dequeue() {
return $this->waitFor($this->waitHandles, $this->timeout)->result();
}
}
13 changes: 13 additions & 0 deletions src/main/php/util/data/Workers.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php namespace util\data;

/**
*
*/
interface Workers {

public function enqueue($element);

public function pending();

public function dequeue();
}
49 changes: 49 additions & 0 deletions src/test/php/util/data/unittest/LocalWorkerProcessTest.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php namespace util\data\unittest;

use util\data\LocalWorkerProcess;

class LocalWorkerProcessTest extends \unittest\TestCase {
private static $worker;

#[@beforeClass]
public static function startWorker() {
self::$worker= new LocalWorkerProcess('util.data.unittest.Worker', [3]);
}

#[@afterClass]
public static function stopWorker() {
self::$worker->shutdown();
}

#[@test]
public function pass_and_result_roundtrip() {
self::$worker->pass(1);
$this->assertEquals(3, self::$worker->result());
}

#[@test]
public function elements_are_processed_consecutively() {
self::$worker->pass(1);
$this->assertEquals(3, self::$worker->result());
self::$worker->pass(2);
$this->assertEquals(6, self::$worker->result());
}

#[@test, @values([
# [[1, 2]],
# [[1, 2, 3]],
# [[1, 2, 3, 4]]
#])]
public function elements_can_be_processed_in_batches($values) {
foreach ($values as $value) {
self::$worker->pass($value);
}

$recv= [];
foreach ($values as $value) {
$recv[]= self::$worker->result() / 3;
}

$this->assertEquals($values, $recv);
}
}
Loading