<?php
/*
* This deamon monitors a directory for files, and will process each file in
* its own thread.
*
* Sample daemon using Benoit Perroud's MultiThreaded Daemon (MTD)
* See http://code.google.com/p/phpmultithreadeddaemon/
* and http://phpmultithreaddaemon.blogspot.com/ for more information
*
* Modifications by Daniel Kadosh, Affinegy Inc., June 2009
* Made many enhancements for robustness:
* - Truly daemonized by starting process group with posix_setsid()
* - Surrounded key items in try/catch for proper logging
* - Handling of SIGTERM/ SIGQUIT = no more children spawned, wait for all threads to die.
* - Handling of SIGHUP = call to loadConfig() method
* - Created PID file to ensure only 1 copy of the daemon is running
* - Full cleanup of semaphores & PID file on exit
*
*/
error_reporting(E_ALL);
require_once 'class.MTDaemon.php';
// Set number of threads for daemon
$nThreads = 2;
// Optional: Init logging class
$sLogfile = $argv[0].'.log'; // Set to null for logging to stdout
MTLog::getInstance($sLogfile)->setVerbosity(MTLog::INFO);
// Directory names
$sBaseDir = dirname(__FILE__);
$aDir['queue'] = "$sBaseDir/_queue"; // incoming files
$aDir['done'] = "$sBaseDir/_done"; // files parsed OK
$aDir['error'] = "$sBaseDir/_error"; // files not parsed OK
////////////////// For DEMO: populate _queue directory
$sQueuePath = $aDir['queue'];
if ( !file_exists($sQueuePath) ) mkdir($sQueuePath, 0775, true);
if ( !is_dir($sQueuePath) ) die('This should be a directory: '.$sQueuePath);
$sThisDir = dirname(__FILE__);
echo `cp $sThisDir/*.php $sQueuePath`;
////////////////// For DEMO: populate _queue directory
class MTFileParser extends MTDaemon {
/*
* Optional: read in config files or perform other "init" actions
* Called from MTDaemon's _prerun(), plus when a SIGHUP signal is received.
* NOTE: Runs under the PARENT process.
*/
public function loadConfig() {
// Already locked from within handle() loop, so don't lock/unlock.
// Could load $aDir settings from a config file, but just using the global's setting here
global $aDir;
foreach ( $aDir as $sPath ) {
if ( !file_exists($sPath) ) mkdir($sPath, 0775, true);
}
$this->setVar('aDir', $aDir);
global $argv;
MTLog::getInstance()->info($argv[0].': Monitoring Queue dir='.$aDir['queue']);
}
/*
* Function to return quickly with (a) no work to do, or (b) next file to process
* NOTE: Runs under the PARENT process
* Could keep open DB connection to affinegy_master, and figure out $build here
*/
public function getNext($slot) {
$sFileToProcess = null;
// Get shared arrays
$this->lock();
$aDir = $this->getVar('aDir');
$aFileList = $this->getVar('aFileList'); // FIFO queue
$aFilesInProcess = $this->getVar('aFilesInProcess');
if ( $aFilesInProcess==null ) $aFilesInProcess = array();
// Use ls to get files, only if the $aFileList queue is empty
if ( !$aFileList && is_dir($aDir['queue']) ) {
$sCmd = 'ls -1 '.$aDir['queue'];
exec($sCmd, $aFileList);
// Take out files already in process
if ( count($aFileList) && count($aFilesInProcess) ) {
$aFileList = array_diff($aFileList, $aFilesInProcess);
}
}
// Pull out a file to process from FIFO queue
if ( count($aFileList) ) {
$sFileToProcess = array_shift($aFileList);
array_push($aFilesInProcess, $sFileToProcess);
}
// Store shared arrays back
$this->setVar('aFileList', $aFileList);
$this->setVar('aFilesInProcess', $aFilesInProcess);
$this->unlock();
if ( !$sFileToProcess ) return null;
return $aDir['queue'].'/'.$sFileToProcess; // $sFileName in run() method below
}
/*
* Do main work here.
* NOTE: Runs under a CHILD process
*/
public function run($sFileName, $slot) {
$sMsg = 'slot='.$slot.' file='.basename($sFileName);
MTLog::getInstance()->info('## Start '.$sMsg);
try {
// Call parsing function that returns TRUE if OK, FALSE if not.
// Could instantiate an object of a file-parsing class and run it instead...
$bProcessedOK = ParseFile($sFileName);
} catch( Exception $e ) {
MTLog::getInstance()->error('ProcessFile error '.$sMsg.': '.$e->getMessage());
$bProcessedOK = false;
}
// Done, take file off "in-process" list
$this->lock();
$aFilesInProcess = $this->getVar('aFilesInProcess');
$nFileToRemove = array_search(basename($sFileName), $aFilesInProcess);
unset($aFilesInProcess[$nFileToRemove]);
$this->setVar('aFilesInProcess', $aFilesInProcess);
// Move file out of queue directory
$aDir = $this->getVar('aDir');
$sDestDir = ($bProcessedOK ? $aDir['done'] : $aDir['error']);
$sDestFile = $sDestDir.'/'.basename($sFileName);
rename($sFileName, $sDestFile);
$this->unlock();
MTLog::getInstance()->info('-- End '.$sMsg);
return 0;
}
}
// For demo purposes, function that actually does the work on a file
function ParseFile($sFileName) {
$sContent = file_get_contents($sFileName);
if ( !$sContent ) return false;
$nRand = rand(10, 20);
MTLog::getInstance()->info('ParseFile() read '.strlen($sContent).' bytes from file '.$sFileName.' |pausing '.$nRand.' seconds');
sleep($nRand);
return true;
}
// Run daemon, start threads
try {
$mttest = null;
$mttest = new MTFileParser($nThreads); // Init
$mttest->handle(); // Run threads
} catch( Exception $e ) {
if ( $mttest==null ) {
$sErr = $argv[0].': Daemon failed to start: '.$e->getMessage();
} else {
$sErr = $argv[0].': Daemon died: '.$e->getMessage();
}
MTLog::getInstance()->error($sErr);
die($sErr."\n");
}
?>
|