<?php

/**
 * Multi-threading (well sort of) tools
 *
 * Greyhound - real web management for Amarok
 * Copyright (C) 2008 Dan Fuhry
 *
 * This program is Free Software; you can redistribute and/or modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
 * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
 */

// Ticks are a file-scoped, compile-time directive, so declare them once up here rather than inside a
// conditional in the constructor. They let pending signals be delivered while code in THIS file runs
// (notably the wait loop in kill_all_children()).
declare(ticks=1);

require_once('json.php');

/**
 * Global signal handler for SIGCHLD. Forwards the event to every registered Threader instance.
 */
function Threader_SigChld()
{
    global $threader_instances;

    // Objects are handles, so iterating by value is enough and avoids leaving a dangling reference
    // into the global instance list.
    foreach ( $threader_instances as $mt ) {
        if ( is_object($mt) ) {
            $mt->event_sigchld();
        }
    }
}

/**
 * Global signal handler for SIGUSR2. Forwards the event to every registered Threader instance unless
 * the global $threader_notick flag is set to a truthy value.
 */
function Threader_SigUsr2()
{
    global $threader_instances, $threader_notick;

    if ( !empty($threader_notick) ) {
        return;
    }

    foreach ( $threader_instances as $mt ) {
        if ( is_object($mt) ) {
            $mt->event_sigusr2();
        }
    }
}

/**
 * List of Threader instances. Needed for global handling of signals.
 * @var array
 */
global $threader_instances;
$threader_instances = array();

/**
 * Tools for emulating multi-threaded operation in PHP scripts.
 * @package Amarok
 * @subpackage WebControl
 * @author Dan Fuhry
 * @license GNU General Public License <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>
 */
class Threader
{
    /**
     * Return value of fork() if the process is a child.
     * @const int
     */
    const FORK_CHILD = -1;

    /**
     * Set to true if this is a child process. No exceptions.
     * @var bool
     */
    private $is_child = false;

    /**
     * Sockets for inter-process communication, keyed by child PID.
     * @var array
     */
    protected $ipc_sockets = array();

    /**
     * Socket for communication with the parent. Only set after fork() returned in the child.
     * @var resource|bool
     */
    protected $parent_sock = false;

    /**
     * Services_JSON instance.
     * @var object
     */
    protected $json = false;

    /**
     * PID of the parent process.
     * @var int
     */
    protected $parent_pid = 1;

    /**
     * List of actions for IPC events (action name => callback).
     * @var array
     */
    protected $ipc_actions = array();

    /**
     * Constructor. Registers this instance for global signal dispatch and installs the signal handlers
     * when the pcntl extension is available.
     */
    public function __construct()
    {
        global $threader_instances;

        if ( function_exists('pcntl_signal') ) {
            // Plain assignment: objects are passed by handle, a reference to $this is not needed.
            $threader_instances[] = $this;

            // declare(ticks) only covers this file; asynchronous delivery (PHP >= 7.1) makes sure the
            // handlers also run while code in other files is executing.
            if ( function_exists('pcntl_async_signals') ) {
                pcntl_async_signals(true);
            }

            pcntl_signal(SIGCHLD, 'Threader_SigChld');
            pcntl_signal(SIGUSR2, 'Threader_SigUsr2');
        }

        $this->json = new Services_JSON(SERVICES_JSON_LOOSE_TYPE);
        $this->parent_pid = getmypid();
    }

    /**
     * Forks the current process. See your system's fork(2) man page for details.
     * @return int FORK_CHILD if child process, PID of child if parent process. Returns false on failure.
     */
    public function fork()
    {
        if ( !function_exists('pcntl_fork') ) {
            return false;
        }

        // create our new sockets for IPC
        $socket_pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
        if ( !is_array($socket_pair) || count($socket_pair) !== 2 ) {
            return false;
        }

        // The forking process is, by definition, the parent of the child created below.
        $forking_pid = getmypid();

        $fork_result = pcntl_fork();

        if ( $fork_result == -1 ) {
            // fork failed - don't leak the socket pair.
            fclose($socket_pair[0]);
            fclose($socket_pair[1]);
            return false;
        }

        if ( $fork_result ) {
            // we are the parent - register the child
            fclose($socket_pair[0]);
            $this->ipc_sockets[$fork_result] = $socket_pair[1];
            return $fork_result;
        }

        // we are the child.
        fclose($socket_pair[1]);

        // Drop the inherited descriptors that connect the parent to our siblings; they are not ours
        // to read from or write to.
        foreach ( $this->ipc_sockets as $sibling_sock ) {
            if ( is_resource($sibling_sock) ) {
                fclose($sibling_sock);
            }
        }
        $this->ipc_sockets = array();

        $this->parent_sock = $socket_pair[0];
        $this->parent_pid = $forking_pid;
        $this->is_child = true;

        return self::FORK_CHILD;
    }

    /**
     * Are we a child?
     * @return bool
     */
    public function is_child()
    {
        return $this->is_child;
    }

    /**
     * Register an action so that when it is fired over IPC, a custom function can be called.
     * @param string Action
     * @param callback Function to call. It receives the decoded command array and this Threader.
     * @return bool true on success, false on failure
     */
    public function ipc_register($action, $callback)
    {
        if ( !is_string($action) || empty($action) || !is_callable($callback) ) {
            return false;
        }

        $this->ipc_actions[$action] = $callback;
        return true;
    }

    /**
     * Send through an IPC event. If this is a child, it only notifies the parent; if we're the parent,
     * all children are notified.
     * @param array Data to be sent through. This must be an associative array containing an "action" key
     *        at minimum. If it has a key "propagate" set to true, a parent that receives this will
     *        propagate the message to all children.
     * @return null|bool false if the data has no "action" key, null otherwise
     */
    public function ipc_send($data)
    {
        if ( !is_array($data) || !isset($data['action']) ) {
            return false;
        }

        $payload = $this->json->encode($data) . "\n";

        if ( $this->is_child() ) {
            // Only wake the parent up if the message actually went out.
            if ( is_resource($this->parent_sock) && @fwrite($this->parent_sock, $payload) !== false ) {
                posix_kill($this->parent_pid, SIGUSR2);
            }
        } else {
            // write to and signal each child
            foreach ( $this->ipc_sockets as $pid => $socket ) {
                if ( is_resource($socket) && @fwrite($socket, $payload) !== false ) {
                    posix_kill($pid, SIGUSR2);
                }
            }
        }

        return null;
    }

    /**
     * Handler for SIGCHLD events. Reaps exited children and removes them from the IPC socket list.
     * Never blocks, so it is safe to call repeatedly and from several instances.
     * @access private
     */
    public function event_sigchld()
    {
        // this should never happen to children.
        if ( $this->is_child() ) {
            return null;
        }

        // Signals coalesce, so one SIGCHLD may stand for several exits: reap everything that is ready.
        // WNOHANG keeps us from blocking when another instance already collected the child.
        if ( function_exists('pcntl_waitpid') ) {
            while ( ($reaped_pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0 ) {
                $this->forget_child($reaped_pid);
            }
        }

        // for each remaining child PID, kill with signal 0 (effectively, test if process is alive)
        // if posix_kill fails, it's dead (e.g. reaped elsewhere) so remove it from the list.
        foreach ( array_keys($this->ipc_sockets) as $pid ) {
            if ( !@posix_kill($pid, 0) ) {
                $this->forget_child($pid);
            }
        }

        return null;
    }

    /**
     * Handler for IPC events. Drains every complete line that is waiting on the relevant socket(s)
     * and dispatches each one to its registered action.
     * @access private
     */
    public function event_sigusr2()
    {
        if ( $this->is_child() ) {
            // this is easy - the parent sent the signal.
            $sockets = array($this->parent_sock);
        } else {
            // we can't find out which PID sent the signal, so poll every child socket.
            $sockets = $this->ipc_sockets;
        }

        // Collect first, dispatch afterwards: several signals may have been merged into one, and a
        // handler might itself change the socket list.
        $pending = array();
        foreach ( $sockets as $socket ) {
            if ( !is_resource($socket) ) {
                continue;
            }
            @stream_set_blocking($socket, 0);
            while ( ($line = @fgets($socket, 102400)) !== false ) {
                $line = rtrim($line, "\n");
                if ( !empty($line) ) {
                    $pending[] = $line;
                }
            }
        }

        // got a sigusr2 without an incoming command? oh well, nothing to do.
        foreach ( $pending as $line ) {
            $this->ipc_dispatch($line);
        }

        return null;
    }

    /**
     * Kills all child processes and waits until each of them has been reaped.
     * @access public
     */
    public function kill_all_children()
    {
        foreach ( array_keys($this->ipc_sockets) as $pid ) {
            if ( !isset($this->ipc_sockets[$pid]) ) {
                // already gone - it died while we were waiting for an earlier child.
                continue;
            }

            @posix_kill($pid, SIGTERM);

            // wait until we are conscious of this particular child's death. Reap actively instead of
            // relying solely on the signal handler having a chance to run.
            while ( isset($this->ipc_sockets[$pid]) ) {
                usleep(20000);
                $this->event_sigchld();
            }
        }
    }

    /**
     * Decodes one raw IPC line and invokes the matching registered action, propagating it to all
     * children first when requested and when we are the parent.
     * @param string JSON-encoded command
     * @return null
     */
    private function ipc_dispatch($raw)
    {
        $command = $this->json->decode($raw);

        if ( !is_array($command) || !isset($command['action']) || !is_scalar($command['action']) ) {
            // no action = no way to figure out how to handle this.
            return null;
        }

        if ( !isset($this->ipc_actions[$command['action']]) ) {
            // action not registered
            return null;
        }

        // should we propagate this event?
        if ( !$this->is_child() && isset($command['propagate']) && $command['propagate'] === true ) {
            $this->ipc_send($command);
        }

        // we're good. The callback was validated with is_callable() on registration; errors raised
        // inside it are deliberately not suppressed so that broken handlers can be diagnosed.
        call_user_func($this->ipc_actions[$command['action']], $command, $this);

        return null;
    }

    /**
     * Closes the IPC socket of a child and removes it from the list. Unknown PIDs are ignored.
     * @param int PID of the child
     */
    private function forget_child($pid)
    {
        if ( !isset($this->ipc_sockets[$pid]) ) {
            return;
        }

        if ( is_resource($this->ipc_sockets[$pid]) ) {
            fclose($this->ipc_sockets[$pid]);
        }
        unset($this->ipc_sockets[$pid]);
    }
}