multithreading.php
changeset 48 d643bfb862d8
child 53 a6b339665650
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/multithreading.php	Tue Sep 23 23:24:13 2008 -0400
@@ -0,0 +1,336 @@
+<?php
+
+/**
+ * Multi-threading (well sort of) tools
+ * 
+ * Greyhound - real web management for Amarok
+ * Copyright (C) 2008 Dan Fuhry
+ *
+ * This program is Free Software; you can redistribute and/or modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
+ */
+
+require_once('json.php');
+
+/**
+ * Global signal handler for SIGCHLD.
+ */
+
+function Threader_SigChld()
+{
+  global $threader_instances;
+  foreach ( $threader_instances as &$mt )
+  {
+    if ( is_object($mt) )
+    {
+      $mt->event_sigchld();
+    }
+  }
+}
+
+/**
+ * Global signal handler for SIGUSR2.
+ */
+
+function Threader_SigUsr2()
+{
+  global $threader_instances;
+  foreach ( $threader_instances as &$mt )
+  {
+    if ( is_object($mt) )
+    {
+      $parchild = $mt->is_child() ? 'child' : 'parent';
+      $mt->event_sigusr2();
+    }
+  }
+}
+
+/**
+ * List of Threader instances. Needed for global handling of signals.
+ * @var array
+ */
+
+global $threader_instances;
+$threader_instances = array();
+
+/**
+ * Tools for emulating multi-threaded operation in PHP scripts.
+ * @package Amarok
+ * @subpackage WebControl
+ * @author Dan Fuhry
+ * @license GNU General Public License <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>
+ */
+
+class Threader
+{
+  
+  /**
+   * Return value of fork() if the process is a child.
+   * @const int
+   */
+  
+  const FORK_CHILD = -1;
+  
+  /**
+   * Set to true if this is a child process. No exceptions.
+   * @var bool
+   * @access private
+   */
+  
+  private $is_child = false;
+  
+  /**
+   * Sockets for inter-process communication.
+   * @var array
+   * @access private
+   */
+  
+  protected $ipc_sockets = array();
+  
+  /**
+   * Socket for communication with the parent. Obviously only used after calling fork().
+   * @var resource
+   * @access private
+   */
+  
+  protected $parent_sock = false;
+  
+  /**
+   * Services_JSON instance.
+   * @var object
+   * @access private
+   */
+  
+  protected $json = false;
+  
+  /**
+   * PID of the parent process.
+   * @var int
+   * @access private
+   */
+  
+  protected $parent_pid = 1;
+  
+  /**
+   * List of actions for IPC events.
+   * @var array
+   * @access private
+   */
+  
+  protected $ipc_actions = array();
+  
+  /**
+   * Constructor. Sets up signal handlers. Nothing to see here, move along.
+   */
+  
+  public function __construct()
+  {
+    global $threader_instances;
+    
+    declare(ticks=1);
+    
+    $threader_instances[] =& $this;
+    
+    pcntl_signal(SIGUSR2, 'Threader_SigUsr2');
+    pcntl_signal(SIGCHLD, 'Threader_SigChld');
+    
+    $this->json = new Services_JSON(SERVICES_JSON_LOOSE_TYPE);
+    $this->parent_pid = getmypid();
+  }
+  
+  /**
+   * Forks the current process. See your system's fork(2) man page for details.
+   * @return int FORK_CHILD if child process, PID of child if parent process. Returns false on failure.
+   */
+  
+  public function fork()
+  {
+    // create our new sockets for IPC
+    $socket_pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
+    // fork (emoticon of the day: --<E)
+    $fork_result = pcntl_fork();
+    if ( $fork_result == -1 )
+    {
+      // fork failed.
+      return false;
+    }
+    else if ( $fork_result )
+    {
+      // we are the parent - register the child
+      fclose($socket_pair[0]);
+      $this->ipc_sockets[$fork_result] = $socket_pair[1];
+      return $fork_result;
+    }
+    else
+    {
+      // we are the child.
+      fclose($socket_pair[1]);
+      $this->parent_sock = $socket_pair[0];
+      $this->is_child = true;
+      return FORK_CHILD;
+    }
+  }
+  
+  /**
+   * Are we a child?
+   * @return bool
+   */
+  
+  public function is_child()
+  {
+    return $this->is_child;
+  }
+  
+  /**
+   * Register an action so that when it is fired over IPC, a custom function can be called.
+   * @param string Action
+   * @param callback Function to call
+   * @return true on success, false on failure
+   */
+  
+  function ipc_register($action, $callback)
+  {
+    if ( !is_string($action) || empty($action) || !is_callable($callback) )
+    {
+      return false;
+    }
+    $this->ipc_actions[$action] = $callback;
+    return true;
+  }
+  
+  /**
+   * Send through an IPC event. If this is a child, it only notifies the parent; if we're the parent, all children are notified. 
+   * @param array Data to be sent through. This must be an associative array containing an "action" key at minimum. If this a key "propagate" set to true, a parent that receives this will propagate the message to all children.
+   * @return null
+   */
+  
+  function ipc_send($data)
+  {
+    if ( !isset($data['action']) )
+    {
+      return false;
+    }
+    $data = $this->json->encode($data);
+    if ( $this->is_child() )
+    {
+      fwrite($this->parent_sock, "$data\n");
+      // signal the parent that we've got an event
+      posix_kill($this->parent_pid, SIGUSR2);
+    }
+    else
+    {
+      // signal each child
+      foreach ( $this->ipc_sockets as $pid => $socket )
+      {
+        fwrite($socket, "$data\n");
+        posix_kill($pid, SIGUSR2);
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * Handler for SIGCHLD events.
+   * @access private
+   */
+  
+  function event_sigchld()
+  {
+    // this should never happen to children.
+    if ( $this->is_child() )
+    {
+      return null;
+    }
+    
+    // wait for child to exit.
+    pcntl_wait($status);
+    // for each child PID, kill with signal 0 (effectively, test if process is alive)
+    // if posix_kill fails, it's dead so remove it from the list.
+    foreach ( $this->ipc_sockets as $pid => $socket )
+    {
+      if ( !@posix_kill($pid, 0) )
+      {
+        // signal failed.
+        fclose($socket);
+        unset($this->ipc_sockets[$pid]);
+      }
+    }
+  }
+  
+  /**
+   * Handler for IPC events.
+   * @access private
+   */
+  
+  function event_sigusr2()
+  {
+    if ( $this->is_child() )
+    {
+      // this is easy - the parent sent the signal.
+      $command = rtrim(fgets($this->parent_sock, 102400), "\n");
+    }
+    else
+    {
+      // since we can't find which PID sent the signal, set the timeout to a very small amount
+      // of time and try to read; if we get something, awesome.
+      foreach ( $this->ipc_sockets as $pid => $socket )
+      {
+        // 1000 microseconds = 1/80th of the time it takes you to blink.
+        @stream_set_timeout($socket, 0, 1000);
+        $command = rtrim(@fgets($socket, 102400), "\n");
+        if ( !empty($command) )
+        {
+          break;
+        }
+      }
+    }
+    if ( empty($command) )
+    {
+      // hmm, got a sigusr2 without an incoming command. oh well, ignore.
+      return null;
+    }
+    $command = $this->json->decode($command);
+    if ( !isset($command['action']) )
+    {
+      // no action = no way to figure out how to handle this.
+      return null;
+    }
+    if ( !isset($this->ipc_actions[$command['action']]) )
+    {
+      // action not registered
+      return null;
+    }
+    // should we propagate this event?
+    if ( !$this->is_child() && isset($command['propagate']) && $command['propagate'] === true )
+    {
+      $this->ipc_send($command);
+    }
+    // we're good
+    @call_user_func($this->ipc_actions[$command['action']], $command, $this);
+  }
+  
+  /**
+   * Kills all child processes.
+   * @access public
+   */
+  
+  public function kill_all_children()
+  {
+    foreach ( $this->ipc_sockets as $pid => $socket )
+    {
+      $socklen = count($this->ipc_sockets);
+      posix_kill($pid, SIGTERM);
+      // wait until we are conscious of this child's death
+      while ( count($this->ipc_sockets) >= $socklen )
+      {
+        usleep(20000);
+      }
+    }
+  }
+  
+}
+
+?>