A little more work on reboot support. Still not enabled, but should work eventually.
<?php
/**
* Multi-threading (well sort of) tools
*
* Greyhound - real web management for Amarok
* Copyright (C) 2008 Dan Fuhry
*
* This program is Free Software; you can redistribute and/or modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
*/
require_once('json.php');
/**
 * Process-wide SIGCHLD handler.
 *
 * Walks the global registry of Threader instances and forwards the event to
 * every entry that is an object by calling its event_sigchld() method.
 * Non-object entries are skipped.
 */
function Threader_SigChld()
{
    global $threader_instances;

    foreach ( $threader_instances as &$instance )
    {
        if ( !is_object($instance) )
        {
            // not a usable instance - nothing to notify
            continue;
        }
        $instance->event_sigchld();
    }
}
/**
 * Process-wide SIGUSR2 handler.
 *
 * Does nothing while the global $threader_notick flag is truthy. Otherwise it
 * walks the global registry of Threader instances and calls event_sigusr2()
 * on every entry that is an object.
 */
function Threader_SigUsr2()
{
    global $threader_instances, $threader_notick;

    if ( !empty($threader_notick) )
    {
        // event delivery is currently switched off
        return;
    }

    foreach ( $threader_instances as &$instance )
    {
        if ( !is_object($instance) )
        {
            continue;
        }
        $instance->event_sigusr2();
    }
}
/**
 * List of Threader instances. Needed for global handling of signals.
 *
 * Threader::__construct() appends each new instance to this array (only when
 * pcntl_signal() is available), and the Threader_SigChld() / Threader_SigUsr2()
 * handlers iterate over it to forward signals to every instance.
 * @var array
 */
// Declared with "global" so the name refers to the global variable even when
// this file is included from inside a function scope.
global $threader_instances;
// Start with an empty registry; note that this resets the list whenever this
// code runs.
$threader_instances = array();
/**
 * Tools for emulating multi-threaded operation in PHP scripts.
 *
 * A Threader forks worker processes with pcntl_fork() and keeps one UNIX
 * socket pair per child for inter-process communication. Messages are JSON
 * objects, one per line; the sender writes the line and then raises SIGUSR2
 * on the receiver so that it reads its socket(s).
 *
 * @package Amarok
 * @subpackage WebControl
 * @author Dan Fuhry
 * @license GNU General Public License <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>
 */
class Threader
{
    /**
     * Return value of fork() if the process is a child.
     * @const int
     */
    const FORK_CHILD = -1;

    /**
     * Set to true if this is a child process. No exceptions.
     * @var bool
     * @access private
     */
    private $is_child = false;

    /**
     * Sockets for inter-process communication, keyed by child PID.
     * @var array
     * @access protected
     */
    protected $ipc_sockets = array();

    /**
     * Socket for communication with the parent. Only set after fork() has returned in the child.
     * @var resource
     * @access protected
     */
    protected $parent_sock = false;

    /**
     * Services_JSON instance used to encode and decode IPC messages.
     * @var object
     * @access protected
     */
    protected $json = false;

    /**
     * PID of the process that constructed this object (the parent, from a child's point of view).
     * @var int
     * @access protected
     */
    protected $parent_pid = 1;

    /**
     * Map of IPC action name => callback.
     * @var array
     * @access protected
     */
    protected $ipc_actions = array();

    /**
     * Constructor. Registers this instance with the global signal handlers (when pcntl is available),
     * creates the JSON codec and records the current PID as the parent PID.
     */
    public function __construct()
    {
        global $threader_instances;
        if ( function_exists('pcntl_signal') )
        {
            declare(ticks=1);
            // objects are handles, so a plain assignment is enough; no reference to $this is needed
            $threader_instances[] = $this;
            pcntl_signal(SIGCHLD, 'Threader_SigChld');
            pcntl_signal(SIGUSR2, 'Threader_SigUsr2');
        }
        $this->json = new Services_JSON(SERVICES_JSON_LOOSE_TYPE);
        $this->parent_pid = getmypid();
    }

    /**
     * Forks the current process. See your system's fork(2) man page for details.
     * @return int FORK_CHILD if child process, PID of child if parent process. Returns false on failure
     *             (either the socket pair could not be created or the fork itself failed).
     */
    public function fork()
    {
        // create our new sockets for IPC
        $socket_pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
        if ( !is_array($socket_pair) )
        {
            // without a channel to the child there is no point in forking
            return false;
        }

        $fork_result = pcntl_fork();
        if ( $fork_result === -1 )
        {
            // fork failed - don't leak the sockets we just created
            fclose($socket_pair[0]);
            fclose($socket_pair[1]);
            return false;
        }

        if ( $fork_result )
        {
            // we are the parent - register the child
            fclose($socket_pair[0]);
            $this->ipc_sockets[$fork_result] = $socket_pair[1];
            return $fork_result;
        }

        // we are the child.
        fclose($socket_pair[1]);
        // The sockets inherited from the parent lead to our siblings, not to children of ours.
        // Close our copies so they neither leak nor get treated as our own children later.
        foreach ( $this->ipc_sockets as $sibling_sock )
        {
            if ( is_resource($sibling_sock) )
            {
                fclose($sibling_sock);
            }
        }
        $this->ipc_sockets = array();
        $this->parent_sock = $socket_pair[0];
        $this->is_child = true;
        return self::FORK_CHILD;
    }

    /**
     * Are we a child?
     * @return bool
     */
    public function is_child()
    {
        return $this->is_child;
    }

    /**
     * Register an action so that when it is fired over IPC, a custom function can be called.
     * The callback is invoked with the decoded message array and this Threader instance.
     * @param string Action; must be a non-empty string
     * @param callback Function to call
     * @return bool true on success, false on failure
     */
    public function ipc_register($action, $callback)
    {
        // compare against '' rather than using empty() so that the action name "0" is accepted
        if ( !is_string($action) || $action === '' || !is_callable($callback) )
        {
            return false;
        }
        $this->ipc_actions[$action] = $callback;
        return true;
    }

    /**
     * Send through an IPC event. If this is a child, it only notifies the parent; if we're the parent, all children are notified.
     * @param array Data to be sent through. This must be an associative array containing an "action" key at minimum. If it has a key "propagate" set to true, a parent that receives this will propagate the message to all children.
     * @return mixed null once the message has been written, false if $data has no "action" key
     */
    public function ipc_send($data)
    {
        if ( !isset($data['action']) )
        {
            return false;
        }
        // one JSON document per line is the wire format expected by event_sigusr2()
        $line = $this->json->encode($data) . "\n";
        if ( $this->is_child() )
        {
            fwrite($this->parent_sock, $line);
            // signal the parent that we've got an event
            posix_kill($this->parent_pid, SIGUSR2);
        }
        else
        {
            // write to and signal each child
            foreach ( $this->ipc_sockets as $pid => $socket )
            {
                fwrite($socket, $line);
                posix_kill($pid, SIGUSR2);
            }
        }
        return null;
    }

    /**
     * Handler for SIGCHLD events. Reaps exited children and forgets their sockets.
     * @access private
     */
    public function event_sigchld()
    {
        // this should never happen to children.
        if ( $this->is_child() )
        {
            return null;
        }

        // Reap every child that has exited, without blocking. Several exits can be folded into one
        // SIGCHLD, the signal is also raised for stopped children, and this handler may run once per
        // Threader instance - a blocking wait could therefore stall the parent or leave zombies behind.
        while ( ($reaped = pcntl_waitpid(-1, $status, WNOHANG)) > 0 )
        {
            $this->forget_child($reaped);
        }

        // for each remaining child PID, kill with signal 0 (effectively, test if process is alive)
        // if posix_kill fails, it's dead (e.g. reaped elsewhere) so remove it from the list.
        foreach ( $this->ipc_sockets as $pid => $socket )
        {
            if ( !@posix_kill($pid, 0) )
            {
                $this->forget_child($pid);
            }
        }
    }

    /**
     * Handler for IPC events. Reads every complete message currently waiting on the relevant socket(s)
     * and dispatches each one to its registered action.
     * @access private
     */
    public function event_sigusr2()
    {
        // A child only ever hears from its parent. The parent cannot tell which child raised the
        // signal, so it polls all of them.
        $sockets = $this->is_child() ? array($this->parent_sock) : $this->ipc_sockets;

        foreach ( $sockets as $socket )
        {
            if ( !is_resource($socket) )
            {
                continue;
            }
            // non-blocking so that an idle socket does not stall the handler
            @stream_set_blocking($socket, 0);
            // Signals are not queued one-per-message, so drain everything that is waiting.
            // The resource is re-checked each round because an action callback may close it.
            while ( is_resource($socket) && ($line = @fgets($socket, 102400)) !== false )
            {
                $line = rtrim($line, "\n");
                if ( $line !== '' )
                {
                    $this->ipc_dispatch($line);
                }
            }
        }
        return null;
    }

    /**
     * Decodes one raw IPC line and runs the action registered for it, if any.
     * @param string JSON-encoded message
     * @access protected
     */
    protected function ipc_dispatch($raw)
    {
        $command = $this->json->decode($raw);
        if ( !is_array($command) || !isset($command['action']) || !is_scalar($command['action']) )
        {
            // no usable action = no way to figure out how to handle this.
            return;
        }
        if ( !isset($this->ipc_actions[$command['action']]) )
        {
            // action not registered
            return;
        }
        // should we propagate this event?
        if ( !$this->is_child() && isset($command['propagate']) && $command['propagate'] === true )
        {
            $this->ipc_send($command);
        }
        // we're good
        @call_user_func($this->ipc_actions[$command['action']], $command, $this);
    }

    /**
     * Closes the IPC socket of a child and removes it from the list. Unknown PIDs are ignored.
     * @param int PID of the child
     * @access protected
     */
    protected function forget_child($pid)
    {
        if ( !isset($this->ipc_sockets[$pid]) )
        {
            return;
        }
        if ( is_resource($this->ipc_sockets[$pid]) )
        {
            fclose($this->ipc_sockets[$pid]);
        }
        unset($this->ipc_sockets[$pid]);
    }

    /**
     * Kills all child processes. Sends SIGTERM to each registered child and waits until it is gone.
     * @access public
     */
    public function kill_all_children()
    {
        foreach ( array_keys($this->ipc_sockets) as $pid )
        {
            if ( !isset($this->ipc_sockets[$pid]) )
            {
                // the SIGCHLD handler already dealt with this one
                continue;
            }
            if ( !@posix_kill($pid, SIGTERM) )
            {
                // the process is already gone; there is nothing to wait for
                $this->forget_child($pid);
                continue;
            }
            // Wait until this child is dead. Poll for it directly instead of relying purely on the
            // SIGCHLD handler, which may not get a chance to run while we sit in this loop.
            while ( isset($this->ipc_sockets[$pid]) )
            {
                // 0 means "still running"; a PID means we reaped it; -1 means it is no longer our child
                if ( pcntl_waitpid($pid, $status, WNOHANG) !== 0 )
                {
                    $this->forget_child($pid);
                    break;
                }
                usleep(20000);
            }
        }
    }
}
?>