Login   Register  
PHP Classes
elePHPant
Icontem

File: QueueDirParser.php

Recommend this page to a friend!
Stumble It! Stumble It! Bookmark in del.icio.us Bookmark in del.icio.us
  Classes of Daniel Kadosh  >  Multi Threaded Daemon - enhanced  >  QueueDirParser.php  >  Download  
File: QueueDirParser.php
Role: Example script
Content type: text/plain
Description: Sample app - Monitor queue dir and parse files in it
Class: Multi Threaded Daemon - enhanced
Daemon that spawns multiple parallel threads
Author: By
Last change:
Date: 2009-06-11 06:38
Size: 5,543 bytes
 

Contents

Class file image Download
<?php
/*
 * This deamon monitors a directory for files, and will process each file in
 * its own thread.
 *
 * Sample daemon using Benoit Perroud's MultiThreaded Daemon (MTD)
 *     See http://code.google.com/p/phpmultithreadeddaemon/ 
 *    and http://phpmultithreaddaemon.blogspot.com/ for more information
 *
 * Modifications by Daniel Kadosh, Affinegy Inc., June 2009
 * Made many enhancements for robustness:
 * - Truly daemonized by starting process group with posix_setsid()
 * - Surrounded key items in try/catch for proper logging
 * - Handling of SIGTERM/ SIGQUIT = no more children spawned, wait for all threads to die.
 * - Handling of SIGHUP = call to loadConfig() method
 * - Created PID file to ensure only 1 copy of the daemon is running
 * - Full cleanup of semaphores & PID file on exit
 *
 */
error_reporting(E_ALL);
require_once 
'class.MTDaemon.php';

// Set number of threads for daemon
$nThreads 2;

// Optional: Init logging class
$sLogfile $argv[0].'.log'// Set to null for logging to stdout
MTLog::getInstance($sLogfile)->setVerbosity(MTLog::INFO);

// Directory names
$sBaseDir dirname(__FILE__);
$aDir['queue'] = "$sBaseDir/_queue"// incoming files
$aDir['done'] = "$sBaseDir/_done";   // files parsed OK
$aDir['error'] = "$sBaseDir/_error"// files not parsed OK

////////////////// For DEMO: populate _queue directory
$sQueuePath $aDir['queue'];
if ( !
file_exists($sQueuePath) ) mkdir($sQueuePath0775true);
if ( !
is_dir($sQueuePath) ) die('This should be a directory: '.$sQueuePath);
$sThisDir dirname(__FILE__);
echo `
cp $sThisDir/*.php $sQueuePath`;
////////////////// For DEMO: populate _queue directory

class MTFileParser extends MTDaemon {
    
/*
     * Optional: read in config files or perform other "init" actions
     * Called from MTDaemon's _prerun(), plus when a SIGHUP signal is received.
     * NOTE: Runs under the PARENT process.
     */
    
public function loadConfig() {
        
// Already locked from within handle() loop, so don't lock/unlock.

        // Could load $aDir settings from a config file, but just using the global's setting here
        
global $aDir;
        foreach ( 
$aDir as $sPath ) {
            if ( !
file_exists($sPath) ) mkdir($sPath0775true);
        }
        
$this->setVar('aDir'$aDir);

        global 
$argv;
        
MTLog::getInstance()->info($argv[0].': Monitoring Queue dir='.$aDir['queue']);
    }

    
/* 
     * Function to return quickly with (a) no work to do, or (b) next file to process
     * NOTE: Runs under the PARENT process
     * Could keep open DB connection to affinegy_master, and figure out $build here
     */
    
public function getNext($slot) {
        
$sFileToProcess null;

        
// Get shared arrays
        
$this->lock();
        
$aDir $this->getVar('aDir');
        
$aFileList $this->getVar('aFileList'); // FIFO queue
        
$aFilesInProcess $this->getVar('aFilesInProcess');
        if ( 
$aFilesInProcess==null $aFilesInProcess = array();

        
// Use ls to get files, only if the $aFileList queue is empty
        
if ( !$aFileList && is_dir($aDir['queue']) ) {
            
$sCmd 'ls -1 '.$aDir['queue'];
            
exec($sCmd$aFileList);
            
// Take out files already in process
            
if ( count($aFileList) && count($aFilesInProcess) ) {
                
$aFileList array_diff($aFileList$aFilesInProcess);
            }
        }

        
// Pull out a file to process from FIFO queue
        
if ( count($aFileList) ) {
            
$sFileToProcess array_shift($aFileList);
            
array_push($aFilesInProcess$sFileToProcess);
        }

        
// Store shared arrays back
        
$this->setVar('aFileList'$aFileList);
        
$this->setVar('aFilesInProcess'$aFilesInProcess);
        
$this->unlock();
        if ( !
$sFileToProcess ) return null;
        return 
$aDir['queue'].'/'.$sFileToProcess// $sFileName in run() method below
    
}

    
/* 
     * Do main work here.
     * NOTE: Runs under a CHILD process
     */
    
public function run($sFileName$slot) {
        
$sMsg 'slot='.$slot.' file='.basename($sFileName);
        
MTLog::getInstance()->info('## Start '.$sMsg);
        try {
            
// Call parsing function that returns TRUE if OK, FALSE if not.
            // Could instantiate an object of a file-parsing class and run it instead...
            
$bProcessedOK ParseFile($sFileName);
        } catch( 
Exception $e ) {
            
MTLog::getInstance()->error('ProcessFile error '.$sMsg.': '.$e->getMessage());
            
$bProcessedOK false;
        }

        
// Done, take file off "in-process" list
        
$this->lock();
        
$aFilesInProcess $this->getVar('aFilesInProcess');
        
$nFileToRemove array_search(basename($sFileName), $aFilesInProcess);
        unset(
$aFilesInProcess[$nFileToRemove]);
        
$this->setVar('aFilesInProcess'$aFilesInProcess);

        
// Move file out of queue directory
        
$aDir $this->getVar('aDir');
        
$sDestDir = ($bProcessedOK $aDir['done'] : $aDir['error']);
        
$sDestFile $sDestDir.'/'.basename($sFileName);
        
rename($sFileName$sDestFile);
        
$this->unlock();

        
MTLog::getInstance()->info('-- End '.$sMsg);
        return 
0;
    }
}

// For demo purposes, function that actually does the work on a file
function ParseFile($sFileName) {
    
$sContent file_get_contents($sFileName);
    if ( !
$sContent ) return false;

    
$nRand rand(1020);
    
MTLog::getInstance()->info('ParseFile() read '.strlen($sContent).' bytes from file '.$sFileName.' |pausing '.$nRand.' seconds');
    
sleep($nRand);
    return 
true;
}

// Run daemon, start threads
try {
    
$mttest null;
    
$mttest = new MTFileParser($nThreads); // Init
    
$mttest->handle();                     // Run threads
} catch( Exception $e ) {
    if ( 
$mttest==null ) {
        
$sErr $argv[0].': Daemon failed to start: '.$e->getMessage();
    } else {
        
$sErr $argv[0].': Daemon died: '.$e->getMessage();
    }
    
MTLog::getInstance()->error($sErr);
    die(
$sErr."\n");
}

?>