diff -r b7f1952cef8d -r d643bfb862d8 multithreading.php --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/multithreading.php Tue Sep 23 23:24:13 2008 -0400 @@ -0,0 +1,336 @@ +event_sigchld(); + } + } +} + +/** + * Global signal handler for SIGUSR2. + */ + +function Threader_SigUsr2() +{ + global $threader_instances; + foreach ( $threader_instances as &$mt ) + { + if ( is_object($mt) ) + { + $parchild = $mt->is_child() ? 'child' : 'parent'; + $mt->event_sigusr2(); + } + } +} + +/** + * List of Threader instances. Needed for global handling of signals. + * @var array + */ + +global $threader_instances; +$threader_instances = array(); + +/** + * Tools for emulating multi-threaded operation in PHP scripts. + * @package Amarok + * @subpackage WebControl + * @author Dan Fuhry + * @license GNU General Public License + */ + +class Threader +{ + + /** + * Return value of fork() if the process is a child. + * @const int + */ + + const FORK_CHILD = -1; + + /** + * Set to true if this is a child process. No exceptions. + * @var bool + * @access private + */ + + private $is_child = false; + + /** + * Sockets for inter-process communication. + * @var array + * @access private + */ + + protected $ipc_sockets = array(); + + /** + * Socket for communication with the parent. Obviously only used after calling fork(). + * @var resource + * @access private + */ + + protected $parent_sock = false; + + /** + * Services_JSON instance. + * @var object + * @access private + */ + + protected $json = false; + + /** + * PID of the parent process. + * @var int + * @access private + */ + + protected $parent_pid = 1; + + /** + * List of actions for IPC events. + * @var array + * @access private + */ + + protected $ipc_actions = array(); + + /** + * Constructor. Sets up signal handlers. Nothing to see here, move along. + */ + + public function __construct() + { + global $threader_instances; + + declare(ticks=1); + + $threader_instances[] =& $this; + + pcntl_signal(SIGUSR2, 'Threader_SigUsr2'); + pcntl_signal(SIGCHLD, 'Threader_SigChld'); + + $this->json = new Services_JSON(SERVICES_JSON_LOOSE_TYPE); + $this->parent_pid = getmypid(); + } + + /** + * Forks the current process. See your system's fork(2) man page for details. + * @return int FORK_CHILD if child process, PID of child if parent process. Returns false on failure. + */ + + public function fork() + { + // create our new sockets for IPC + $socket_pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + // fork (emoticon of the day: --ipc_sockets[$fork_result] = $socket_pair[1]; + return $fork_result; + } + else + { + // we are the child. + fclose($socket_pair[1]); + $this->parent_sock = $socket_pair[0]; + $this->is_child = true; + return FORK_CHILD; + } + } + + /** + * Are we a child? + * @return bool + */ + + public function is_child() + { + return $this->is_child; + } + + /** + * Register an action so that when it is fired over IPC, a custom function can be called. + * @param string Action + * @param callback Function to call + * @return true on success, false on failure + */ + + function ipc_register($action, $callback) + { + if ( !is_string($action) || empty($action) || !is_callable($callback) ) + { + return false; + } + $this->ipc_actions[$action] = $callback; + return true; + } + + /** + * Send through an IPC event. If this is a child, it only notifies the parent; if we're the parent, all children are notified. + * @param array Data to be sent through. This must be an associative array containing an "action" key at minimum. If this a key "propagate" set to true, a parent that receives this will propagate the message to all children. + * @return null + */ + + function ipc_send($data) + { + if ( !isset($data['action']) ) + { + return false; + } + $data = $this->json->encode($data); + if ( $this->is_child() ) + { + fwrite($this->parent_sock, "$data\n"); + // signal the parent that we've got an event + posix_kill($this->parent_pid, SIGUSR2); + } + else + { + // signal each child + foreach ( $this->ipc_sockets as $pid => $socket ) + { + fwrite($socket, "$data\n"); + posix_kill($pid, SIGUSR2); + } + } + return null; + } + + /** + * Handler for SIGCHLD events. + * @access private + */ + + function event_sigchld() + { + // this should never happen to children. + if ( $this->is_child() ) + { + return null; + } + + // wait for child to exit. + pcntl_wait($status); + // for each child PID, kill with signal 0 (effectively, test if process is alive) + // if posix_kill fails, it's dead so remove it from the list. + foreach ( $this->ipc_sockets as $pid => $socket ) + { + if ( !@posix_kill($pid, 0) ) + { + // signal failed. + fclose($socket); + unset($this->ipc_sockets[$pid]); + } + } + } + + /** + * Handler for IPC events. + * @access private + */ + + function event_sigusr2() + { + if ( $this->is_child() ) + { + // this is easy - the parent sent the signal. + $command = rtrim(fgets($this->parent_sock, 102400), "\n"); + } + else + { + // since we can't find which PID sent the signal, set the timeout to a very small amount + // of time and try to read; if we get something, awesome. + foreach ( $this->ipc_sockets as $pid => $socket ) + { + // 1000 microseconds = 1/80th of the time it takes you to blink. + @stream_set_timeout($socket, 0, 1000); + $command = rtrim(@fgets($socket, 102400), "\n"); + if ( !empty($command) ) + { + break; + } + } + } + if ( empty($command) ) + { + // hmm, got a sigusr2 without an incoming command. oh well, ignore. + return null; + } + $command = $this->json->decode($command); + if ( !isset($command['action']) ) + { + // no action = no way to figure out how to handle this. + return null; + } + if ( !isset($this->ipc_actions[$command['action']]) ) + { + // action not registered + return null; + } + // should we propagate this event? + if ( !$this->is_child() && isset($command['propagate']) && $command['propagate'] === true ) + { + $this->ipc_send($command); + } + // we're good + @call_user_func($this->ipc_actions[$command['action']], $command, $this); + } + + /** + * Kills all child processes. + * @access public + */ + + public function kill_all_children() + { + foreach ( $this->ipc_sockets as $pid => $socket ) + { + $socklen = count($this->ipc_sockets); + posix_kill($pid, SIGTERM); + // wait until we are conscious of this child's death + while ( count($this->ipc_sockets) >= $socklen ) + { + usleep(20000); + } + } + } + +} + +?>