PHP Classes

File: include/class.MTDaemon.php

Recommend this page to a friend!
  Classes of Killerwhile   Multi Threaded Daemon   include/class.MTDaemon.php   Download  
File: include/class.MTDaemon.php
Role: Class source
Content type: text/plain
Description: Main core class
Class: Multi Threaded Daemon
Manage the execution of parallel threads
Author: By
Last change: moving into the directory include/
Date: 16 years ago
Size: 14,559 bytes
 

Contents

Class file image Download
<?php /** * MultiThreaded Daemon (MTD) * * Copyright (c) 2007, Benoit Perroud * * All rights reserved. * * Redistribution and use in source and binary forms, with or * without modification, are permitted provided that the following * conditions are met: Redistributions of source code must retain the * above copyright notice, this list of conditions and the following * disclaimer. Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @package MTD * @author Benoit Perroud <ben@migtechnology.ch> * @copyright 2007 Benoit Perroud * @license http://www.gnu.org/copyleft/gpl.html * @version $Id: class.MTDaemon.php 8 2007-10-19 09:16:00Z killerwhile $ * * See http://code.google.com/p/phpmultithreadeddaemon/ * and http://phpmultithreaddaemon.blogspot.com/ for more information * */ require_once dirname(__FILE__) . '/class.MTLog.php'; abstract class MTDaemon { /* * Configuration vaiables */ // max concurrent threads protected $max_threads = 4; // should be implemented with sem_get('name', $max_aquire) but this can't be dynamically updated as this var. // sleep time when no job protected $idle_sleep_time = 5; /* * Internal constants */ const _INDEX_DATA = 0; // shared data goes here. const _INDEX_THREADS = 1; // how many threads are working const _INDEX_SLOTS = 2; // which slot do the current thread use ? /* * Internal variables */ protected $shm; // = ftok(__FILE__, 'g'); protected $shared_data; // = shm_attach($this->shm); protected $mutex; // lock critical path, used in lock() and unlock() protected $mutex_main_process; // lock main process only. Children can continue to run protected $mutex_children_processes; // lock children processes only. Main can continue to run protected $main_thread_pid; /* * Constructor * * @params $threads : number of concurrent threads, default 4 * @params $idelsleeptime : time to sleep when no job ready (getNext return null), in seconds, default 5 */ public function __construct($threads = null, $idlesleeptime = null) { if ($threads) $this->max_threads = $threads; if ($idlesleeptime) $this->idle_sleep_time = $idlesleeptime; $this->main_thread_pid = posix_getpid(); } /* * Hook called just before the main loop. * * Remark : cleanup code goes here. */ protected function _prerun() { MTLog::getInstance()->info('Starting daemon with ' . $this->max_threads . ' slots'); $this->shm = ftok(__FILE__, 'g'); // global shm $this->shared_data = shm_attach($this->shm); $this->mutex = sem_get($this->shm); $this->mutex_main_process = sem_get(ftok(__FILE__, 'm')); $this->mutex_children_processes = sem_get(ftok(__FILE__, 'c')); shm_put_var($this->shared_data, self::_INDEX_DATA, array()); $this->setThreads(0); $slots = array(); for ($i = 0; $i < $this->max_threads; $i++) { $slots[] = false; } shm_put_var($this->shared_data, self::_INDEX_SLOTS, $slots); } /* * Hook called just after the main loop */ protected function _postrun() { MTLog::getInstance()->info('Stopping daemon. '); shm_remove($this->shared_data); sem_remove($this->mutex); sem_remove($this->mutex_process); } /* * Main loop, request next job using getNext() and execute run($job) in a separate thread * _prerun and _postrun hooks are called before and after the main loop -> usefull for cleanup and so on. */ public function handle() { $this->run = true; $this->_prerun(); while ($this->run) { /* * Terminating all child, to not let some zombie leaking the memory. */ MTLog::getInstance()->debug2('-- Next iteration '); $this->lock(); // HACK : avoid zombie and free earlier the memory do { $res = pcntl_wait($status, WNOHANG); MTLog::getInstance()->debug2('$res = pcntl_wait($status, WNOHANG); called with $res = ' . $res); if ($res > 0) MTLog::getInstance()->debug('(finishing child with pid ' . $res . ')'); } while ($res > 0); /* * Loop until a slot frees */ while (!$this->hasFreeSlot()) { $this->unlock(); MTLog::getInstance()->debug('No more free slot, waiting'); $res = pcntl_wait($status); // wait until a child ends up MTLog::getInstance()->debug2('$res = pcntl_wait($status); called with $res = ' . $res); if ($res > 0) { MTLog::getInstance()->debug('Finishing child with pid ' . $res); } else { MTLog::getInstance()->error('Outch1, this souldn\'t happen. Verify your implementation ...'); $this->run = false; continue; } $this->lock(); } $slot = $this->requestSlot(); $this->incThreads(); $this->unlock(); if ($slot === null) { var_dump(shm_get_var($this->shared_data, self::_INDEX_DATA)); var_dump(shm_get_var($this->shared_data, self::_INDEX_THREADS)); var_dump(shm_get_var($this->shared_data, self::_INDEX_SLOTS)); MTLog::getInstance()->error('Outch2, this souldn\'t happen. Verify your implementation ...'); $this->run = false; continue; } /* * Request next action to handle */ $next = $this->getNext($slot); /* * If no job */ if (!$next) { MTLog::getInstance()->debug('No job, sleeping at most ' . $this->idle_sleep_time . ' sec ... '); // TODO : waiting for signal pushed into a queue when inserting a new job. $this->lock(); $this->releaseSlot($slot); $this->decThreads(); $this->unlock(); sleep($this->idle_sleep_time); continue; } else { $pid = pcntl_fork(); if ($pid == -1) { MTLog::getInstance()->error('[fork] Duplication impossible'); $this->run = false; continue; } else if ($pid) { usleep(10); // HACK : give the hand to the child -> a simple way to better handle zombies continue; } else { MTLog::getInstance()->debug('Executing thread #' . posix_getpid() . ' in slot ' . number_format($slot)); $res = $this->run($next, $slot); $this->lock(); $this->releaseSlot($slot); $this->decThreads(); $this->unlock(); exit($res); } } } $this->_postrun(); } /* * Request data of the next element to run in a thread * * return null or false if no job currently */ abstract public function getNext($slot); /* * Process the element fetched by getNext in a new thread * * return the exiting status of the thread */ abstract public function run($next, $slot); /* * */ protected function lock() { MTLog::getInstance()->debug2('[lock] lock'); $res = sem_acquire($this->mutex); if (!$res) exit(-1); } /* * */ protected function unlock() { MTLog::getInstance()->debug2('[lock] unlock'); $res = sem_release($this->mutex); if (!$res) exit(-1); } /* * */ protected function lockMain() { MTLog::getInstance()->debug2('[lock] lock main process'); $res = sem_acquire($this->mutex_main_process); if (!$res) exit(-1); } /* * */ protected function unlockMain() { MTLog::getInstance()->debug2('[lock] unlock main process'); $res = sem_release($this->mutex_main_process); if (!$res) exit(-1); } /* * */ protected function lockChildren() { MTLog::getInstance()->debug2('[lock] lock children processes'); $res = sem_acquire($this->mutex_children_processes); if (!$res) exit(-1); } /* * */ protected function unlockChildren() { MTLog::getInstance()->debug2('[lock] unlock children processes'); $res = sem_release($this->mutex_children_processes); if (!$res) exit(-1); } /* * Get a shared var based on hash. * * Return null if the var doesn't exist. */ protected function getVar($name, $lock = false) { if ($lock) $this->lock(); $vars = shm_get_var($this->shared_data, self::_INDEX_DATA); $value = (isset($vars[$name])) ? $vars[$name] : null; if ($lock) $this->unlock(); return $value; } /* * Set a shared var. * * Remark : the var should be serialized. */ protected function setVar($name, $value, $lock = false) { if ($lock) $this->lock(); $vars = shm_get_var($this->shared_data, self::_INDEX_DATA); $vars[$name] = $value; $res = shm_put_var($this->shared_data, self::_INDEX_DATA, $vars); if ($lock) $this->unlock(); return $res; } /* * Get the number of running threads */ protected function getThreads($lock = false) { if ($lock) $this->lock(); $res = shm_get_var($this->shared_data, self::_INDEX_THREADS); if ($lock) $this->unlock(); return $res; } /* * Set the number of running threads */ protected function setThreads($threads, $lock = false) { if ($lock) $this->lock(); $res = shm_put_var($this->shared_data, self::_INDEX_THREADS, $threads); if ($lock) $this->unlock(); return $res; } /* * Increment the number of running threads */ protected function incThreads($lock = false) { if ($lock) $this->lock(); $threads = $this->getThreads(); $res = shm_put_var($this->shared_data, self::_INDEX_THREADS, $threads + 1); MTLog::getInstance()->debug('incThreads, $threads = ' . ($threads + 1)); if ($lock) $this->unlock(); return $res; } /* * Decrement the number of running threads */ protected function decThreads($lock = false) { if ($lock) $this->lock(); $threads = $this->getThreads(); $res = shm_put_var($this->shared_data, self::_INDEX_THREADS, $threads - 1); MTLog::getInstance()->debug('decThreads, $threads = ' . ($threads - 1)); if ($lock) $this->unlock(); return $res; } /* * Return true if any slot is free */ protected function hasFreeSlot() { $threads = $this->getThreads(); $res = ($threads < $this->max_threads) ? true : false; MTLog::getInstance()->debug('Has free slot ? => #running threads = ' . $threads); return $res; } /* * Assign a free slot * * Return null if no free slot is available */ protected function requestSlot($lock = false) { MTLog::getInstance()->debug('Requesting slot ... '); $slot = null; if ($lock) $this->lock(); $slots = shm_get_var($this->shared_data, self::_INDEX_SLOTS); for ($i = 0; $i < $this->max_threads; $i++) { if (!isset($slots[$i])) { $slots[$i] = true; $slot = $i; break; } else { if ($slots[$i] == false) { $slots[$i] = true; $slot = $i; break; } } } shm_put_var($this->shared_data, self::_INDEX_SLOTS, $slots); if ($lock) $this->unlock(); if (is_null($slot)) { MTLog::getInstance()->debug('no free slots !!'); } else { MTLog::getInstance()->debug('slot ' . $slot . ' found.'); } return $slot; } /* * Release given slot */ protected function releaseSlot($slot, $lock = false) { if ($lock) $this->lock(); $slots = shm_get_var($this->shared_data, self::_INDEX_SLOTS); $slots[$slot] = false; shm_put_var($this->shared_data, self::_INDEX_SLOTS, $slots); if ($lock) $this->unlock(); MTLog::getInstance()->debug('Releasing slot ' . $slot); return true; } }