<?php
class Console{
function log($d){
$pid = getmypid();
$color = "\033[1;".(30 + ($pid & 7)).";".(40 + (($pid >> 3) & 7)).((($pid >> 6) & 1)?';4':'')."m";
printf("[%s]{$color}%-5s\033[0m: %s\n", date('m/d/Y h:i:s'), $pid, $d);
}
}
class WebSocket{
// How long in seconds to cleanup connections and garbage collect.
const CLEANUP_TIMER = 10;
const CHILD_PROCESS_RESPONSE_CLOSE = 'c';
const CHILD_PROCESS_RESPONSE_RELAY_TO = 'i';
public $link;
public $lastCleanup = 0;
public $children = array();
function __construct($address,$port){
global $child;
Console::log("Creating WebSocket");
($this->link = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) || die("socket_create() failed");
Console::log("Configuring WebSocket");
socket_set_option($this->link, SOL_SOCKET, SO_REUSEADDR, 1) || die("socket_option() failed");
Console::log("Binding WebSocket");
socket_bind($this->link, $address, $port) || die("socket_bind() failed");
Console::log("Listening on WebSocket");
socket_listen($this->link) || die("socket_listen() failed");
Console::log("Setting WebSocket to non-blocking");
Console::log("Server Started");
Console::log("Listening on: $address port $port");
Console::log("Master socket: $this->link");
$lastCleanup = time();
while(true){
while(($child_id = pcntl_wait($status, WNOHANG)) > 0){ // Removes zombie children if they exist
Console::log("Removed zombie: $child_id");
if(isset($this->children[$child_id])){
socket_close($this->children[$child_id]);
unset($this->children[$child_id]);
}
}
$connections = array_merge(array('master' => $this->link), $this->children);
socket_select($connections, $write = null, $except = null, static::CLEANUP_TIMER);
foreach($connections as $connection){
if($connection === $this->link){
// Is a connection request from the web browser
$client = @socket_accept($this->link);
if(!$client){
Console::log("Web Client connection attempted but failed");
continue;
}else{
socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $ary);
$pid = pcntl_fork();
if($pid == -1){
Console::log("Could not fork process");
}elseif($pid){
// Parent
socket_close($ary[0]);
$this->children[$pid] = $ary[1];
$child = false;
}else{
// Child
socket_close($ary[1]);
global $parent;
$parent = $ary[0];
$child = $client;
return;
}
unset($ary);
}
}else{
// Child communicating with parent
Console::log("Getting data from child");
if($l = socket_recv($connection, $len_data, 3, MSG_WAITALL)){
$len = (ord($len_data{0}) << 16) | (ord($len_data{1}) << 8) | (ord($len_data{2}));
Console::log("Got data of length: $l from child");
if(!$len)
continue;
elseif(socket_recv($connection, $data, $len, MSG_WAITALL) == $len){
Console::log("Got data from child!");
switch($data{0}){
case self::CHILD_PROCESS_RESPONSE_CLOSE:
Console::log("WebSocket: Received notice from child about death");
$this->closeChild(array_search($connection, $this->children, true));
break;
case self::CHILD_PROCESS_RESPONSE_RELAY_TO:
if(!$this->fork())
break;
$child = null;
$pids = substr($data, 1, $len = strpos($data, ':'));
$pids = explode(',', $pids);
if(!$pids){
Console::log("No Process IDS passed with packet, forgetting packet!");
break;
}
foreach($pids as &$pid)
$pid = (int) $pid;
unset($pid);
$data = substr($data, $len+1);
$len = strlen($data);
if($len > 0xFFFFFF){
Console::log("Could not send packet too large!");
exit;
}
for($i=0;$i<3;$i++)
$data = chr(($len >> ($i * 8)) & 0xFF).$data;
$len = strlen($data);
foreach($pids as $pid)
if(isset($this->children[$pid])){
$i=0;
do{
if(($d = @socket_send($this->children[$pid], $da = substr($data, $i), strlen($da), 0)) === false){
Console::log("WebSocket: Failed to send data to: {$this->children[$pid]}");
$this->closeChild($pid);
break;
}
if(!$d)
usleep(25);
$i += $d;
}while($len > $i);
}
unset($pid, $len, $data, $i, $pids, $d, $len_data, $l, $da);
global $forked;
$forked = true;
$child = true;
exit;
default:
if(!$this->fork())
break;
$child = null;
Console::log("Relaying to all children");
$data = $len_data . $data;
$len = strlen($data);
foreach($this->children as $k => $c){
$i=0;
do{
if(($d = @socket_send($c, $da = substr($data, $i), strlen($da), 0)) === false){
Console::log("WebSocket: Failed to send data to: $c");
$this->closeChild($k);
break;
}
if(!$d)
usleep(25);
$i += $d;
}while($len > $i);
}
unset($k, $c, $d, $i, $len, $len_data, $data, $l, $da);
global $forked, $child;
$forked = true;
$child = true;
exit;
}
}else{
Console::log("WebSocket: Not enough data received from child process");
$this->closeChild(array_search($connection, $this->children, true));
}
}else
$this->closeChild(array_search($connection, $this->children, true));
}
}
$data = '';
}
}
public function fork(){
$id = pcntl_fork();
if($id == -1){
return false;
}elseif($id)
// parent
return false;
else
// child
set_time_limit(30);
return true;
}
public function closeChild($process_id){
if(!$process_id)
return;
$connection = $this->children[$process_id];
@socket_send($connection, "\x00\x00\x01c", 4, 0);
@socket_close($connection);
unset($this->children[$process_id]);
}
public function __destruct(){
global $child;
if($child)
return;
foreach($this->children as $pid => $child)
$this->closeChild($pid);
@socket_close($this->link);
}
}
|