<?php
/**
 * Handles a single WebSocket client connection together with the link to the
 * master (parent) process socket.
 *
 * The constructor runs the service loop itself: it multiplexes the client
 * socket and the master socket with socket_select(), performs the opening
 * handshake (RFC 6455 style "Sec-WebSocket-Version" handshakes as well as the
 * legacy Key1/Key2 "version 00" handshake), decodes incoming frames and calls
 * the subclass hooks dataReceived() / parentDataRecv().
 *
 * Console and WebSocket are classes defined elsewhere in the project.
 */
abstract class WebClientService {
    // GUID appended to the client's Sec-WebSocket-Key when computing the Sec-WebSocket-Accept value.
    const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
    // Maximum number of bytes accepted for the client's handshake header. Default: 16 KiB (0x4000 bytes).
    const MAX_HEADER_SIZE = 0x4000;
    // How long in seconds after not receiving any data to ping the client.
    const PING_TIMER = 10;
    // Number of seconds after which the connection is closed when no pong was received.
    const PING_TIMEOUT = 20;
    // How often (seconds) socket_select() releases its blocking so zombie processes and timers are handled.
    const BLOCKING_TIMER = 1;

    // True once close() has run; the service loop stops when it sees this flag.
    public $closed = false;
    // 0 for the legacy Key1/Key2 protocol, otherwise the value of the Sec-WebSocket-Version header.
    public $protocol_version = 0;
    // Peer address and port as reported by socket_getpeername().
    public $ip;
    public $port;
    // Socket connected to the web client.
    public $socket;
    // Unix timestamp of the last data received from the client.
    public $lastMsgTime;
    // Unix timestamp of the last ping that is still unanswered, or a falsy value when none is pending.
    public $lastPingSent = false;
    // Not used inside this class; kept because it is part of the public property set.
    public $lastPing;
    // True until the opening handshake has been completed.
    public $needsHandShake = true;
    // Socket connected to the master (parent) process.
    public $master;
    // Receive buffer; while dataReceived() runs it holds exactly one decoded message.
    public $currentData = '';

    /**
     * When data is fully received from the WebSocket connection this function is executed. Extend this function to process the data.
     * NOTE: you can access the data returned with the variable "$this->currentData"
     */
    abstract public function dataReceived();

    /**
     * When data is received from the parent process it is processed in this function if it is not a type 'c' and 'i' ('c' means close process, 'i' means specific process relay)
     * @param string $type 1 Character code telling what type of message it is, 'c' is reserved for close connection and 'i' is reserved for specific process relay
     * @param string $data The data passed from the parent process to the child process.
     */
    abstract public function parentDataRecv($type, $data);

    /**
     * Stores both sockets and runs the service loop until the connection is closed.
     *
     * Messages from the master socket are framed as a 3-byte big-endian length
     * followed by that many payload bytes (the counterpart of sendToParent()).
     *
     * @param resource|object $socket Socket connected to the web client.
     * @param resource|object $master Socket connected to the master process.
     */
    public function __construct($socket, $master){
        $this->master = $master;
        $this->socket = $socket;
        $this->lastMsgTime = time();
        socket_getpeername($socket, $this->ip, $this->port);
        Console::log("Successfully forked process!");
        Console::log("WebClient: Connection accepted for: $this->ip:$this->port");
        while(true){
            // Removes zombie children if they exist
            while(($child_id = pcntl_wait($status, WNOHANG)) > 0)
                Console::log("Removed zombie: $child_id");
            if($this->closed)
                return;
            $connections = array($this->master, $this->socket);
            // socket_select() takes its arrays by reference, so real variables are needed.
            $write = null;
            $except = null;
            $ready = @socket_select($connections, $write, $except, static::BLOCKING_TIMER);
            if($ready === false){
                // On failure the read array is left untouched; iterating it would
                // trigger blocking reads on sockets that are not readable.
                $error = socket_last_error();
                socket_clear_error();
                if($error !== SOCKET_EINTR){
                    Console::log("WebSocket: socket_select failed: " . socket_strerror($error));
                    $this->close();
                    return;
                }
                $connections = array();
            }
            foreach($connections as $connection){
                // An earlier iteration may have closed both sockets.
                if($this->closed)
                    return;
                if($connection === $this->socket){
                    // Is from web socket
                    $this->recv();
                    continue;
                }
                // Is from parent process
                Console::log("Receiving master data...");
                $got = socket_recv($connection, $data, 3, MSG_WAITALL);
                if(!$got){
                    // Connection closed
                    Console::log("WebSocket: Parent process connection closed");
                    $this->close();
                    return;
                }
                if($got < 3){
                    // Length prefix was cut short
                    Console::log("WebSocket: Error not enough data received!");
                    $this->close();
                    return;
                }
                $len = (ord($data[0]) << 16) | (ord($data[1]) << 8) | ord($data[2]);
                if(!$len)
                    continue;
                if(socket_recv($connection, $data, $len, MSG_WAITALL) !== $len){
                    // Not enough data
                    Console::log("WebSocket: Error not enough data received!");
                    $this->close();
                    return;
                }
                Console::log("Got master's data...");
                $this->parentRecv($data);
            }
            $this->check();
        }
    }

    /**
     * Dispatches one message received from the parent process.
     *
     * The first byte is the message type. The close type closes the connection,
     * every other type is forwarded to parentDataRecv() with the remaining bytes.
     *
     * @param string $data Raw message: 1 type byte followed by the payload.
     */
    public function parentRecv($data){
        $type = (string) substr($data, 0, 1);
        if($type == WebSocket::CHILD_PROCESS_RESPONSE_CLOSE){
            Console::log("WebSocket: Got close command from parent process");
            $this->close();
            return;
        }
        Console::log("Received parent data (user defined)");
        $this->parentDataRecv($type, (string) substr($data, 1));
    }

    /**
     * Closes the client socket and the master socket. Calling it again is a no-op.
     */
    public function close(){
        if($this->closed)
            return;
        Console::log("WebClient: Closing socket " . self::socketName($this->socket));
        @socket_close($this->socket);
        @socket_close($this->master);
        $this->closed = true;
    }

    /**
     * Reads from the client socket: performs the handshake when it is still
     * pending, otherwise decodes incoming frames for the negotiated protocol.
     *
     * @return false|null false when the connection was closed because of an
     *                    error or disconnect, null otherwise.
     */
    public function recv(){
        if($this->needsHandShake)
            return $this->handshake();
        if($this->protocol_version == 0)
            return $this->recvLegacyFrame();
        return $this->recvFrames();
    }

    /**
     * Reads the HTTP upgrade request and answers it.
     *
     * @return false|null false when the connection was closed, null on success.
     */
    private function handshake(){
        $name = self::socketName($this->socket);
        $lastChr = '';
        while(true){
            if(!socket_recv($this->socket, $buff, 1, MSG_WAITALL) || $buff === null)
                return $this->dropClient();
            // Carriage returns are dropped so the header only contains "\n" line ends.
            if($buff === "\r")
                continue;
            $this->currentData .= $buff;
            // An empty line terminates the header.
            if($buff === "\n" && $lastChr === "\n")
                break;
            $lastChr = $buff;
            if(strlen($this->currentData) > static::MAX_HEADER_SIZE){
                Console::log("WebClient: Handshake header from '$name' exceeds " . static::MAX_HEADER_SIZE . " bytes, closing");
                $this->close();
                return false;
            }
        }
        $headers = self::getheaders($this->currentData);
        if(!isset($headers['Connection'])){
            Console::log("WebClient: Looks like client '$name' is not a websocket, closing");
            $this->close();
            return false;
        }
        if(isset($headers['Sec-WebSocket-Version'])){
            if(!isset($headers['Sec-WebSocket-Key'])){
                Console::log("WebClient: Looks like client '$name' did not send a Sec-WebSocket-Key, closing");
                $this->close();
                return false;
            }
            $this->protocol_version = $headers['Sec-WebSocket-Version'];
            Console::log("WebClient: Client is version {$headers['Sec-WebSocket-Version']}, proceeding to handshake...");
            $responseH = "HTTP/1.1 101 Switching Protocols\r\n" .
                "Upgrade: websocket\r\n" .
                "Connection: Upgrade\r\n" .
                "Sec-WebSocket-Accept: " . base64_encode(sha1($headers['Sec-WebSocket-Key'] . static::GUID, true)) . "\r\n\r\n";
        }elseif(isset($headers['Sec-WebSocket-Key1']) && isset($headers['Sec-WebSocket-Key2'])){
            $this->protocol_version = 0;
            Console::log("WebClient: Client is version 00, proceeding to handshake...");
            // The legacy handshake carries 8 extra bytes after the header.
            if(!socket_recv($this->socket, $code, 8, MSG_WAITALL) || $code === null)
                return $this->dropClient();
            $key1 = self::legacyKeyNumber($headers['Sec-WebSocket-Key1']);
            $key2 = self::legacyKeyNumber($headers['Sec-WebSocket-Key2']);
            $hash = md5(pack('N', $key1) . pack('N', $key2) . $code, true);
            $origin = isset($headers['Origin']) ? $headers['Origin'] : '';
            $host = isset($headers['Host']) ? $headers['Host'] : '';
            $responseH = "HTTP/1.1 101 WebSocket Protocol Handshake\r\n" .
                "Upgrade: WebSocket\r\n" .
                "Connection: Upgrade\r\n" .
                "Sec-WebSocket-Origin: {$origin}\r\n" .
                "Sec-WebSocket-Location: ws://{$host}{$headers['uri']}\r\n" .
                "\r\n" .
                $hash;
        }else{
            Console::log("WebClient: Looks like client '$name' did not request a supported websocket version, closing");
            $this->close();
            return false;
        }
        if(@socket_write($this->socket, $responseH) === false)
            return $this->dropClient();
        $this->needsHandShake = false;
        $this->currentData = '';
    }

    /**
     * Reads one legacy ("version 00") frame: a 0x00 byte, the payload, a 0xFF byte.
     * An empty frame is treated as a pong.
     *
     * @return false|null false when the connection was closed, null otherwise.
     */
    private function recvLegacyFrame(){
        $byte = $this->readBytes(1, MSG_WAITALL);
        if($byte === false)
            return false;
        if($byte !== "\x00"){
            Console::log("WebClient: Client did not prepend a 0x00 byte to head of request");
            $this->close();
            return false;
        }
        $this->currentData = '';
        while(true){
            $byte = $this->readBytes(1, MSG_WAITALL);
            if($byte === false)
                return false;
            $this->lastMsgTime = time();
            $this->lastPingSent = null;
            if($byte === "\xFF"){
                // Strict comparison: the message "0" is real data, not an empty frame.
                if($this->currentData === ''){
                    Console::log("WebClient: Received pong from: " . self::socketName($this->socket));
                    return;
                }
                $this->dataReceived();
                $this->currentData = '';
                return;
            }
            $this->currentData .= $byte;
        }
    }

    /**
     * Reads whatever is available (up to 2048 bytes) without blocking, appends
     * it to the buffer and decodes every complete frame found there.
     *
     * @return false|null false when the connection was closed while reading, null otherwise.
     */
    private function recvFrames(){
        $chunk = $this->readBytes(2048, MSG_DONTWAIT);
        if($chunk === false)
            return false;
        $this->lastMsgTime = time();
        $this->lastPingSent = null;
        $this->currentData .= $chunk;
        $this->processFrames();
    }

    /**
     * Decodes all complete frames at the start of $this->currentData.
     *
     * Incomplete trailing data stays in the buffer until more bytes arrive.
     * Parsing restarts from the frame header on every call, so no parser state
     * has to survive between calls.
     *
     * Opcode 0xA (pong) is ignored, 0x9 (ping) is answered with a pong carrying
     * the same payload, 0x8 (close) closes the connection; every other opcode
     * is delivered to dataReceived() with the unmasked payload in currentData.
     */
    private function processFrames(){
        while(!$this->closed && strlen($this->currentData) >= 2){
            $buffer = $this->currentData;
            $opcode = ord($buffer[0]) & 0x0F;
            $second = ord($buffer[1]);
            $masked = ($second & 0x80) !== 0;
            $dataLen = $second & 0x7F;
            $maskIndex = 2;
            if($dataLen === 127){
                // 64-bit big-endian extended length
                if(strlen($buffer) < 10)
                    return;
                $dataLen = 0;
                for($i = 2; $i < 10; $i++)
                    $dataLen = ($dataLen << 8) | ord($buffer[$i]);
                $maskIndex = 10;
            }elseif($dataLen === 126){
                // 16-bit big-endian extended length
                if(strlen($buffer) < 4)
                    return;
                $dataLen = (ord($buffer[2]) << 8) | ord($buffer[3]);
                $maskIndex = 4;
            }
            if($dataLen < 0){
                // Most significant bit of a 64-bit length was set; such a frame is invalid.
                Console::log("WebClient: Invalid frame length received from: " . self::socketName($this->socket));
                $this->close();
                return;
            }
            $dataIndex = $masked ? $maskIndex + 4 : $maskIndex;
            // Wait for more data until the whole frame is buffered.
            if(strlen($buffer) < $dataIndex + $dataLen)
                return;
            $payload = (string) substr($buffer, $dataIndex, $dataLen);
            if($masked){
                // XOR the payload with the 4-byte mask repeated to the payload length.
                $payload = $payload ^ str_pad('', $dataLen, substr($buffer, $maskIndex, 4));
            }
            // Bytes after this frame belong to the next frame and must be kept.
            $remainder = (string) substr($buffer, $dataIndex + $dataLen);
            $this->currentData = $remainder;
            if($opcode == 0xA){
                // Received Pong
                continue;
            }
            if($opcode == 0x9){
                // Received Ping Request
                if($dataLen > 125){
                    Console::log("WebClient: Oversized ping received from: " . self::socketName($this->socket));
                    $this->close();
                    return;
                }
                // Frames sent by a server are not masked; the pong echoes the ping payload.
                @socket_write($this->socket, chr(0x8A) . chr($dataLen) . $payload);
                Console::log("WebClient: Received ping request responding with pong from: " . self::socketName($this->socket));
                continue;
            }
            if($opcode == 0x8){
                // Received Close Request
                Console::log("WebClient:: Received close request from: " . self::socketName($this->socket));
                $this->close();
                return;
            }
            // dataReceived() reads the message from currentData.
            $this->currentData = $payload;
            $this->dataReceived();
            $this->currentData = $remainder;
        }
    }

    /**
     * Receives up to $length bytes from the client socket.
     *
     * @param int $length Maximum number of bytes to read.
     * @param int $flags  Flags passed to socket_recv().
     * @return string|false The received bytes, or false after the connection
     *                      was closed because of a disconnect or socket error.
     */
    private function readBytes($length, $flags){
        $l = socket_recv($this->socket, $buff, $length, $flags);
        $last_error = socket_last_error($this->socket);
        // Error code 105 is tolerated as in the original implementation
        // (presumably ENOBUFS on Linux - TODO confirm).
        if(!$l || !($last_error == 105 || $last_error == 0)){
            socket_clear_error($this->socket);
            return $this->dropClient();
        }
        return $buff;
    }

    /**
     * Logs that the client disconnected and closes the connection.
     *
     * @return false Always false, so callers can return the result directly.
     */
    private function dropClient(){
        Console::log("WebClient: Looks like client '" . self::socketName($this->socket) . "' disconnected");
        $this->close();
        return false;
    }

    /**
     * Returns a printable name for a socket.
     *
     * Socket resources keep their usual "Resource id #N" form; socket objects
     * (which cannot be cast to string) are shown as "ClassName#id".
     *
     * @param mixed $socket
     * @return string
     */
    private static function socketName($socket){
        if(is_object($socket))
            return get_class($socket) . '#' . spl_object_id($socket);
        return (string) $socket;
    }

    /**
     * Computes the number encoded in a legacy Sec-WebSocket-Key1/Key2 header:
     * all digits concatenated, divided by the number of spaces.
     *
     * @param string $key Header value.
     * @return int|float 0 when the value contains no digits or no spaces.
     */
    private static function legacyKeyNumber($key){
        $digits = preg_match_all('/[0-9]/', $key, $number);
        $spaces = preg_match_all('/ /', $key, $space);
        if(!$digits || !$spaces)
            return 0;
        return implode('', $number[0]) / $spaces;
    }

    /**
     * Sends a ping to the client and closes the connection when the write fails.
     */
    public function ping(){
        if($this->protocol_version != 0)
            // Unmasked ping frame with an empty payload.
            $frame = chr(0x89) . chr(0);
        else
            $frame = "\x00\xFF";
        if(@socket_write($this->socket, $frame) === false){
            Console::log("Failed to send ping request");
            $this->close();
        }
    }

    /**
     * Keep-alive check: pings the client after PING_TIMER seconds of silence
     * and closes the connection when a ping stays unanswered for PING_TIMEOUT seconds.
     */
    public function check(){
        $now = time();
        if(!$this->lastPingSent){
            if($this->lastMsgTime + static::PING_TIMER < $now){
                $this->lastPingSent = $now;
                $this->ping();
            }
            return;
        }
        if($this->lastPingSent + static::PING_TIMEOUT < $now && $this->lastMsgTime + static::PING_TIMEOUT < $now){
            Console::log("WebSocket: Ping not received for ". static::PING_TIMEOUT . " seconds on " . self::socketName($this->socket));
            $this->close();
        }
    }

    /**
     * Forks the current process.
     *
     * @return bool true in the child process; false in the parent process and
     *              also when the fork failed.
     */
    public function fork(){
        $id = pcntl_fork();
        if($id == -1){
            Console::log("WebSocket: Failed to fork process");
            return false;
        }
        if($id)
            // parent
            return false;
        // child
        set_time_limit(30);
        return true;
    }

    /**
     * Sends a message to the master process.
     *
     * The packet is a 3-byte big-endian length followed by the message. When
     * $pids is given the message is prefixed with 'i<pid,pid,...>:' so the
     * master relays it to those processes.
     *
     * @param string $type 1 character message type.
     * @param string $data Message payload.
     * @param array  $pids Optional process ids the message should be relayed to.
     * @return bool true when the whole packet was sent, false otherwise.
     */
    public function sendToParent($type, $data, array $pids = array()){
        if($this->closed)
            return false;
        if($pids)
            // Relay to the listed processes
            $data = 'i' . implode(',', $pids) . ':' . $type . $data;
        else
            $data = $type . $data;
        $length = strlen($data);
        if($length > 0xFFFFFF){
            Console::log("Failed to send packet to master, 16MB limit hit");
            return false;
        }
        $packet = chr(($length >> 16) & 0xFF) . chr(($length >> 8) & 0xFF) . chr($length & 0xFF) . $data;
        $total = $length + 3;
        $sent = 0;
        while($sent < $total){
            // Only the part that has not been sent yet is passed on again.
            $chunk = substr($packet, $sent);
            $d = @socket_send($this->master, $chunk, strlen($chunk), 0);
            if($d === false){
                Console::log("WebSocket: Failed to send data to: " . self::socketName($this->master));
                $this->close();
                return false;
            }
            if(!$d)
                usleep(25);
            $sent += $d;
        }
        return true;
    }

    /**
     * Sends data to the web client from a forked child process, so a slow
     * client cannot block the service loop. The child process exits when done;
     * the calling (parent) process returns immediately.
     *
     * @param string $data   Data to send.
     * @param bool   $encode Whether to wrap the data with encodeData() first.
     */
    public function send($data, $encode = true){
        if($this->closed)
            return;
        // Fork over to new process to send the data to keep blocking from consuming the entire service.
        if(!$this->fork())
            return;
        if($encode)
            $data = $this->encodeData($data);
        $length = strlen($data);
        $sent = 0;
        do{
            $chunk = (string) substr($data, $sent);
            $d = @socket_write($this->socket, $chunk, strlen($chunk));
            if($d === false){
                Console::log("WebSocket: Failed to send data to: " . self::socketName($this->socket));
                $this->close();
                break;
            }
            if(!$d)
                usleep(25);
            $sent += $d;
        }while($length > $sent);
        exit;
    }

    /**
     * Parses an HTTP request header.
     *
     * @param string $header Raw request header.
     * @return array 'uri' and 'http-version' from the request line (null when
     *               the request line is not a GET request line) plus one entry
     *               per header field, keyed by the field name as sent.
     */
    public static function getheaders($header){
        $return = array('uri' => null, 'http-version' => null);
        if(preg_match('/GET (.+) HTTP\/([0-9]+\.[0-9]+)[\r\n]+/', $header, $match)){
            $return['uri'] = $match[1];
            $return['http-version'] = $match[2];
        }
        // Field names are matched per line and cannot contain whitespace, so the
        // request line is never merged into the first header field.
        preg_match_all('/^([^:\s]+):[ \t]*([^\r\n]*)/m', $header, $matches, PREG_SET_ORDER);
        foreach($matches as $field)
            $return[$field[1]] = $field[2];
        return $return;
    }

    /**
     * Wraps data into a frame for the negotiated protocol.
     *
     * For the legacy protocol the data is enclosed in 0x00 ... 0xFF. Otherwise a
     * single unmasked text frame (FIN set, opcode 0x1) is built, using the
     * shortest length encoding that fits.
     *
     * @param string $data Payload.
     * @return string The encoded frame.
     */
    public function encodeData($data){
        if($this->protocol_version == 0)
            return "\x00$data\xFF";
        $dataLength = strlen($data);
        $frame = chr(0x81);
        if($dataLength <= 125){
            $frame .= chr($dataLength);
        }elseif($dataLength <= 0xFFFF){
            $frame .= chr(126) . chr($dataLength >> 8) . chr($dataLength & 0xFF);
        }else{
            $frame .= chr(127);
            // 8 length bytes, most significant first.
            for($shift = 56; $shift >= 0; $shift -= 8)
                $frame .= chr($shift < PHP_INT_SIZE * 8 ? ($dataLength >> $shift) & 0xFF : 0);
        }
        return $frame . $data;
    }
}
|