TripalJob.inc 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683
  1. <?php
  2. class TripalJob {
  3. /**
  4. * The ID of the job.
  5. */
  6. protected $job_id = NULL;
  7. /**
  8. * Contains the job record for this job.
  9. */
  10. protected $job = NULL;
  11. /**
  12. * The number of items that this importer needs to process. A progress
  13. * can be calculated by dividing the number of items process by this
  14. * number.
  15. */
  16. private $total_items;
  17. /**
  18. * The number of items that have been handled so far. This must never
  19. * be below 0 and never exceed $total_items;
  20. */
  21. private $num_handled;
  22. /**
  23. * The interval when the job progress should be updated. Updating the job
  24. * progress incurrs a database write which takes time and if it occurs to
  25. * frequently can slow down the loader. This should be a value between
  26. * 0 and 100 to indicate a percent interval (e.g. 1 means update the
  27. * progress every time the num_handled increases by 1%).
  28. */
  29. private $interval;
  30. /**
  31. * The time stamp when the job begins.
  32. *
  33. * @var integer
  34. */
  35. private $start_time;
  36. /**
  37. * The time from when the setTotalItems is called to the present time.
  38. *
  39. * @var
  40. */
  41. private $progress_start_time;
  42. /**
  43. * Stores the last percentage that progress was reported.
  44. *
  45. * @var integer
  46. */
  47. private $reported = 0;
  48. /**
  49. * Instantiates a new TripalJob object.
  50. *
  51. * By default the job object is "empty". It must be associated with
  52. * job details either by calling the load() function or the
  53. * create() function.
  54. */
  55. public function __construct() {
  56. }
  57. /**
  58. * Loads a job for this object.
  59. *
  60. * @param $job_id
  61. * The ID of the job.
  62. */
  63. public function load($job_id) {
  64. // Make sure we have a numeric job_id.
  65. if (!$job_id or !is_numeric($job_id)) {
  66. // If we don't then do a quick double check in case this is a
  67. // TripalJob object in which case, I still have the job_id.
  68. if (is_object($job_id) AND is_a($job_id, 'TripalJob')) {
  69. $job_id = $job_id->job->job_id;
  70. }
  71. // Finally just throw an exception.
  72. // I can't load a job if I don't know which one.
  73. else {
  74. throw new Exception("You must provide the job_id to load the job.");
  75. }
  76. }
  77. $sql = 'SELECT j.* FROM {tripal_jobs} j WHERE j.job_id = :job_id';
  78. $args = [':job_id' => $job_id];
  79. $this->job = db_query($sql, $args)->fetchObject();
  80. if (!$this->job) {
  81. throw new Exception("Cannot find a job with this ID provided.");
  82. }
  83. // Fix the date/time fields.
  84. $this->job->submit_date_string = $this->job->submit_date ? format_date($this->job->submit_date) : '';
  85. $this->job->start_time_string = $this->job->start_time ? format_date($this->job->start_time) : '';
  86. $this->job->end_time_string = $this->job->end_time ? format_date($this->job->end_time) : '';
  87. // Unserialize the includes.
  88. $this->job->includes = unserialize($this->job->includes);
  89. // Arguments for jobs used to be stored as plain string with a double colon
  90. // separating them. But as of Tripal v2.0 the arguments are stored as
  91. // a serialized array. To be backwards compatible, we should check for
  92. // serialization and if not then we will use the old style
  93. $this->job->arguments = unserialize($this->job->arguments);
  94. if (!is_array($this->job->arguments)) {
  95. $this->job->arguments = explode("::", $this->job->arguments);
  96. }
  97. }
  98. /**
  99. * Creates a new job.
  100. *
  101. * @param $details
  102. * An associative array of the job details or a single job_id. If the
  103. * details are provided then the job is created and added to the database
  104. * otherwise if a job_id is provided then the object is loaded from the
  105. * database. The following keys are allowed:
  106. * - job_name: The human readable name for the job.
  107. * - modulename: The name of the module adding the job.
  108. * - callback: The name of a function to be called when the job is executed.
  109. * - arguments: An array of arguments to be passed on to the callback.
  110. * - uid: The uid of the user adding the job
  111. * - priority: The priority at which to run the job where the highest
  112. * priority is 10 and the lowest priority is 1. The default
  113. * priority is 10.
  114. * - includes: An array of paths to files that should be included in order
  115. * to execute the job. Use the module_load_include function to get a path
  116. * for a given file.
  117. * - ignore_duplicate: (Optional). Set to TRUE to ignore a job if it has
  118. * the same name as another job which has not yet run. If TRUE and a job
  119. * already exists then this object will reference the job already in the
  120. * queue rather than a new submission. The default is TRUE.
  121. *
  122. * @throws Exception
  123. * On failure an exception is thrown.
  124. *
  125. * @return
  126. * Returns TRUE if the job was succesfully created. Returns FALSE otherwise.
  127. * A return of FALSE does not mean the job creation failed. If the
  128. * ignore_duplicate is set to false and the job already is present in the
  129. * queue then the return value will be FALSE.
  130. */
  131. public function create($details) {
  132. // Set some defaults
  133. if (!array_key_exists('prority', $details)) {
  134. $details['priority'] = 10;
  135. }
  136. if (!array_key_exists('includes', $details)) {
  137. $details['includes'] = [];
  138. }
  139. if (!array_key_exists('ignore_duplicate', $details)) {
  140. $details['ignore_duplicate'] = FALSE;
  141. }
  142. // Make sure the arguments are correct.
  143. if (!$details['job_name']) {
  144. throw new Exception("Must provide a 'job_name' to create a job.");
  145. }
  146. if (!$details['modulename']) {
  147. throw new Exception("Must provide a 'modulename' to create a job.");
  148. }
  149. if (!$details['callback']) {
  150. throw new Exception("Must provide a 'callback' to create a job.");
  151. }
  152. if ($details['ignore_duplicate'] !== FALSE and $details['ignore_duplicate'] !== TRUE) {
  153. throw new Exception("Must provide either TRUE or FALSE for the ignore_duplicate option when creating a job.");
  154. }
  155. $includes = $details['includes'];
  156. if ($includes and is_array($includes)) {
  157. foreach ($includes as $path) {
  158. $full_path = $_SERVER['DOCUMENT_ROOT'] . base_path() . $path;
  159. if (!empty($path)) {
  160. if (file_exists($path)) {
  161. require_once($path);
  162. }
  163. elseif (file_exists($full_path)) {
  164. require_once($path);
  165. }
  166. elseif (!empty($path)) {
  167. throw new Exception("Included files for Tripal Job must exist. This path ($full_path) doesn't exist.");
  168. }
  169. }
  170. }
  171. }
  172. if (!function_exists($details['callback'])) {
  173. throw new Exception("Must provide a valid callback function to the tripal_add_job() function.");
  174. }
  175. if (!is_numeric($details['uid'])) {
  176. throw new Exception("Must provide a numeric \$uid argument to the tripal_add_job() function.");
  177. }
  178. $priority = $details['priority'];
  179. if (!$priority or !is_numeric($priority) or $priority < 1 or $priority > 10) {
  180. throw new Exception("Must provide a numeric \$priority argument between 1 and 10 to the tripal_add_job() function.");
  181. }
  182. $arguments = $details['arguments'];
  183. if (!is_array($arguments)) {
  184. throw new Exception("Must provide an array as the \$arguments argument to the tripal_add_job() function.");
  185. }
  186. // convert the arguments into a string for storage in the database
  187. $args = [];
  188. if (is_array($arguments)) {
  189. $args = serialize($arguments);
  190. }
  191. try {
  192. // Before inserting a new record, and if ignore_duplicate is TRUE then
  193. // check to see if the job already exists.
  194. if ($details['ignore_duplicate'] === TRUE) {
  195. $query = db_select('tripal_jobs', 'tj');
  196. $query->fields('tj', ['job_id']);
  197. $query->condition('job_name', $details['job_name']);
  198. $query->isNull('start_time');
  199. $job_id = $query->execute()->fetchField();
  200. if ($job_id) {
  201. $this->load($job_id);
  202. return FALSE;
  203. }
  204. }
  205. $job_id = db_insert('tripal_jobs')
  206. ->fields([
  207. 'job_name' => $details['job_name'],
  208. 'modulename' => $details['modulename'],
  209. 'callback' => $details['callback'],
  210. 'status' => 'Waiting',
  211. 'submit_date' => time(),
  212. 'uid' => $details['uid'],
  213. 'priority' => $priority,
  214. 'arguments' => $args,
  215. 'includes' => serialize($includes),
  216. ])
  217. ->execute();
  218. // Now load the job into this object.
  219. $this->load($job_id);
  220. return TRUE;
  221. } catch (Exception $e) {
  222. throw new Exception('Cannot create job: ' . $e->getMessage());
  223. }
  224. }
  225. /**
  226. * Cancels the job and prevents it from running.
  227. */
  228. public function cancel() {
  229. if (!$this->job) {
  230. throw new Exception("There is no job associated with this object. Cannot cancel");
  231. }
  232. if ($this->job->status == 'Running') {
  233. throw new Exception("Job Cannot be cancelled it is currently running.");
  234. }
  235. if ($this->job->status == 'Completed') {
  236. throw new Exception("Job Cannot be cancelled it has already finished.");
  237. }
  238. if ($this->job->status == 'Error') {
  239. throw new Exception("Job Cannot be cancelled it is in an error state.");
  240. }
  241. if ($this->job->status == 'Cancelled') {
  242. throw new Exception("Job Cannot be cancelled it is already cancelled.");
  243. }
  244. // Set the end time for this job.
  245. try {
  246. if ($this->job->start_time == 0) {
  247. $record = new stdClass();
  248. $record->job_id = $this->job->job_id;
  249. $record->status = 'Cancelled';
  250. $record->progress = '0';
  251. drupal_write_record('tripal_jobs', $record, 'job_id');
  252. }
  253. } catch (Exception $e) {
  254. throw new Exception('Cannot cancel job: ' . $e->getMessage());
  255. }
  256. }
  257. /**
  258. * Executes the job.
  259. */
  260. public function run() {
  261. $this->start_time = time();
  262. $this->progress_start_time = time();
  263. if (!$this->job) {
  264. throw new Exception('Cannot launch job as no job is associated with this object.');
  265. }
  266. try {
  267. // Include the necessary files needed to run the job.
  268. if (is_array($this->job->includes)) {
  269. foreach ($this->job->includes as $path) {
  270. if ($path) {
  271. require_once $path;
  272. }
  273. }
  274. }
  275. // Set the start time for this job.
  276. $record = new stdClass();
  277. $record->job_id = $this->job->job_id;
  278. $record->start_time = $this->start_time;
  279. $record->status = 'Running';
  280. $record->pid = getmypid();
  281. drupal_write_record('tripal_jobs', $record, 'job_id');
  282. // Callback functions need the job in order to update
  283. // progress. But prior to Tripal v3 the job callback functions
  284. // only accepted a $job_id as the final argument. So, we need
  285. // to see if the callback is Tv3 compatible or older. If older
  286. // we want to still support it and pass the job_id.
  287. $arguments = $this->job->arguments;
  288. $callback = $this->job->callback;
  289. $ref = new ReflectionFunction($callback);
  290. $refparams = $ref->getParameters();
  291. if (count($refparams) > 0) {
  292. $lastparam = $refparams[count($refparams) - 1];
  293. if ($lastparam->getName() == 'job_id') {
  294. $arguments[] = $this->job->job_id;
  295. }
  296. else {
  297. $arguments[] = $this;
  298. }
  299. }
  300. // Launch the job.
  301. call_user_func_array($callback, $arguments);
  302. // Set the end time for this job.
  303. $record = new stdClass();
  304. $record->job_id = $this->job->job_id;
  305. $record->end_time = time();
  306. $record->error_msg = $this->job->error_msg;
  307. $record->progress = 100;
  308. $record->status = 'Completed';
  309. $record->pid = '';
  310. drupal_write_record('tripal_jobs', $record, 'job_id');
  311. $this->load($this->job->job_id);
  312. } catch (Exception $e) {
  313. $record->end_time = time();
  314. $record->error_msg = $this->job->error_msg;
  315. $record->progress = $this->job->progress;
  316. $record->status = 'Error';
  317. $record->pid = '';
  318. drupal_write_record('tripal_jobs', $record, 'job_id');
  319. drupal_set_message('Job execution failed: ' . $e->getMessage(), 'error');
  320. }
  321. }
  322. /**
  323. * Inidcates if the job is running.
  324. *
  325. * @return
  326. * TRUE if the job is running, FALSE otherwise.
  327. */
  328. public function isRunning() {
  329. if (!$this->job) {
  330. throw new Exception('Cannot check running status as no job is associated with this object.');
  331. }
  332. $status = shell_exec('ps -p ' . escapeshellarg($this->job->pid) . ' -o pid=');
  333. if ($this->job->pid && $status) {
  334. // The job is still running.
  335. return TRUE;
  336. }
  337. // return FALSE to indicate that no jobs are currently running.
  338. return FALSE;
  339. }
  340. /**
  341. * Retrieve the job object as if from a database query.
  342. */
  343. public function getJob() {
  344. return $this->job;
  345. }
  346. /**
  347. * Retrieves the job ID.
  348. */
  349. public function getJobID() {
  350. return $this->job->job_id;
  351. }
  352. /**
  353. * Retrieves the user ID of the user that submitted the job.
  354. */
  355. public function getUID() {
  356. return $this->job->uid;
  357. }
  358. /**
  359. * Retrieves the job name.
  360. */
  361. public function getJobName() {
  362. return $this->job->job_name;
  363. }
  364. /**
  365. * Retrieves the name of the module that submitted the job.
  366. */
  367. public function getModuleName() {
  368. return $this->job->modulename;
  369. }
  370. /**
  371. * Retrieves the callback function for the job.
  372. */
  373. public function getCallback() {
  374. return $this->job->callback;
  375. }
  376. /**
  377. * Retrieves the array of arguments for the job.
  378. */
  379. public function getArguments() {
  380. return $this->job->arguments;
  381. }
  382. /**
  383. * Retrieves the current percent complete (i.e. progress) of the job.
  384. */
  385. public function getProgress() {
  386. return $this->job->progress;
  387. }
  388. /**
  389. * Sets the current percent complete of a job.
  390. *
  391. * @param $percent_done
  392. * A value between 0 and 100 indicating the percentage complete of the job.
  393. */
  394. public function setProgress($percent_done) {
  395. if (!$this->job) {
  396. throw new Exception('Cannot set progress as no job is associated with this object.');
  397. }
  398. $this->job->progress = $percent_done;
  399. $progress = sprintf("%d", $percent_done);
  400. db_update('tripal_jobs')
  401. ->fields([
  402. 'progress' => $progress,
  403. ])
  404. ->condition('job_id', $this->job->job_id)
  405. ->execute();
  406. }
  407. /**
  408. * Sets the total number if items to be processed.
  409. *
  410. * This should typically be called near the beginning of the loading process
  411. * to indicate the number of items that must be processed.
  412. *
  413. * @param $total_items
  414. * The total number of items to process.
  415. */
  416. public function setTotalItems($total_items) {
  417. $this->progress_start_time = time();
  418. $this->total_items = $total_items;
  419. }
  420. /**
  421. * Adds to the count of the total number of items that have been handled.
  422. *
  423. * @param $num_handled
  424. */
  425. public function addItemsHandled($num_handled) {
  426. $items_handled = $this->num_handled = $this->num_handled + $num_handled;
  427. $this->setItemsHandled($items_handled);
  428. }
  429. /**
  430. * Sets the number of items that have been processed.
  431. *
  432. * This should be called anytime the loader wants to indicate how many
  433. * items have been processed. The amount of progress will be
  434. * calculated using this number. If the amount of items handled exceeds
  435. * the interval specified then the progress is reported to the user. If
  436. * this loader is associated with a job then the job progress is also updated.
  437. *
  438. * @param $total_handled
  439. * The total number of items that have been processed.
  440. */
  441. public function setItemsHandled($total_handled) {
  442. // First set the number of items handled.
  443. $this->num_handled = $total_handled;
  444. if ($total_handled == 0) {
  445. $memory = number_format(memory_get_usage());
  446. print "Percent complete: 0%. Memory: " . $memory . " bytes.\r";
  447. return;
  448. }
  449. // Now see if we need to report to the user the percent done. A message
  450. // will be printed on the command-line if the job is run there.
  451. $percent = ($this->num_handled / $this->total_items) * 100;
  452. $ipercent = (int) $percent;
  453. // If we've reached our interval then print update info.
  454. if ($ipercent > 0 and $ipercent != $this->reported and ($ipercent % $this->interval) == 0) {
  455. $duration = (time() - $this->progress_start_time) / 60;
  456. $duration = sprintf("%.2f", $duration);
  457. $memory = memory_get_usage();
  458. $fmemory = number_format($memory);
  459. $spercent = sprintf("%d", $percent);
  460. print "Percent complete: " . $spercent . "%. Memory: " . $fmemory . " bytes. Duration: " . $duration . " mins\r";
  461. $this->setProgress($percent);
  462. $this->reported = $ipercent;
  463. }
  464. }
  465. /**
  466. * Updates the percent interval when the job progress is updated.
  467. *
  468. * Updating the job
  469. * progress incurrs a database write which takes time and if it occurs to
  470. * frequently can slow down the loader. This should be a value between
  471. * 0 and 100 to indicate a percent interval (e.g. 1 means update the
  472. * progress every time the num_handled increases by 1%).
  473. *
  474. * @param $interval
  475. * A number between 0 and 100.
  476. */
  477. public function setInterval($interval) {
  478. $this->interval = $interval;
  479. }
  480. /**
  481. * Retrieves the status of the job.
  482. */
  483. public function getStatus() {
  484. return $this->job->status;
  485. }
  486. /**
  487. * Retrieves the time the job was submitted.
  488. */
  489. public function getSubmitTime() {
  490. return $this->job->submit_date;
  491. }
  492. /**
  493. * Retieves the time the job began execution (i.e. the start time).
  494. */
  495. public function getStartTime() {
  496. return $this->job->start_time;
  497. }
  498. /**
  499. * Retieves the time the job completed execution (i.e. the end time).
  500. */
  501. public function getEndTime() {
  502. return $this->job->end_time;
  503. }
  504. /**
  505. * Retieves the log for the job.
  506. *
  507. * @return
  508. * A large string containing the text of the job log. It contains both
  509. * status upates and errors.
  510. */
  511. public function getLog() {
  512. return $this->job->error_msg;
  513. }
  514. /**
  515. * Retrieves the process ID of the job.
  516. */
  517. public function getPID() {
  518. return $this->job->pid;
  519. }
  520. /**
  521. * Retreieves the priority that is currently set for the job.
  522. */
  523. public function getPriority() {
  524. return $this->job->priority;
  525. }
  526. /**
  527. * Get the MLock value of the job.
  528. *
  529. * The MLock value indicates if no other jobs from a give module
  530. * should be executed while this job is running.
  531. */
  532. public function getMLock() {
  533. return $this->job->mlock;
  534. }
  535. /**
  536. * Get the lock value of the job.
  537. *
  538. * The lock value indicates if no other jobs from any module
  539. * should be executed while this job is running.
  540. */
  541. public function getLock() {
  542. return $this->job->lock;
  543. }
  544. /**
  545. * Get the list of files that must be included prior to job execution.
  546. */
  547. public function getIncludes() {
  548. return $this->job->includes;
  549. }
  550. /**
  551. * Logs a message for the job.
  552. *
  553. * There is no distinction between status messages and error logs. Any
  554. * message that is intended for the user to review the status of the job
  555. * can be provided here.
  556. *
  557. * Messages that are are of severity TRIPAL_CRITICAL or TRIPAL_ERROR
  558. * are also logged to the watchdog.
  559. *
  560. * Logging works regardless if the job uses a transaction. If the
  561. * transaction must be rolled back to to an error the error messages will
  562. * persist.
  563. *
  564. * If a function can be executed by the Tripal job system (and hence the
  565. * job object is passed in) then you can directly use this function to
  566. * log messages. However, if the function can be run via drush on the
  567. * command-line, consider using the tripal_report_error() function which can
  568. * accept a job object as an $option and will print to both the terminal
  569. * and to the job object. If you use the tripal_report_error() be sure
  570. * to set the 'watchdog' option only if you need log messages also going
  571. * to the watchdog.
  572. *
  573. * @param $message
  574. * The message to store in the log. Keep $message translatable by not
  575. * concatenating dynamic values into it! Variables in the message should
  576. * be added by using placeholder strings alongside the variables argument
  577. * to declare the value of the placeholders. See t() for documentation on
  578. * how $message and $variables interact.
  579. * @param $variables
  580. * Array of variables to replace in the message on display or NULL if
  581. * message is already translated or not possible to translate.
  582. * @param $severity
  583. * The severity of the message; one of the following values:
  584. * - TRIPAL_CRITICAL: Critical conditions.
  585. * - TRIPAL_ERROR: Error conditions.
  586. * - TRIPAL_WARNING: Warning conditions.
  587. * - TRIPAL_NOTICE: Normal but significant conditions.
  588. * - TRIPAL_INFO: (default) Informational messages.
  589. * - TRIPAL_DEBUG: Debug-level messages.
  590. */
  591. public function logMessage($message, $variables = [], $severity = TRIPAL_INFO) {
  592. // Generate a translated message.
  593. $tmessage = t($message, $variables);
  594. // For the sake of the command-line user, print the message to the
  595. // terminal.
  596. print $tmessage . "\n";
  597. // Add this message to the job's log.
  598. $this->job->error_msg .= "\n" . $tmessage;
  599. // Report this message to watchdog or set a message.
  600. if ($severity == TRIPAL_CRITICAL or $severity == TRIPAL_ERROR) {
  601. tripal_report_error('tripal_job', $severity, $message, $variables);
  602. $this->job->status = 'Error';
  603. }
  604. }
  605. }