PHP Classes

File: test/Services/Async/AsyncPluginTest.php

Recommend this page to a friend!
  Classes of Artur Graniszewski   ZEUS for PHP   test/Services/Async/AsyncPluginTest.php   Download  
File: test/Services/Async/AsyncPluginTest.php
Role: Unit test script
Content type: text/plain
Description: Unit test script
Class: ZEUS for PHP
Manage the execution of multiple parallel tasks
Author: By Artur Graniszewski
Last change: Fixed Async Controller Plugin not honoring join timeout settings (could hang indefinitely on stale socket connection)
Switched Async Controller Plugin to ZEUS networking layer
Date: 7 years ago
Size: 9,852 bytes
 

Contents

Class file image Download
<?php

namespace ZeusTest\Services\Async;

use
PHPUnit_Framework_TestCase;
use
Zend\ServiceManager\ServiceManager;
use
Zeus\Kernel\Networking\SocketServer;
use
Zeus\Kernel\Networking\SocketStream;
use
Zeus\ServerService\Async\AsyncPlugin;
use
Zeus\ServerService\Async\Config;
use
Zeus\ServerService\Async\Factory\AsyncPluginFactory;

/**
 * Tests for the Async controller plugin, run against a real local TCP connection.
 *
 * Fixture layout: setUp() starts a SocketServer on a fixed port and opens a raw client
 * stream to it. In most tests the plugin is a mock whose getSocket() returns the
 * server-side end of that connection, so anything written to $this->client is what the
 * plugin reads as the async server's reply.
 */
class AsyncPluginTest extends PHPUnit_Framework_TestCase
{
    /** @var SocketServer */
    protected $server;

    /** @var int TCP port used by both the fixture server and the plugin configuration */
    protected $port;

    /** @var resource|null raw client stream playing the role of the async server */
    protected $client;

    /**
     * Value returned by SocketServer::listen() in getConnectedPlugin().
     * Presumably a SocketStream (the tests call isReadable() and close() on it) - TODO confirm.
     *
     * @var SocketStream|null
     */
    protected $acceptedStream;

    /**
     * Starts the socket server on port 9999 and connects a non-blocking client stream to it.
     */
    public function setUp()
    {
        $this->port = 9999;

        $config = new Config();
        $config->setListenPort($this->port);
        $config->setListenAddress('0.0.0.0');

        $this->server = new SocketServer($config);
        $this->server->createServer();

        $client = stream_socket_client('tcp://localhost:' . $this->port, $errorCode, $errorMessage);
        if (false === $client) {
            // Fail with an explicit reason instead of passing "false" on to the stream functions.
            $this->fail(sprintf(
                'Unable to connect to the test server on port %d: [%d] %s',
                $this->port,
                $errorCode,
                $errorMessage
            ));
        }

        stream_set_blocking($client, false);
        $this->client = $client;
    }

    /**
     * Stops the server (unless a test already did) and closes the client stream.
     */
    public function tearDown()
    {
        $this->acceptedStream = null;

        if ($this->server) {
            $this->server->stop();
            $this->server = null;
        }

        // The client may be missing when setUp() failed half-way; closing a non-resource
        // would only raise a second, misleading error.
        if (is_resource($this->client)) {
            fclose($this->client);
        }
        $this->client = null;
    }

    /**
     * Creates the plugin either through its factory or as a partial mock.
     *
     * @param bool $asMock when false the plugin is built by AsyncPluginFactory; when true
     *                     a mock with only getSocket() replaced is returned
     * @return AsyncPlugin
     */
    protected function getPlugin($asMock)
    {
        $factory = new AsyncPluginFactory();
        $container = new ServiceManager();
        $container->setService('configuration', [
            'zeus_process_manager' => [
                'services' => [
                    'zeus_async' => [
                        'auto_start' => false,
                        'service_name' => 'zeus_async',
                        'scheduler_name' => 'zeus_web_scheduler',
                        'service_adapter' => \Zeus\ServerService\Async\Service::class,
                        'service_settings' => [
                            'listen_port' => $this->port,
                            'listen_address' => '127.0.0.1',
                        ],
                    ]
                ]
            ]
        ]);

        if (!$asMock) {
            return $factory($container, AsyncPlugin::class, []);
        }

        $settings = $container->get('configuration');
        $config = new Config($settings['zeus_process_manager']['services']['zeus_async']['service_settings']);

        $mockBuilder = $this->getMockBuilder(AsyncPlugin::class);
        $mockBuilder->setConstructorArgs([$config]);
        $mockBuilder->setMethods(['getSocket']);

        return $mockBuilder->getMock();
    }

    /**
     * Returns a mocked plugin whose getSocket() yields the stream obtained from
     * $this->server->listen(1). That stream is also kept in $this->acceptedStream
     * for tests that need to inspect or close it.
     *
     * @return AsyncPlugin
     */
    protected function getConnectedPlugin()
    {
        $plugin = $this->getPlugin(true);
        $this->acceptedStream = $this->server->listen(1);

        $plugin
            ->expects($this->any())
            ->method('getSocket')
            ->willReturn($this->acceptedStream);

        return $plugin;
    }

    /**
     * The factory must produce an AsyncPlugin from the container configuration.
     */
    public function testPluginInstantiation()
    {
        $this->assertInstanceOf(AsyncPlugin::class, $this->getPlugin(false));
    }

    /**
     * A reply other than "PROCESSING" makes run() fail and report the reply.
     *
     * @expectedException \RuntimeException
     * @expectedExceptionMessage Async call failed, server response: "BAD_REQUEST"
     */
    public function testErrorHandlingOnRun()
    {
        fwrite($this->client, "BAD_REQUEST\n");
        $plugin = $this->getConnectedPlugin();

        $plugin->run(function() { return "ok"; });
    }

    /**
     * With the server stopped, a factory-built plugin cannot run anything.
     *
     * @expectedException \RuntimeException
     * @expectedExceptionMessage Async call failed: async server is offline
     */
    public function testErrorHandlingOnRunWhenOffline()
    {
        $plugin = $this->getPlugin(false);
        $this->server->stop();
        // Cleared so that tearDown() does not stop the server a second time.
        $this->server = null;

        $plugin->run(function() { return "ok"; });
    }

    /**
     * After a failed run() the stream handed out by getSocket() must not be readable.
     */
    public function testSocketIsClosedOnError()
    {
        fwrite($this->client, "BAD_REQUEST\n");
        $plugin = $this->getConnectedPlugin();
        $stream = $this->acceptedStream;

        try {
            $plugin->run(function () {
                return "ok";
            });
        } catch (\Exception $e) {
            $this->assertFalse($stream->isReadable(), "Socket should be closed after error");
            return;
        }

        $this->fail('No exception detected on error');
    }

    /**
     * A "PROCESSING" reply lets run() complete; the test passes when nothing is thrown.
     * NOTE(review): no assertion is made here because the return type of run() is not
     * visible from this file.
     */
    public function testProcessingOnRun()
    {
        fwrite($this->client, "PROCESSING\n");
        $plugin = $this->getConnectedPlugin();

        $plugin->run(function() { return "ok"; });
    }

    /**
     * A factory-built plugin opens its own connection, which nobody answers.
     *
     * @expectedException \RuntimeException
     * @expectedExceptionMessage Async call failed, no response from server
     */
    public function testOperationOnRealNotConnectedSocket()
    {
        $this->server->listen(1);
        $plugin = $this->getPlugin(false);

        $plugin->run(function() { return "ok"; });
    }

    /**
     * While nothing follows the "PROCESSING" reply the task is reported as working.
     */
    public function testIsWorkingWhenNoDataOnStream()
    {
        fwrite($this->client, "PROCESSING\n");
        $plugin = $this->getConnectedPlugin();

        $id = $plugin->run(function() { return "ok"; });
        $isWorking = $plugin->isWorking($id);

        $this->assertTrue($isWorking, 'Callback should be reported as working');
    }

    /**
     * Once further data arrives on the stream the task is no longer reported as working.
     */
    public function testIsWorkingWhenDataOnStream()
    {
        fwrite($this->client, "PROCESSING\n");
        $plugin = $this->getConnectedPlugin();

        $id = $plugin->run(function() { return "ok"; });
        fwrite($this->client, "SOME DATA\n");
        $isWorking = $plugin->isWorking($id);

        $this->assertFalse($isWorking, 'Callback should be reported as not working anymore');
    }

    /**
     * join() with a single id returns the unserialized payload sent as "<size>:<serialized>".
     */
    public function testResultOnJoin()
    {
        $data = "OK! " . microtime(true);
        $message = serialize($data);
        $size = strlen($message);
        fwrite($this->client, "PROCESSING\n$size:$message\n");
        $plugin = $this->getConnectedPlugin();

        $id = $plugin->run(function() { return "ok"; });
        $result = $plugin->join($id);

        $this->assertEquals($data, $result);
    }

    /**
     * join() gives up with an exception when no result arrives within the join timeout.
     *
     * @expectedException \RuntimeException
     * @expectedExceptionMessage Join timeout encountered
     */
    public function testResultOnJoinTimeout()
    {
        fwrite($this->client, "PROCESSING\n");
        $plugin = $this->getConnectedPlugin();

        $this->assertGreaterThan(1, $plugin->getJoinTimeout());
        $plugin->setJoinTimeout(1);
        $this->assertEquals(1, $plugin->getJoinTimeout());

        $id = $plugin->run(function() { return "ok"; });
        $plugin->join($id);
    }

    /**
     * join() with an array of ids returns an array of results.
     */
    public function testResultOnArrayJoin()
    {
        $data = "OK! " . microtime(true);
        $message = serialize($data);
        $size = strlen($message);
        fwrite($this->client, "PROCESSING\n$size:$message\n");
        $plugin = $this->getConnectedPlugin();

        $id = $plugin->run(function() { return "ok"; });
        $result = $plugin->join([$id]);

        $this->assertEquals([$data], $result);
    }

    /**
     * A "CORRUPTED_REQUEST" line in place of the result makes join() fail.
     *
     * @expectedException \RuntimeException
     * @expectedExceptionMessage Async call failed: request was corrupted
     */
    public function testSerializationErrorOnJoin()
    {
        fwrite($this->client, "PROCESSING\nCORRUPTED_REQUEST\n");
        $plugin = $this->getConnectedPlugin();

        $id = $plugin->run(function() { return "ok"; });
        $plugin->join($id);
    }

    /**
     * Closing the plugin's stream between run() and join() is reported as a lost connection.
     *
     * @expectedException \RuntimeException
     * @expectedExceptionMessage Async call failed: server connection lost
     */
    public function testTimeoutOnJoin()
    {
        fwrite($this->client, "PROCESSING\n");
        $plugin = $this->getConnectedPlugin();

        $id = $plugin->run(function() { return "ok"; });
        $this->acceptedStream->close();
        $plugin->join($id);
    }

    /**
     * A result line without the "<size>:" prefix is rejected by join().
     *
     * @expectedException \RuntimeException
     * @expectedExceptionMessage Async call failed: response is corrupted
     */
    public function testCorruptedResultOnJoin()
    {
        fwrite($this->client, "PROCESSING\naaaa\n");
        $plugin = $this->getConnectedPlugin();

        $id = $plugin->run(function() { return "ok"; });
        $plugin->join($id);
    }

    /**
     * A result line whose size prefix is not a valid size is rejected by join().
     *
     * @expectedException \RuntimeException
     * @expectedExceptionMessage Async call failed: response size is invalid
     */
    public function testCorruptedResultSizeOnJoin()
    {
        fwrite($this->client, "PROCESSING\naaaa12:aaa\n");
        $plugin = $this->getConnectedPlugin();

        $id = $plugin->run(function() { return "ok"; });
        $plugin->join($id);
    }
}