Kaynağa Gözat

Merge pull request #51 from tripal/LegumeFederation-multijobs

Tripal Job Launcher changes: --max_jobs and --single
Stephen Ficklin 8 yıl önce
ebeveyn
işleme
2ad7497a2d

+ 66 - 3
tripal_core/api/tripal_core.jobs.api.inc

@@ -261,6 +261,51 @@ function tripal_get_job_end($job) {
 
   return $end;
 }
+
+
+/**
+ * Check for too many concurrent jobs
+ *
+ * @param $max_jobs
+ *   The maximum number of concurrent jobs to allow; -1 = no limit
+ *
+ * @ingroup tripal_jobs_api
+ */
+function tripal_max_jobs_exceeded($max_jobs) {
+  if ($max_jobs < 0) {
+    // No limit on concurrent jobs
+    return FALSE;
+  }
+
+  $num_jobs_running = 0;
+
+  // Iterate through each job that has not ended and see if it is still running.
+  // If it is not running but does not have an end_time then set the end time
+  // and set the status to 'Error'
+  $sql =  "SELECT * FROM {tripal_jobs} TJ " .
+          "WHERE TJ.end_time IS NULL and NOT TJ.start_time IS NULL ";
+  $jobs = db_query($sql);
+  foreach ($jobs as $job) {
+    $status = shell_exec('ps -p ' . escapeshellarg($job->pid) . ' -o pid=');
+    if ($job->pid && $status) {
+      // the job is still running
+      $num_jobs_running++;
+    }
+    else {
+      // the job is not running so terminate it
+      $record = new stdClass();
+      $record->job_id = $job->job_id;
+      $record->end_time = REQUEST_TIME;
+      $record->status = 'Error';
+      $record->error_msg = 'Job has terminated unexpectedly.';
+      drupal_write_record('tripal_jobs', $record, 'job_id');
+    }
+  }
+
+  return ($num_jobs_running >= $max_jobs);
+}
+
+
 /**
  * Set a job to be re-ran (ie: add it back into the job queue)
  *
@@ -376,16 +421,24 @@ function tripal_execute_job($job_id, $redirect = TRUE) {
  *   based on order and priority.  However there are times when a specific
  *   job needs to be launched and this argument will allow it.  Only jobs
  *   which have not been run previously will run.
- *
+ * @param $max_jobs
+ *   The maximum number of jobs that should be run concurrently. If -1 then unlimited.
+ * @param $single
+ *   Ensures only a single job is run rather then the entire queue.
  * @ingroup tripal_jobs_api
  */
-function tripal_launch_job($do_parallel = 0, $job_id = NULL) {
+function tripal_launch_job($do_parallel = 0, $job_id = NULL, $max_jobs = -1, $single = 0) {
 
   // first check if any jobs are currently running
   // if they are, don't continue, we don't want to have
   // more than one job script running at a time
   if (!$do_parallel and tripal_is_job_running()) {
-    print "Jobs are still running. Use the --parallel=1 option with the Drush command to run jobs in parallel.";
+    print "Jobs are still running. Use the --parallel=1 option with the Drush command to run jobs in parallel.\n";
+    return;
+  }
+
+  if ($do_parallel && tripal_max_jobs_exceeded($max_jobs)) {
+    print "At least $max_jobs jobs are still running. At least one of these jobs much complete before a new job can start.\n";
     return;
   }
 
@@ -403,6 +456,8 @@ function tripal_launch_job($do_parallel = 0, $job_id = NULL) {
             "ORDER BY priority ASC,job_id ASC";
     $job_res = db_query($sql);
   }
+  print "There are " . $job_res->rowCount() . " jobs queued.\n";
+
   foreach ($job_res as $job) {
     // set the start time for this job
     $record = new stdClass();
@@ -451,6 +506,14 @@ function tripal_launch_job($do_parallel = 0, $job_id = NULL) {
     $record->progress = '100';
     drupal_write_record('tripal_jobs', $record, 'job_id');
 
+    if ($single) {
+      // Don't start any more jobs
+      break;
+    }
+    if (tripal_max_jobs_exceeded($max_jobs)) {
+      break;
+    }
+
     // send an email to the user advising that the job has finished
   }
 }

+ 19 - 7
tripal_core/tripal_core.drush.inc

@@ -102,7 +102,8 @@ function tripal_core_drush_command() {
     'description' => dt('Launches jobs waiting in the queue. Only one job can execute at a time unless the --parllel=1 option is provided.'),
     'examples' => array(
       'Single Job' => 'drush trp-run-jobs --username=administrator',
-      'Parallel Job' => 'drush trp-run-jobs --username=administrator --parallel=1'
+      'Parallel Job' => 'drush trp-run-jobs --username=administrator --parallel=1',
+      'Max-jobs Job' => 'drush trp-run-jobs --username=administrator --parallel=1 --max_jobs=10',
     ),
     'arguments' => array(),
     'options' => array(
@@ -114,6 +115,8 @@ function tripal_core_drush_command() {
       ),
       'parallel' => dt('Normally jobs are executed one at a time. But if you are certain no conflicts will occur with other currently running jobs you may set this argument to a value of 1 to make the job run in parallel with other running jobs.'),
       'job_id' => dt('Provide a job_id to run a specific job. Only jobs that have not been run already can be used'),
+      'max_jobs' => dt('Indicate the maximum number of concurrent jobs. Default is -1 (unlimited). Ignore if not running parallel jobs'),
+      'single' => dt('Execute only one queued job'),
     ),
   );
   $items['trp-rerun-job'] = array(
@@ -135,6 +138,8 @@ function tripal_core_drush_command() {
         'required' => TRUE,
       ),
       'parallel' => dt('Normally jobs are executed one at a time. But if you are certain no conflicts will occur with other currently running jobs you may set this argument to a value of 1 to make the job run in parallel with other running jobs.'),
+      'max_jobs' => dt('Indicate the maximum number of concurrent jobs. Default is -1 (unlimited). Ignore if not running parallel jobs'),
+      'single' => dt('Execute only one queued job'),
     ),
   );
   $items['trp-get-cversion'] = array(
@@ -296,6 +301,8 @@ function drush_tripal_core_set_user($username) {
 function drush_tripal_core_trp_run_jobs() {
   $parallel = drush_get_option('parallel');
   $job_id   = drush_get_option('job_id');
+  $max_jobs = drush_get_option('max_jobs', -1);
+  $single   = drush_get_option('single', 0);
 
   // Unfortunately later versions of Drush use the '--user' argument which
   // makes it incompatible with how Tripal was using it.  For backwards
@@ -316,17 +323,19 @@ function drush_tripal_core_trp_run_jobs() {
 
   drush_tripal_core_set_user($username);
 
+  drush_print("\n" . date('Y-m-d H:i:s'));
   if ($parallel) {
     drush_print("Tripal Job Launcher (in parallel)");
+    if ($max_jobs !== -1) drush_print("Maximum number of jobs is " . $max_jobs);
     drush_print("Running as user '$username'");
     drush_print("-------------------");
-    tripal_launch_job($parallel, $job_id);
+    tripal_launch_job($parallel, $job_id, $max_jobs, $single);
   }
   else {
     drush_print("Tripal Job Launcher");
     drush_print("Running as user '$username'");
     drush_print("-------------------");
-    tripal_launch_job(0, $job_id);
+    tripal_launch_job(0, $job_id, $max_jobs, $single);
   }
 }
 /**
@@ -374,7 +383,7 @@ function drush_tripal_core_trp_rerun_job() {
   // --username argument for the fture.
   $user = drush_get_option('user');
   $uname = drush_get_option('username');
-  print "USER: '$user', UNAME: '$uname'\n";
+  print "USER: '$user', USERNAME: '$uname'\n";
   if ($user and is_numeric($user)) {
   }
   elseif ($user) {
@@ -387,15 +396,18 @@ function drush_tripal_core_trp_rerun_job() {
 
   $parallel = drush_get_option('parallel');
   $job_id   = drush_get_option('job_id');
+  $max_jobs = drush_get_option('max_jobs', -1);
+  $single   = drush_get_option('single', 0);
 
   drush_tripal_core_set_user($username);
   $new_job_id = tripal_rerun_job($job_id, FALSE);
 
+  drush_print("\n" . date('Y-m-d H:i:s'));
   if ($parallel) {
     drush_print("Tripal Job Launcher (in parallel)");
     drush_print("Running as user '$username'");
     drush_print("-------------------");
-    tripal_launch_job($parallel, $new_job_id);
+    tripal_launch_job($parallel, $new_job_id, $max_jobs, $single);
   }
   else {
     drush_print("Tripal Job Launcher");
@@ -403,7 +415,6 @@ function drush_tripal_core_trp_rerun_job() {
     drush_print("-------------------");
     tripal_launch_job(0, $new_job_id);
   }
-
 }
 
 /**
@@ -447,7 +458,8 @@ function drush_tripal_core_trp_get_currjob() {
         "Module: " . $job->modulename . "\n" .
         "Callback: " . $job->callback . "\n" .
         "Process ID: " . $job->pid . "\n" .
-        "Progress: " . $job->progress . "%\n";
+        "Progress: " . $job->progress . "%\n".
+        "Current Date: " . date('Y-m-d H:i:s') . "\n";
     drush_print($output);
   }
   if (!$job_pid) {

+ 3 - 5
tripal_core/tripal_launch_job.php

@@ -20,14 +20,13 @@ $job_id = $argv[1];
 $root = $argv[2];
 $username = $argv[3];
 $do_parallel = $argv[4];
+$max_jobs = (isset($argv[5]) ? $argv[5] : -1;  // -1 = don't limit number of consecutive jobs
 
 /**
  * Root directory of Drupal installation.
  */
 define('DRUPAL_ROOT', getcwd());
 
-
-
 require_once DRUPAL_ROOT . '/includes/bootstrap.inc';
 drupal_bootstrap(DRUPAL_BOOTSTRAP_FULL);
 menu_execute_active_handler();
@@ -40,7 +39,6 @@ $_SERVER['REMOTE_ADDR'] = NULL;
 $_SERVER['REQUEST_METHOD'] = NULL;
 
 
-
 $results = db_query("SELECT * FROM {users} WHERE name = :name", array(':name' => $username));
 $u = $results->fetchObject();
 if (!$u) {
@@ -51,12 +49,12 @@ if (!$u) {
 global $user;
 $user = user_load($u->uid);
 
-
+fwrite($stdout, "\n" . date('Y-m-d' H:i:s) . "\n");
 fwrite($stdout, "Tripal Job Launcher\n");
 fwrite($stdout, "Running as user ' . $username . '\n");
 fwrite($stdout, "-------------------\n");
 
-tripal_launch_job($do_parallel);
+tripal_launch_job($do_parallel, null, $max_jobs);
 
 /**
  * Print out the usage instructions if they are not followed correctly