TripalDaemon.inc 10 KB

<?php

/**
 * @file
 * Implements the Tripal Daemon functionality by using the Daemon API.
 */

  6. /**
  7. * This is the main class for the Tripal Daemon.
  8. *
  9. * It extends the DaemonAPIDaemon class provided by the Daemon API in order
  10. * to implement tripal job checking and execution functionality.
  11. */
  12. class TripalDaemon extends DrushDaemon {
  13. // OPTIONAL: Set how often in seconds your executeTask() should be called.
  14. // Keep in mind that this time does not include the amount of time spent
  15. // executing your tasks. For example, if you set this to 5 seconds and you
  16. // have 2 tasks in your execute_tasks() function, each of which take 15
  17. // seconds, then your loop will iterate (and thus your execute_task()
  18. // function will be called again) before your tasks finish.
  19. // CODING STANDARDS: Can't change this variable to lowerCamel since it
  20. // inherits from a library class.
  21. protected $loop_interval = 20;
  22. // Keep track of whether we are running a Tripal job or not
  23. // and if so, which Tripal jobs are we running.
  24. // If this array is empty then we are waiting for jobs ;-).
  25. protected $tripal_jobs = [];
  26. // Queue of Tripal Jobs waiting to be run.
  27. protected $queue = [];
  28. // Boolean as to whether we are aloud to process jobs in parallel.
  29. // @todo: Implement actually changing this setting.
  30. // NOTE: Can't be set via a drush option to trpjob-daemon since
  31. // tripal daemon calls drush daemon which then forks and can't
  32. // pass on the options to the child.
  33. protected $do_parallel = FALSE;
  34. // Maximum number of jobs that can be run in parallel.
  35. // @todo: Implement actually changing this setting (see above note).
  36. protected $max_num_jobs = 3;
  37. /**
  38. * Implements DaemonAPIDaemon::executeTask() function.
  39. *
  40. * This gets executed once per loop iteration & does the following:
  41. * 1. Checks to see if there are any Tripal Jobs waiting to be executed.
  42. * 2. If there are then they are run (jobs with a higher priority and higher
  43. * job_id are run first.
  44. *
  45. * This function will log how many jobs have been found and when each one was
  46. * started/completed, as well as, it's status upon completion.
  47. *
  48. * @param int $iteration_number
  49. * This is an integer stating the current iteration of the loop you are on.
  50. */
  51. protected function executeTask($iteration_number) {
  52. // If jobs are being run in parallel then we need to check to see if jobs
  53. // we think are running have actually been completed.
  54. if ($this->do_parallel) {
  55. $this->checkJobStatus();
  56. }
  57. // Start the loop by seeing if there is a job to be run :-).
  58. $job_id = $this->getTripalJobID();
  59. while ($job_id) {
  60. // Simply run the job :-D.
  61. $this->runTripalJob($job_id);
  62. // Get the next job (if we're aloud to run another one)...
  63. $job_id = $this->getTripalJobID();
  64. }
  65. // If jobs are being run in parallel then we need to check to see if jobs
  66. // we think are running have actually been completed.
  67. if ($this->do_parallel) {
  68. $this->checkJobStatus();
  69. }
  70. }
  71. /**
  72. * Get job_id of Tripal Job to run.
  73. *
  74. * NOTE: This function should only return a job_id if we are aloud to run it.
  75. */
  76. protected function getTripalJobID() {
  77. // First we need to determine if we are in sequenctial mode or parallel mode.
  78. // Parallel:
  79. if ($this->do_parallel) {
  80. // Check that we arn't already running the maximum number of jobs.
  81. if (tripal_max_jobs_exceeded($this->max_num_jobs)) {
  82. $this->log('Already running the maximum number of jobs.');
  83. return FALSE;
  84. }
  85. // Also check based on our list of running jobs just in case they haven't yet registered in the db.
  86. if (sizeof($this->tripal_jobs) >= $this->max_num_jobs) {
  87. $this->log('Already running the maximum number of jobs.');
  88. return FALSE;
  89. }
  90. }
  91. // Sequential:
  92. else {
  93. // Check that we arn't already running a job.
  94. if (tripal_is_job_running()) {
  95. $this->log('Job is still running. Waiting until it completes before starting another one.');
  96. return FALSE;
  97. }
  98. }
  99. // If we reach this point then we're aloud to run a job! :-D.
  100. //-----------------------------------------------------------
  101. // We would like to use a queue to keep track of Tripal Jobs to be run.
  102. // This will cut down on the number of queries and help ensure that the same job is
  103. // not run repeatedly with parallel processing.
  104. // First step, fill the queue if it's empty.
  105. if (empty($this->queue)) {
  106. $this->queue = db_query(
  107. "SELECT job_id FROM {tripal_jobs} TJ
  108. WHERE TJ.start_time IS NULL AND TJ.end_time IS NULL AND TJ.status != 'Cancelled'
  109. ORDER BY priority ASC, job_id ASC"
  110. )->fetchCol();
  111. }
  112. // If the queue is still empty then there are no jobs waiting.
  113. if (empty($this->queue)) {
  114. return FALSE;
  115. }
  116. // Return the next job in line.
  117. $job_id = array_shift($this->queue);
  118. // But only if it wasn't already run.
  119. if (!isset($this->tripal_jobs[$job_id])) {
  120. return $job_id;
  121. }
  122. }
  123. /**
  124. * Run Tripal Job.
  125. */
  126. protected function runTripalJob($job_id) {
  127. // Load the job we are going to run.
  128. $job = new TripalJob();
  129. $job->load($job_id);
  130. // Tell admins we are running a job.
  131. $this->tripal_jobs[$job_id] = $job;
  132. $this->setStatus();
  133. // And log the details.
  134. $this->log('Job (ID=' . $job_id . ') Started at ' . format_date(time(), 'small') . '.');
  135. // Parallel:
  136. if ($this->do_parallel) {
  137. $this->runParallelTripalJob($job_id);
  138. }
  139. // Sequential:
  140. else {
  141. $job = $this->runSequentialTripalJob($job);
  142. // Job needs to be re-loaded to reflect the new end time and status
  143. // since this does not seem to be set by run().
  144. $job->load($job_id);
  145. // If the job is sequential then we know we are done the job
  146. // just by the virtue of having reached this code.
  147. // As such, tell the admin :-).
  148. unset($this->tripal_jobs[$job_id]);
  149. $this->setStatus();
  150. // And log the details.
  151. $this->log('Job (ID=' . $job_id . ') Completed');
  152. $this->log('End DateTime: ' . format_date($job->getEndTime(), 'small'), '', 1);
  153. $this->log('Status: ' . $job->getStatus(), '', 1);
  154. }
  155. }
  156. /**
  157. * Run Parallel Tripal Job.
  158. */
  159. protected function runParallelTripalJob($job_id) {
  160. // Tripal job launcher needs the user... Unfortunatly we cannot pass one through the
  161. // drush command since that is intercepted by drushd.
  162. // As such we are going ot use the god user (uid=1) as a default that can be overriden
  163. // by passing in the drush "user" option.
  164. $uid = drush_get_option('user', 1);
  165. $user = user_load($uid);
  166. $username = $user->name;
  167. // We use drush_invoke_process() to fork the daemon safely to run
  168. // multiple jobs concurrently. We can't use the PHP-daemon built
  169. // in functionality such as workers & tasks b/c they copy the
  170. // entire enviro. resulting in mutliple processes using the same
  171. // database connection (which causes errors).
  172. drush_invoke_process(
  173. '@self', // Obviously run on the current site.
  174. 'trp-run-jobs', // Run the tripal job launcher.
  175. [], // No arguments (only options below).
  176. [
  177. 'job_id' => $job_id, // The job to be run.
  178. 'username' => $username, // The user to run it as.
  179. 'single' => 1, // Only run a single job!
  180. 'parallel' => $this->do_parallel, // We're aloud to run in parallel.
  181. 'max_jobs' => $this->max_num_jobs, // But only this many jobs at once.
  182. ],
  183. ['fork' => TRUE] // This tells drush to spawn a new process.
  184. );
  185. }
  186. /**
  187. * Run Sequential Tripal Job.
  188. */
  189. protected function runSequentialTripalJob($job) {
  190. // Run the job.
  191. try {
  192. $job->run();
  193. } catch (Exception $e) {
  194. $job->logMessage($e->getMessage(), [], TRIPAL_ERROR);
  195. }
  196. return $job;
  197. }
  198. /**
  199. * Check the status of a given (or all running) jobs.
  200. *
  201. * @param $job_id
  202. * The job_id of a specific job to check the status us. (OPTIONAL)
  203. */
  204. protected function checkJobStatus($job_id = FALSE) {
  205. if ($job_id) {
  206. $job = new TripalJob();
  207. $job->load($job_id);
  208. $jobs = [$job_id];
  209. }
  210. elseif (!empty($this->tripal_jobs)) {
  211. $jobs = array_keys($this->tripal_jobs);
  212. }
  213. else {
  214. return TRUE;
  215. }
  216. $results = db_query(
  217. 'SELECT job_id, pid, end_time, status FROM {tripal_jobs} WHERE job_id IN (:jobs)',
  218. [':jobs' => $jobs]);
  219. foreach ($results as $job) {
  220. // If the system still thinks the job is running then check it's pid.
  221. if ($job->status == 'Running') {
  222. $status = shell_exec('ps -p ' . escapeshellarg($job->pid) . ' -o pid=');
  223. if (!$status) {
  224. // Update the job.
  225. $job->end_time = time();
  226. $job->error_msg = 'Unknown Error Encountered.';
  227. $job->status = 'Error';
  228. $job->pid = '';
  229. drupal_write_record('tripal_jobs', $job, 'job_id');
  230. }
  231. }
  232. // The job is finished if it's not running or waiting -tell the admin.
  233. if ($job->status != 'Running' AND $job->status != 'Waiting') {
  234. // As such, tell the admin :-).
  235. unset($this->tripal_jobs[$job->job_id]);
  236. $this->setStatus();
  237. // And log the details.
  238. $this->log('Job (ID=' . $job->job_id . ') Completed');
  239. $this->log('End DateTime: ' . format_date($job->end_time, 'small'), '', 1);
  240. $this->log('Status: ' . $job->status, '', 1);
  241. }
  242. }
  243. }
  244. /**
  245. * Override to include additional daemon-specific settings for use in reports.
  246. *
  247. * @return array
  248. * An array of status details where the key should be a human-readable
  249. * name for your detail and the value should be the current state.
  250. */
  251. protected function getStatusDetails() {
  252. $status_details = parent::getStatusDetails();
  253. $status_details['Running Job'] = (empty($this->tripal_jobs)) ? FALSE : TRUE;
  254. $status_details['Current Jobs'] = array_keys($this->tripal_jobs);
  255. return $status_details;
  256. }
  257. /**
  258. * Set whether we should run parallel jobs or not.
  259. *
  260. * @param $do_parallel
  261. * Boolean indicating whether to allow parallel processing of jobs.
  262. * @param $max_num_jobs
  263. * Integer indicating the maximum number of jobs to run at once.
  264. */
  265. public function setParallel($do_parallel, $max_num_jobs) {
  266. $this->do_parallel = $do_parallel;
  267. $this->max_num_jobs = $max_num_jobs;
  268. }
  269. }