diff --git a/.travis.yml b/.travis.yml index 4c5dd1d..cd91bed 100755 --- a/.travis.yml +++ b/.travis.yml @@ -11,6 +11,7 @@ before_script: - wget 'https://github.com/xp-framework/xp-runners/releases/download/v5.0.0/setup' -O - | php - wget 'https://github.com/xp-framework/core/releases/download/v6.0.0alpha7/xp-rt-6.0.0alpha7.xar' - ls -1 *.xar > boot.pth + - echo "use=." > xp.ini - echo "[runtime]" >> xp.ini - echo "date.timezone=Europe/Berlin" >> xp.ini diff --git a/src/main/php/util/data/Distribution.class.php b/src/main/php/util/data/Distribution.class.php new file mode 100755 index 0000000..f012cab --- /dev/null +++ b/src/main/php/util/data/Distribution.class.php @@ -0,0 +1,59 @@ +it= $it; + $this->workers= $workers; + $this->inv= 0; + } + + protected function enqueue() { + while ($this->valid() && $this->workers->enqueue($this->it->current())) { + $this->it->next(); + } + } + + /** @return void */ + public function rewind() { + $this->it->rewind(); + $this->enqueue(); + } + + /** @return var */ + public function current() { + return $this->workers->dequeue(); + } + + /** @return int */ + public function key() { + return $this->inv++; + } + + /** @return void */ + public function next() { + if ($this->workers->pending()) { + // Nothing + } else { + $this->it->next(); + $this->enqueue(); + } + } + + /** @return bool */ + public function valid() { + return $this->it->valid(); + } +} \ No newline at end of file diff --git a/src/main/php/util/data/LocalWorkerProcess.class.php b/src/main/php/util/data/LocalWorkerProcess.class.php new file mode 100755 index 0000000..fc3671a --- /dev/null +++ b/src/main/php/util/data/LocalWorkerProcess.class.php @@ -0,0 +1,111 @@ +queue= new Queue(); + $this->pending= false; + $this->cmd= $class.' ['.implode(', ', $args).']'; + $this->proc= Runtime::getInstance()->newInstance(null, 'class', $class, $args); + $this->connect(); + } + + /** + * Initiates communication with worker + * + * @return void + * @throws lang.IllegalStateException + */ + protected function connect() { + $line= $this->proc->out->readLine(); + if ('+' === $line{0}) { + sscanf($line, '+ %s:%d', $host, $port); + $this->comm= new Socket($host, $port); + $this->comm->connect(); + } else { + $this->proc->close(); + throw new IllegalStateException('Cannot initiate communication with worker: '.$line); + } + } + + public function handle() { return $this->comm->getHandle(); } + + public function pending() { return $this->pending; } + + /** + * Pass in an element for processing + * + * @param var $element + */ + public function pass($element) { + if ($this->pending) { + $this->queue->put($element); + } else { + $this->comm->write(serialize($element)."\n"); + $this->pending= true; + } + } + + /** + * Returns a processing result + * + * @return var + * @throws lang.IllegalStateException + */ + public function result() { + if (!$this->pending) { + if ($this->queue->isEmpty()) { + throw new IllegalStateException('No pending results'); + } + + $this->comm->write(serialize($this->queue->get())."\n"); + } + + $element= unserialize($this->comm->readLine()); + $this->pending= false; + return $element; + } + + /** + * Shuts down this worker process + * + * @return int + */ + public function shutdown() { + if (-1 === $this->proc->exitValue()) { + try { + $this->comm->write("SHUTDOWN\n"); + $this->comm->close(); + } catch (IOException $ignored) { } + + $this->proc->close(); + } + return $this->proc->exitValue(); + } + + public function __destruct() { + $this->shutdown(); + } + + public function toString() { + return $this->getClassName().'(pid= '.$this->proc->getProcessId().', cmd= '.$this->cmd.')'; + } +} diff --git a/src/main/php/util/data/Sequence.class.php b/src/main/php/util/data/Sequence.class.php index 1398641..03a77bd 100755 --- a/src/main/php/util/data/Sequence.class.php +++ b/src/main/php/util/data/Sequence.class.php @@ -391,6 +391,16 @@ public function peek($action) { })); } + /** + * Returns a new stream which maps the given function to each element + * + * @param util.data.Workers $workers + * @return self + */ + public function distribute(Workers $workers) { + return new self(new Distribution($this->getIterator(), $workers)); + } + /** * Returns a new stream which counts the number of elements as iteration * proceeeds. A short form of `peek()` with a function incrementing a local @@ -406,7 +416,6 @@ public function counting(&$count) { })); } - /** * Returns a stream with distinct elements * diff --git a/src/main/php/util/data/WorkerFunctions.class.php b/src/main/php/util/data/WorkerFunctions.class.php new file mode 100755 index 0000000..3fac6de --- /dev/null +++ b/src/main/php/util/data/WorkerFunctions.class.php @@ -0,0 +1,34 @@ +functions= $functions; + $this->offset= 0; + } + + public function enqueue($element) { + $this->result= $this->functions[$this->offset]->__invoke($element); + if (++$this->offset >= sizeof($this->functions)) { + $this->offset= 0; + } + return false; + } + + public function pending() { + return false; + } + + public function dequeue() { + return $this->result; + } +} \ No newline at end of file diff --git a/src/main/php/util/data/WorkerProcesses.class.php b/src/main/php/util/data/WorkerProcesses.class.php new file mode 100755 index 0000000..a6348d6 --- /dev/null +++ b/src/main/php/util/data/WorkerProcesses.class.php @@ -0,0 +1,71 @@ +processes= $processes; + $this->timeout= $timeout; + $this->offset= 0; + $this->pending= []; + $this->waitHandles= []; + foreach ($this->processes as $i => $process) { + $this->waitHandles[$i]= $process->handle(); + } + } + + public function enqueue($element) { + $this->processes[$this->offset]->pass($element); + $this->pending[$this->offset]= true; + + if (++$this->offset >= sizeof($this->processes)) { + $this->offset= 0; + return false; + } + return true; + } + + public function pending() { + return !empty($this->pending); + } + + /** + * Wait for the first worker process to become ready, and return its result + * + * @param var[] $r + * @param double $timeout + * @param util.data.WorkerProcess + */ + protected function waitFor($r, $timeout) { + if (null === $timeout) { + $tv_sec= $tv_usec= null; + } else { + $tv_sec= intval(floor($timeout)); + $tv_usec= intval(($timeout - floor($timeout)) * 1000000); + } + + $w= $e= null; + if (false === stream_select($r, $w, $e, $tv_sec, $tv_usec) || empty($r)) { + throw new IOException('No results present'.($this->timeout ? ' after '.$this->timeout.' seconds' : '')); + } + + $offset= key($r); + unset($this->pending[$offset]); + return $this->processes[$offset]; + } + + public function dequeue() { + return $this->waitFor($this->waitHandles, $this->timeout)->result(); + } +} \ No newline at end of file diff --git a/src/main/php/util/data/Workers.class.php b/src/main/php/util/data/Workers.class.php new file mode 100755 index 0000000..57808d7 --- /dev/null +++ b/src/main/php/util/data/Workers.class.php @@ -0,0 +1,13 @@ +shutdown(); + } + + #[@test] + public function pass_and_result_roundtrip() { + self::$worker->pass(1); + $this->assertEquals(3, self::$worker->result()); + } + + #[@test] + public function elements_are_processed_consecutively() { + self::$worker->pass(1); + $this->assertEquals(3, self::$worker->result()); + self::$worker->pass(2); + $this->assertEquals(6, self::$worker->result()); + } + + #[@test, @values([ + # [[1, 2]], + # [[1, 2, 3]], + # [[1, 2, 3, 4]] + #])] + public function elements_can_be_processed_in_batches($values) { + foreach ($values as $value) { + self::$worker->pass($value); + } + + $recv= []; + foreach ($values as $value) { + $recv[]= self::$worker->result() / 3; + } + + $this->assertEquals($values, $recv); + } +} \ No newline at end of file diff --git a/src/test/php/util/data/unittest/SequenceDistributionTest.class.php b/src/test/php/util/data/unittest/SequenceDistributionTest.class.php new file mode 100755 index 0000000..eb69b1f --- /dev/null +++ b/src/test/php/util/data/unittest/SequenceDistributionTest.class.php @@ -0,0 +1,61 @@ + []]; + $workers= new WorkerFunctions([ + function($e) use(&$processed) { $processed[0][]= $e; } + ]); + Sequence::of([1, 2, 3, 4, 5, 6, 7, 8])->distribute($workers)->count(); + $this->assertEquals([0 => [1, 2, 3, 4, 5, 6, 7, 8]], $processed); + } + + #[@test] + public function two_functions() { + $processed= [0 => [], 1 => []]; + $workers= new WorkerFunctions([ + function($e) use(&$processed) { $processed[0][]= $e; }, + function($e) use(&$processed) { $processed[1][]= $e; } + ]); + Sequence::of([1, 2, 3, 4, 5, 6, 7, 8])->distribute($workers)->count(); + $this->assertEquals([0 => [1, 3, 5, 7], 1 => [2, 4, 6, 8]], $processed); + } + + #[@test] + public function three_functions() { + $processed= [0 => [], 1 => [], 2 => []]; + $workers= new WorkerFunctions([ + function($e) use(&$processed) { $processed[0][]= $e; }, + function($e) use(&$processed) { $processed[1][]= $e; }, + function($e) use(&$processed) { $processed[2][]= $e; } + ]); + Sequence::of([1, 2, 3, 4, 5, 6, 7, 8])->distribute($workers)->count(); + $this->assertEquals([0 => [1, 4, 7], 1 => [2, 5, 8], 2 => [3, 6]], $processed); + } + + #[@test] + public function processes() { + $workers= [ + new LocalWorkerProcess('util.data.unittest.Worker', [3]), + new LocalWorkerProcess('util.data.unittest.Worker', [10]) + ]; + + $results= Sequence::of([1, 2, 3, 4, 5, 6, 7, 8])->distribute(new WorkerProcesses($workers))->toArray(); + + foreach ($workers as $worker) { + $worker->shutdown(); + } + + // The order in which results are returned cannot be guaranteed! + sort($results); + $this->assertEquals([3, 9, 15, 20, 21, 40, 60, 80], $results); + } +} \ No newline at end of file diff --git a/src/test/php/util/data/unittest/Worker.class.php b/src/test/php/util/data/unittest/Worker.class.php new file mode 100755 index 0000000..1de413e --- /dev/null +++ b/src/test/php/util/data/unittest/Worker.class.php @@ -0,0 +1,55 @@ +setProtocol(newinstance('peer.server.ServerProtocol', [$args ? (int)$args[0] : 2], '{ + public function __construct($factor) { + $this->factor= $factor; + } + + public function initialize() { } + + public function handleConnect($socket) { } + + public function handleDisconnect($socket) { } + + public function handleError($socket, $e) { } + + public function handleData($socket) { + $in= $socket->readLine(); + + if ("SHUTDOWN" === $in) { + $this->server->terminate= true; + } else if ($in) { + $socket->write(serialize(unserialize($in) * $this->factor)."\n"); + } else { + $socket->write("-ERR\n"); + } + } + }')); + + try { + $server->init(); + Console::writeLinef('+ %s:%d', $server->socket->host, $server->socket->port); + $server->service(); + Console::writeLine('+ Shutdown'); + return 0; + } catch (Throwable $e) { + Console::writeLine('- ', $e->getMessage()); + return 1; + } + } +} \ No newline at end of file