<?php
namespace ZeusTest;
use PHPUnit_Framework_TestCase;
use Zend\EventManager\EventManager;
use Zeus\Kernel\ProcessManager\Process;
use Zeus\Kernel\ProcessManager\SchedulerEvent;
use Zeus\ServerService\Async\Config;
use Zeus\ServerService\Shared\Networking\SocketEventSubscriber;
use Zeus\Kernel\Networking\SocketServer;
use ZeusTest\Helpers\SocketTestMessage;
use ZeusTest\Helpers\ZeusFactories;
class SocketEventSubscriberTest extends PHPUnit_Framework_TestCase
{
use ZeusFactories;
/** @var SocketServer */
protected $server;
protected $port;
public function setUp()
{
$config = new Config();
$this->port = 7777;
$config->setListenPort($this->port);
$config->setListenAddress('0.0.0.0');
$this->server = new SocketServer($config);
}
public function tearDown()
{
$this->server->stop();
}
public function testSubscriberRequestHandling()
{
$events = new EventManager();
$event = new SchedulerEvent();
$event->setScheduler($this->getScheduler(0));
$process = new Process($event);
$process->setConfig(new \Zeus\Kernel\ProcessManager\Config([]));
$event->setProcess($process);
$process->attach($events);
$received = null;
$steps = 0;
$message = new SocketTestMessage(function($connection, $data) use (&$received, &$steps) {
$received = $data;
$steps ++;
}, function($connection) use (& $heartBeats) {
$heartBeats++;
if ($heartBeats == 2) {
$connection->close();
}
});
$eventSubscriber = new SocketEventSubscriber($this->server, $message);
$eventSubscriber->attach($events);
$events->attach(SchedulerEvent::EVENT_SCHEDULER_START, function(SchedulerEvent $event) use (& $schedulerStarted) {
$event->stopPropagation(true);
}, SchedulerEvent::PRIORITY_FINALIZE + 1);
$events->attach(SchedulerEvent::EVENT_PROCESS_INIT, function(SchedulerEvent $event) use (& $schedulerStarted) {
$event->stopPropagation(true);
}, SchedulerEvent::PRIORITY_FINALIZE + 1);
$event->setName(SchedulerEvent::EVENT_SCHEDULER_START);
$events->triggerEvent($event);
$event->setName(SchedulerEvent::EVENT_PROCESS_INIT);
$events->triggerEvent($event);
$client = stream_socket_client('tcp://localhost:' . $this->port);
stream_set_blocking($client, false);
$requestString = "GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n";
fwrite($client, $requestString);
$event->setName(SchedulerEvent::EVENT_PROCESS_LOOP);
$events->triggerEvent($event);
$events->triggerEvent($event);
fclose($client);
$event->setName(SchedulerEvent::EVENT_PROCESS_EXIT);
$events->triggerEvent($event);
$this->assertEquals($requestString, $received);
$this->assertEquals(1, $steps, "Message should be fetched twice");
$this->assertEquals(2, $heartBeats, "Heartbeat should be called twice");
}
public function testSubscriberErrorHandling()
{
$events = new EventManager();
$event = new SchedulerEvent();
$event->setScheduler($this->getScheduler(0));
$process = new Process($event);
$process->setConfig(new \Zeus\Kernel\ProcessManager\Config([]));
$event->setProcess($process);
$process->attach($events);
$received = null;
$message = new SocketTestMessage(function($connection, $data) use (&$received) {
throw new \RuntimeException("TEST");
});
$eventSubscriber = new SocketEventSubscriber($this->server, $message);
$eventSubscriber->attach($events);
$events->attach(SchedulerEvent::EVENT_SCHEDULER_START, function(SchedulerEvent $event) use (& $schedulerStarted) {
$event->stopPropagation(true);
}, SchedulerEvent::PRIORITY_FINALIZE + 1);
$events->attach(SchedulerEvent::EVENT_PROCESS_INIT, function(SchedulerEvent $event) use (& $schedulerStarted) {
$event->stopPropagation(true);
}, SchedulerEvent::PRIORITY_FINALIZE + 1);
$event->setName(SchedulerEvent::EVENT_SCHEDULER_START);
$events->triggerEvent($event);
$event->setName(SchedulerEvent::EVENT_PROCESS_INIT);
$events->triggerEvent($event);
$client = stream_socket_client('tcp://localhost:' . $this->port);
stream_set_blocking($client, false);
$requestString = "GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n";
fwrite($client, $requestString);
$event->setName(SchedulerEvent::EVENT_PROCESS_LOOP);
$exception = null;
try {
$events->triggerEvent($event);
} catch (\RuntimeException $exception) {
}
$this->assertTrue(is_object($exception), 'Exception should be raised');
$this->assertInstanceOf(\RuntimeException::class, $exception, 'Correct exception should be raised');
$this->assertEquals("TEST", $exception->getMessage(), 'Correct exception should be raised');
$read = @stream_get_contents($client);
$eof = feof($client);
$this->assertEquals("", $read, 'Stream should not contain any message');
$this->assertEquals(true, $eof, 'Client stream should not be readable when disconnected');
fclose($client);
}
}
|