<?php /* * This deamon monitors a directory for files, and will process each file in * its own thread. * * Sample daemon using Benoit Perroud's MultiThreaded Daemon (MTD) * See http://code.google.com/p/phpmultithreadeddaemon/ * and http://phpmultithreaddaemon.blogspot.com/ for more information * * Modifications by Daniel Kadosh, Affinegy Inc., June 2009 * Made many enhancements for robustness: * - Truly daemonized by starting process group with posix_setsid() * - Surrounded key items in try/catch for proper logging * - Handling of SIGTERM/ SIGQUIT = no more children spawned, wait for all threads to die. * - Handling of SIGHUP = call to loadConfig() method * - Created PID file to ensure only 1 copy of the daemon is running * - Full cleanup of semaphores & PID file on exit * */ error_reporting(E_ALL); require_once 'class.MTDaemon.php';
// Set number of threads for daemon $nThreads = 2;
// Optional: Init logging class $sLogfile = $argv[0].'.log'; // Set to null for logging to stdout MTLog::getInstance($sLogfile)->setVerbosity(MTLog::INFO);
// Directory names $sBaseDir = dirname(__FILE__); $aDir['queue'] = "$sBaseDir/_queue"; // incoming files $aDir['done'] = "$sBaseDir/_done"; // files parsed OK $aDir['error'] = "$sBaseDir/_error"; // files not parsed OK
////////////////// For DEMO: populate _queue directory $sQueuePath = $aDir['queue']; if ( !file_exists($sQueuePath) ) mkdir($sQueuePath, 0775, true); if ( !is_dir($sQueuePath) ) die('This should be a directory: '.$sQueuePath); $sThisDir = dirname(__FILE__); echo `cp $sThisDir/*.php $sQueuePath`; ////////////////// For DEMO: populate _queue directory
class MTFileParser extends MTDaemon { /* * Optional: read in config files or perform other "init" actions * Called from MTDaemon's _prerun(), plus when a SIGHUP signal is received. * NOTE: Runs under the PARENT process. */ public function loadConfig() { // Already locked from within handle() loop, so don't lock/unlock.
// Could load $aDir settings from a config file, but just using the global's setting here global $aDir; foreach ( $aDir as $sPath ) { if ( !file_exists($sPath) ) mkdir($sPath, 0775, true); } $this->setVar('aDir', $aDir);
global $argv; MTLog::getInstance()->info($argv[0].': Monitoring Queue dir='.$aDir['queue']); }
/* * Function to return quickly with (a) no work to do, or (b) next file to process * NOTE: Runs under the PARENT process * Could keep open DB connection to affinegy_master, and figure out $build here */ public function getNext($slot) { $sFileToProcess = null;
// Get shared arrays $this->lock(); $aDir = $this->getVar('aDir'); $aFileList = $this->getVar('aFileList'); // FIFO queue $aFilesInProcess = $this->getVar('aFilesInProcess'); if ( $aFilesInProcess==null ) $aFilesInProcess = array();
// Use ls to get files, only if the $aFileList queue is empty if ( !$aFileList && is_dir($aDir['queue']) ) { $sCmd = 'ls -1 '.$aDir['queue']; exec($sCmd, $aFileList); // Take out files already in process if ( count($aFileList) && count($aFilesInProcess) ) { $aFileList = array_diff($aFileList, $aFilesInProcess); } }
// Pull out a file to process from FIFO queue if ( count($aFileList) ) { $sFileToProcess = array_shift($aFileList); array_push($aFilesInProcess, $sFileToProcess); }
// Store shared arrays back $this->setVar('aFileList', $aFileList); $this->setVar('aFilesInProcess', $aFilesInProcess); $this->unlock(); if ( !$sFileToProcess ) return null; return $aDir['queue'].'/'.$sFileToProcess; // $sFileName in run() method below }
/* * Do main work here. * NOTE: Runs under a CHILD process */ public function run($sFileName, $slot) { $sMsg = 'slot='.$slot.' file='.basename($sFileName); MTLog::getInstance()->info('## Start '.$sMsg); try { // Call parsing function that returns TRUE if OK, FALSE if not. // Could instantiate an object of a file-parsing class and run it instead... $bProcessedOK = ParseFile($sFileName); } catch( Exception $e ) { MTLog::getInstance()->error('ProcessFile error '.$sMsg.': '.$e->getMessage()); $bProcessedOK = false; }
// Done, take file off "in-process" list $this->lock(); $aFilesInProcess = $this->getVar('aFilesInProcess'); $nFileToRemove = array_search(basename($sFileName), $aFilesInProcess); unset($aFilesInProcess[$nFileToRemove]); $this->setVar('aFilesInProcess', $aFilesInProcess);
// Move file out of queue directory $aDir = $this->getVar('aDir'); $sDestDir = ($bProcessedOK ? $aDir['done'] : $aDir['error']); $sDestFile = $sDestDir.'/'.basename($sFileName); rename($sFileName, $sDestFile); $this->unlock();
MTLog::getInstance()->info('-- End '.$sMsg); return 0; } }
// For demo purposes, function that actually does the work on a file function ParseFile($sFileName) { $sContent = file_get_contents($sFileName); if ( !$sContent ) return false;
$nRand = rand(10, 20); MTLog::getInstance()->info('ParseFile() read '.strlen($sContent).' bytes from file '.$sFileName.' |pausing '.$nRand.' seconds'); sleep($nRand); return true; }
// Run daemon, start threads try { $mttest = null; $mttest = new MTFileParser($nThreads); // Init $mttest->handle(); // Run threads } catch( Exception $e ) { if ( $mttest==null ) { $sErr = $argv[0].': Daemon failed to start: '.$e->getMessage(); } else { $sErr = $argv[0].': Daemon died: '.$e->getMessage(); } MTLog::getInstance()->error($sErr); die($sErr."\n"); }
?>
|