<?php
/**
* @author AlexanderC <self@alexanderc.me>
* @date 4/7/14
* @time 9:09 PM
*/
namespace Threadator;
use Threadator\Communication\Communication;
use Threadator\Communication\TRuntimeCommunication;
class Runtime
{
use TRuntimeCommunication;
/**
* @var array
*/
protected $pool = [];
/**
* @var int
*/
protected $pid;
/**
* @var Communication\Communication
*/
protected $communication;
/**
* @param string $communicationDriver
*/
public function __construct($communicationDriver)
{
$this->pid = posix_getpid();
$this->communication = Communication::create($communicationDriver, $this);
}
/**
* @return \Generator
*/
public function getIterator()
{
foreach($this->pool as $thread) {
yield $thread;
}
}
/**
* @param int $pid
* @return null|Thread
*/
public function findThreadByPid($pid)
{
foreach($this->pool as $thread) {
if($thread->getPid() === $pid) {
return $thread;
}
}
return null;
}
/**
* @return array
*/
public function getThreadsPid()
{
$threadsPid = [];
foreach($this->pool as $thread) {
$threadsPid[] = $thread->getPid();
}
return $threadsPid;
}
/**
* @return \Threadator\Communication\Communication\Communication
*/
public function getCommunication()
{
return $this->communication;
}
/**
* @param Thread $thread
* @return $this
*/
public function push(Thread $thread)
{
$this->pool[] = $thread;
return $this;
}
/**
* @param Thread $thread
* @param bool $safe
* @return $this
* @throws \OutOfBoundsException
*/
public function remove(Thread $thread, $safe = true)
{
if(false === ($key = array_search($thread, $this->pool, true))) {
throw new \OutOfBoundsException("No such thread found");
}
if(true === $safe) {
/** @var Thread $thread */
$thread = $this->pool[$key];
if(!$thread->isJoined()) {
$thread->join();
}
}
unset($this->pool[$key]);
return $this;
}
/**
* @param callable $callback
* @return $this
*/
public function map(callable $callback)
{
foreach($this->pool as $thread) {
call_user_func($callback, $thread);
}
return $this;
}
/**
* @param bool $safe
* @return null|Thread
*/
public function pop($safe = true)
{
/** @var Thread $thread */
$thread = array_pop($this->pool);
if(null !== $thread) {
if(!$thread->isJoined()) {
$thread->join();
}
return $thread;
}
return null;
}
/**
* @param bool $safe
* @return null|Thread
*/
public function shift($safe = true)
{
/** @var Thread $thread */
$thread = array_shift($this->pool);
if(null !== $thread) {
if(!$thread->isJoined()) {
$thread->join();
}
return $thread;
}
return null;
}
/**
* @return $this
*/
public function run()
{
/** @var Thread $thread */
foreach($this->pool as $thread) {
if($thread->isWaiting()) {
$thread->run();
}
}
return $this;
}
/**
* @return $this
*/
public function join()
{
while(null !== $this->shift(true));
return $this;
}
/**
* @return int
*/
public function getPid()
{
return $this->pid;
}
}
|