Kaynağa Gözat

Daemon: implemented parallel processing although there's currently no way to turn it on.

Lacey Sanderson 7 yıl önce
ebeveyn
işleme
7bd8a670d8

+ 234 - 59
tripal_daemon/TripalDaemon.inc

@@ -28,6 +28,20 @@ class TripalDaemon extends DrushDaemon {
   // If this array is empty then we are waiting for jobs ;-).
   protected $tripal_jobs = array();
 
+  // Queue of Tripal Jobs waiting to be run.
+  protected $queue = array();
+
+  // Boolean as to whether we are aloud to process jobs in parallel.
+  // @todo: Implement actually changing this setting.
+  // NOTE: Can't be set via a drush option to trpjob-daemon since
+  // tripal daemon calls drush daemon which then forks and can't
+  // pass on the options to the child.
+  protected $do_parallel = FALSE; 
+
+  // Maximum number of jobs that can be run in parallel.
+  // @todo: Implement actually changing this setting (see above note).
+  protected $max_num_jobs = 2;
+
   /**
    * Implements DaemonAPIDaemon::executeTask() function.
    *
@@ -43,71 +57,232 @@ class TripalDaemon extends DrushDaemon {
    *   This is an integer stating the current iteration of the loop you are on.
    */
   protected function executeTask($iteration_number) {
- 
-    $do_parallel = FALSE;
-    $max_jobs = 1;
 
-    // First check if any jobs are currently running if they are, don't continue,
-    // we don't want to have more than one job script running at a time.
-    if (!$do_parallel and tripal_is_job_running()) {
-      $this->log("Jobs are still running. Use the --parallel=1 option with the Drush command to run jobs in parallel.\n");
+    // If jobs are being run in parallel then we need to check to see if jobs
+    // we think are running have actually been completed.
+    if ($this->do_parallel) {
+      $this->checkJobStatus();
+    }
+
+    // Start the loop by seeing if there is a job to be run :-).
+    $job_id = $this->getTripalJobID();
+    while ($job_id) {
+
+      // Simply run the job :-D.
+      $this->runTripalJob($job_id);
+
+      // Get the next job (if we're aloud to run another one)...
+      $job_id = $this->getTripalJobID();
+    }
+
+    // If jobs are being run in parallel then we need to check to see if jobs
+    // we think are running have actually been completed.
+    if ($this->do_parallel) {
+      $this->checkJobStatus();
+    }
+  }
+
+  /**
+   * Get job_id of Tripal Job to run.
+   *
+   * NOTE: This function should only return a job_id if we are aloud to run it.
+   */
+  protected function getTripalJobID() {
+
+    // First we need to determine if we are in sequenctial mode or parallel mode.
+    // Parallel:
+    if ($this->do_parallel) {
+
+      // Check that we arn't already running the maximum number of jobs.
+      if (tripal_max_jobs_exceeded($this->max_num_jobs)) {
+        $this->log('Already running the maximum number of jobs.');
+        return FALSE;
+      }
+      // Also check based on our list of running jobs just in case they haven't yet registered in the db.
+      if (sizeof($this->tripal_jobs) >= $this->max_num_jobs) {
+        $this->log('Already running the maximum number of jobs.');
+        return FALSE;
+      }
+    }
+    // Sequential:
+    else {
+
+      // Check that we arn't already running a job.
+      if (tripal_is_job_running()) {
+        $this->log('Job is still running. Waiting until it completes before starting another one.');
+        return FALSE;
+      }
+    }
+
+    // If we reach this point then we're aloud to run a job! :-D.
+    //-----------------------------------------------------------
+
+    // We would like to use a queue to keep track of Tripal Jobs to be run.
+    // This will cut down on the number of queries and help ensure that the same job is
+    // not run repeatedly with parallel processing.
+    // First step, fill the queue if it's empty.
+    if (empty($this->queue)) {
+      $this->queue = db_query(
+        "SELECT job_id FROM {tripal_jobs} TJ
+         WHERE TJ.start_time IS NULL AND TJ.end_time IS NULL AND TJ.status != 'Cancelled'
+         ORDER BY priority ASC, job_id ASC"
+      )->fetchCol();
+    }
+
+    // If the queue is still empty then there are no jobs waiting.
+    if (empty($this->queue)) {
+      return FALSE;
+    }
+
+    // Return the next job in line.
+    $job_id = array_shift($this->queue);
+
+    // But only if it wasn't already run.
+    if (!isset($this->tripal_jobs[$job_id])) {
+      return $job_id;
+    }
+  }
+
+  /**
+   * Run Tripal Job.
+   */
+  protected function runTripalJob($job_id) {
+
+    // Load the job we are going to run.
+    $job = new TripalJob();
+    $job->load($job_id);
+
+    // Tell admins we are running a job.
+    $this->tripal_jobs[$job_id] = $job;
+    $this->setStatus();
+    // And log the details.
+    $this->log('Job (ID='.$job_id.') Started at '.format_date(time(), 'small').'.');
+
+    // Parallel:
+    if ($this->do_parallel) {
+      $this->runParallelTripalJob($job_id);
+    }
+    // Sequential:
+    else {
+      $job = $this->runSequentialTripalJob($job);
+
+      // Job needs to be re-loaded to reflect the new end time and status
+      // since this does not seem to be set by run().
+      $job->load($job_id);
+
+      // If the job is sequential then we know we are done the job
+      // just by the virtue of having reached this code.
+      // As such, tell the admin :-).
+      unset($this->tripal_jobs[$job_id]);
+      $this->setStatus();
+      // And log the details.
+      $this->log('Job (ID='.$job_id.') Completed');
+      $this->log('End DateTime: '.format_date($job->getEndTime(), 'small'), '', 1);
+      $this->log('Status: '.$job->getStatus(), '', 1);
+
+    }
+  }
+
+  /**
+   * Run Parallel Tripal Job.
+   */
+  protected function runParallelTripalJob($job_id) {
+
+    // Tripal job launcher needs the user... Unfortunatly we cannot pass one through the
+    // drush command since that is intercepted by drushd.
+    // As such we are going ot use the god user (uid=1) as a default that can be overriden
+    // by passing in the drush "user" option.
+    $uid = drush_get_option('user', 1);
+    $user = user_load($uid);
+    $username = $user->name;
+
+    // We use drush_invoke_process() to fork the daemon safely to run
+    // multiple jobs concurrently. We can't use the PHP-daemon built
+    // in functionality such as workers & tasks b/c they copy the
+    // entire enviro. resulting in mutliple processes using the same
+    // database connection (which causes errors).
+    drush_invoke_process(
+      '@self',                               // Obviously run on the current site.
+      'trp-run-jobs',                        // Run the tripal job launcher.
+      array(),                               // No arguements (only options below).
+      array(
+        'job_id' => $job_id,                 // The job to be run.
+        'username' => $username,             // The user to run it as.
+        'single' => 1,                       // Only run a single job!
+        'parallel' => $this->do_parallel,    // We're aloud to run in parallel.
+        'max_jobs' => $this->max_num_jobs,   // But only this many jobs at once.
+      ),
+      array('fork' => TRUE)                  // This tells drush to spawn a new process.
+    );
+
+  }
+
+  /**
+   * Run Sequential Tripal Job.
+   */
+  protected function runSequentialTripalJob($job) {
+
+    // Run the job.
+    try {
+      $job->run();
+    }
+    catch (Exception $e) {
+      $job->logMessage($e->getMessage(), array(), TRIPAL_ERROR);
+    }
+
+    return $job;
+  }
+
+  /**
+   * Check the status of a given (or all running) jobs.
+   *
+   * @param $job_id
+   *    The job_id of a specific job to check the status us. (OPTIONAL)
+   */
+  protected function checkJobStatus($job_id = FALSE) {
+
+    if ($job_id) {
+      $job = new TripalJob();
+      $job->load($job_id);
+      $jobs = array($job_id);
     }
-    elseif ($do_parallel && tripal_max_jobs_exceeded($max_jobs)) {
-      $this->log("At least $max_jobs jobs are still running. At least one of these jobs much complete before a new job can start.\n");
+    elseif (!empty($this->tripal_jobs)) {
+      $jobs = array_keys($this->tripal_jobs);
     }
     else {
+      return TRUE;
+    }
+
+    $results = db_query(
+      'SELECT job_id, pid, end_time, status FROM {tripal_jobs} WHERE job_id IN (:jobs)',
+      array(':jobs' => $jobs));
+    foreach ($results as $job) {
+      // If the system still thinks the job is running then check it's pid.
+      if ($job->status == 'Running') {
+        $status = shell_exec('ps -p ' . escapeshellarg($job->pid) . ' -o pid=');
+        if (!$status) {
 
-      // First check to see if there are any tripal jobs to be run.
-      $sql =  "
-        SELECT TJ.job_id
-        FROM {tripal_jobs} TJ
-        WHERE
-          TJ.start_time IS NULL AND
-          TJ.end_time IS NULL AND
-          NOT TJ.status = 'Cancelled'
-        ORDER BY priority ASC,job_id ASC";
-      $job_ids = db_query($sql)->fetchCol();
-      $num_waiting_jobs = sizeof($job_ids);
-
-      // If there are then run them and log the output.
-      if ($num_waiting_jobs > 0) {
-        $this->log($num_waiting_jobs . ' Waiting Tripal Jobs... '
-          . 'Running waiting job(s) now.');
-
-        // Launch all tripal jobs :) Yay for bootstrapping!!
-        foreach ($job_ids as $id) {
-          $this->log('Starting Job (ID=' . $id . ')', '', 1);
-
-          // Tell admins we are running a job.
-          $this->tripal_jobs[$id] = $id;
-          $this->setStatus();
-
-          // Launch Tripal Job.
-          $job = new TripalJob();
-          $job->load($id);
-          try {
-            $job->run();
-          }
-          catch (Exception $e) {
-            $job->logMessage($e->getMessage(), array(), TRIPAL_ERROR);
-          } 
-
-          // Tell admins that we're done :-).
-          unset($this->tripal_jobs[$id]);
-          $this->setStatus();
-
-          // Report job details.
-          $job = db_query(
-            "SELECT j.*
-            FROM {tripal_jobs} j
-            WHERE j.job_id = :jid",
-            array(':jid' => $id)
-          )->fetchObject();
-          $this->log("Job (ID=" . $id . ") completed at "
-          . date('d M Y H:i:s', $job->end_time) . " with a status of '"
-          . $job->status . "'", "", 1);
+          // Update the job.
+          $job->end_time = time();
+          $job->error_msg = 'Unknown Error Encountered.';
+          $job->status = 'Error';
+          $job->pid = '';
+          drupal_write_record('tripal_jobs', $job, 'job_id');
         }
       }
+
+      // The job is finished if it's not running or waiting -tell the admin.
+      if ($job->status != 'Running' AND $job->status != 'Waiting') {
+
+        // As such, tell the admin :-).
+        unset($this->tripal_jobs[$job->job_id]);
+        $this->setStatus();
+        // And log the details.
+        $this->log('Job (ID='.$job->job_id.') Completed');
+        $this->log('End DateTime: '.format_date($job->end_time, 'small'), '', 1);
+        $this->log('Status: '.$job->status, '', 1);
+
+      }
     }
   }
 
@@ -122,7 +297,7 @@ class TripalDaemon extends DrushDaemon {
     $status_details = parent::getStatusDetails();
 
     $status_details['Running Job'] = (empty($this->tripal_jobs)) ? FALSE : TRUE;
-    $status_details['Current Jobs'] = $this->tripal_jobs;
+    $status_details['Current Jobs'] = array_keys($this->tripal_jobs);
 
     return $status_details;
   }

+ 3 - 1
tripal_daemon/includes/tripal_daemon.blocks.inc

@@ -121,10 +121,12 @@ function theme_tripal_daemon_status_block_content($show_all = FALSE) {
 
       // If these are current jobs then we want to link to details.
       if ($k == 'Current Jobs' AND !empty($v)) {
+        $list = array();
         foreach ($v as $job_id) {
           $url = 'admin/tripal/tripal_jobs/view/' . $job_id;
-          $v[$job_id] = l($job_id, $url);
+          $list[$job_id] = l($job_id, $url);
         }
+        $v = $list;
       }
 
       // If it's an array then make it a list.