TripalJob.inc 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634
  1. <?php
  2. class TripalJob {
  3. /**
  4. * The ID of the job.
  5. */
  6. protected $job_id = NULL;
  7. /**
  8. * Contains the job record for this job.
  9. */
  10. protected $job = NULL;
  11. /**
  12. * The number of items that this importer needs to process. A progress
  13. * can be calculated by dividing the number of items process by this
  14. * number.
  15. */
  16. private $total_items;
  17. /**
  18. * The number of items that have been handled so far. This must never
  19. * be below 0 and never exceed $total_items;
  20. */
  21. private $num_handled;
  22. /**
  23. * The interval when the job progress should be updated. Updating the job
  24. * progress incurrs a database write which takes time and if it occurs to
  25. * frequently can slow down the loader. This should be a value between
  26. * 0 and 100 to indicate a percent interval (e.g. 1 means update the
  27. * progress every time the num_handled increases by 1%).
  28. */
  29. private $interval;
  30. /**
  31. * Each time the job progress is updated this variable gets set. It is
  32. * used to calculate if the $interval has passed for the next update.
  33. */
  34. private $prev_update;
  35. /**
  36. * Instantiates a new TripalJob object.
  37. *
  38. * By default the job object is "empty". It must be associated with
  39. * job details either by calling the load() function or the
  40. * create() function.
  41. */
  42. public function __construct() {
  43. }
  44. /**
  45. * Loads a job for this object.
  46. *
  47. * @param $job_id
  48. * The ID of the job.
  49. */
  50. public function load($job_id) {
  51. // Make sure we have a numeric job_id.
  52. if (!$job_id or !is_numeric($job_id)) {
  53. // If we don't then do a quick double check in case this is a
  54. // TripalJob object in which case, I still have the job_id.
  55. if (is_object($job_id) AND is_a($job_id, 'TripalJob')) {
  56. $job_id = $job_id->job->job_id;
  57. }
  58. // Finally just throw an exception.
  59. // I can't load a job if I don't know which one.
  60. else {
  61. throw new Exception("You must provide the job_id to load the job.");
  62. }
  63. }
  64. $sql = 'SELECT j.* FROM {tripal_jobs} j WHERE j.job_id = :job_id';
  65. $args = array(':job_id' => $job_id);
  66. $this->job = db_query($sql, $args)->fetchObject();
  67. if (!$this->job) {
  68. throw new Exception("Cannot find a job with this ID provided.");
  69. }
  70. // Fix the date/time fields.
  71. $this->job->submit_date_string = $this->job->submit_date ? format_date($this->job->submit_date) : '';
  72. $this->job->start_time_string = $this->job->start_time ? format_date($this->job->start_time): '';
  73. $this->job->end_time_string = $this->job->end_time ? format_date($this->job->end_time): '';
  74. // Unserialize the includes.
  75. $this->job->includes = unserialize($this->job->includes);
  76. // Arguments for jobs used to be stored as plain string with a double colon
  77. // separating them. But as of Tripal v2.0 the arguments are stored as
  78. // a serialized array. To be backwards compatible, we should check for
  79. // serialization and if not then we will use the old style
  80. $this->job->arguments = unserialize($this->job->arguments);
  81. if (!is_array($this->job->arguments)) {
  82. $this->job->arguments = explode("::", $this->job->arguments);
  83. }
  84. }
  85. /**
  86. * Creates a new job.
  87. *
  88. * @param $details
  89. * An associative array of the job details or a single job_id. If the
  90. * details are provided then the job is created and added to the database
  91. * otherwise if a job_id is provided then the object is loaded from the
  92. * database. The following keys are allowed:
  93. * - job_name: The human readable name for the job.
  94. * - modulename: The name of the module adding the job.
  95. * - callback: The name of a function to be called when the job is executed.
  96. * - arguments: An array of arguments to be passed on to the callback.
  97. * - uid: The uid of the user adding the job
  98. * - priority: The priority at which to run the job where the highest
  99. * priority is 10 and the lowest priority is 1. The default
  100. * priority is 10.
  101. * - includes: An array of paths to files that should be included in order
  102. * to execute the job. Use the module_load_include function to get a path
  103. * for a given file.
  104. * - ignore_duplicate: (Optional). Set to TRUE to ignore a job if it has
  105. * the same name as another job which has not yet run. If TRUE and a job
  106. * already exists then this object will reference the job already in the
  107. * queue rather than a new submission. The default is TRUE.
  108. *
  109. * @throws Exception
  110. * On failure an exception is thrown.
  111. *
  112. * @return
  113. * Returns TRUE if the job was succesfully created. Returns FALSE otherwise.
  114. * A return of FALSE does not mean the job creation failed. If the
  115. * ignore_duplicate is set to false and the job already is present in the
  116. * queue then the return value will be FALSE.
  117. */
  118. public function create($details) {
  119. // Set some defaults
  120. if (!array_key_exists('prority', $details)) {
  121. $details['priority'] = 10;
  122. }
  123. if (!array_key_exists('includes', $details)) {
  124. $details['includes'] = array();
  125. }
  126. if (!array_key_exists('ignore_duplicate', $details)) {
  127. $details['ignore_duplicate'] = FALSE;
  128. }
  129. // Make sure the arguments are correct.
  130. if (!$details['job_name']) {
  131. throw new Exception("Must provide a 'job_name' to create a job.");
  132. }
  133. if (!$details['modulename']) {
  134. throw new Exception("Must provide a 'modulename' to create a job.");
  135. }
  136. if (!$details['callback']) {
  137. throw new Exception("Must provide a 'callback' to create a job.");
  138. }
  139. if ($details['ignore_duplicate'] !== FALSE and $details['ignore_duplicate'] !== TRUE) {
  140. throw new Exception("Must provide either TRUE or FALSE for the ignore_duplicate option when creating a job.");
  141. }
  142. $includes = $details['includes'];
  143. if ($includes and is_array($includes)) {
  144. foreach ($includes as $path) {
  145. $full_path = $_SERVER['DOCUMENT_ROOT'] . base_path() . $path;
  146. if (!empty($path)) {
  147. if (file_exists($path)) {
  148. require_once($path);
  149. }
  150. elseif (file_exists($full_path)) {
  151. require_once($path);
  152. }
  153. elseif (!empty($path)) {
  154. throw new Exception("Included files for Tripal Job must exist. This path ($full_path) doesn't exist.");
  155. }
  156. }
  157. }
  158. }
  159. if (!function_exists($details['callback'])) {
  160. throw new Exception("Must provide a valid callback function to the tripal_add_job() function.");
  161. }
  162. if (!is_numeric($details['uid'])) {
  163. throw new Exception("Must provide a numeric \$uid argument to the tripal_add_job() function.");
  164. }
  165. $priority = $details['priority'];
  166. if (!$priority or !is_numeric($priority) or $priority < 1 or $priority > 10) {
  167. throw new Exception("Must provide a numeric \$priority argument between 1 and 10 to the tripal_add_job() function.");
  168. }
  169. $arguments = $details['arguments'];
  170. if (!is_array($arguments)) {
  171. throw new Exception("Must provide an array as the \$arguments argument to the tripal_add_job() function.");
  172. }
  173. // convert the arguments into a string for storage in the database
  174. $args = array();
  175. if (is_array($arguments)) {
  176. $args = serialize($arguments);
  177. }
  178. try {
  179. // Before inserting a new record, and if ignore_duplicate is TRUE then
  180. // check to see if the job already exists.
  181. if ($details['ignore_duplicate'] === TRUE) {
  182. $query = db_select('tripal_jobs', 'tj');
  183. $query->fields('tj', array('job_id'));
  184. $query->condition('job_name', $details['job_name']);
  185. $query->isNull('start_time');
  186. $job_id = $query->execute()->fetchField();
  187. if ($job_id) {
  188. $this->load($job_id);
  189. return FALSE;
  190. }
  191. }
  192. $job_id = db_insert('tripal_jobs')
  193. ->fields(array(
  194. 'job_name' => $details['job_name'],
  195. 'modulename' => $details['modulename'],
  196. 'callback' => $details['callback'],
  197. 'status' => 'Waiting',
  198. 'submit_date' => time(),
  199. 'uid' => $details['uid'],
  200. 'priority' => $priority,
  201. 'arguments' => $args,
  202. 'includes' => serialize($includes),
  203. ))
  204. ->execute();
  205. // Now load the job into this object.
  206. $this->load($job_id);
  207. return TRUE;
  208. }
  209. catch (Exception $e) {
  210. throw new Exception('Cannot create job: ' . $e->getMessage());
  211. }
  212. }
  213. /**
  214. * Cancels the job and prevents it from running.
  215. */
  216. public function cancel() {
  217. if (!$this->job) {
  218. throw new Exception("There is no job associated with this object. Cannot cancel");
  219. }
  220. if ($this->job->status == 'Running') {
  221. throw new Exception("Job Cannot be cancelled it is currently running.");
  222. }
  223. if ($this->job->status == 'Completed') {
  224. throw new Exception("Job Cannot be cancelled it has already finished.");
  225. }
  226. if ($this->job->status == 'Error') {
  227. throw new Exception("Job Cannot be cancelled it is in an error state.");
  228. }
  229. if ($this->job->status == 'Cancelled') {
  230. throw new Exception("Job Cannot be cancelled it is already cancelled.");
  231. }
  232. // Set the end time for this job.
  233. try {
  234. if ($this->job->start_time == 0) {
  235. $record = new stdClass();
  236. $record->job_id = $this->job->job_id;
  237. $record->status = 'Cancelled';
  238. $record->progress = '0';
  239. drupal_write_record('tripal_jobs', $record, 'job_id');
  240. }
  241. }
  242. catch (Exception $e) {
  243. throw new Exception('Cannot cancel job: ' . $e->getMessage());
  244. }
  245. }
  246. /**
  247. * Executes the job.
  248. */
  249. public function run() {
  250. if (!$this->job) {
  251. throw new Exception('Cannot launch job as no job is associated with this object.');
  252. }
  253. try {
  254. // Include the necessary files needed to run the job.
  255. if (is_array($this->job->includes)) {
  256. foreach ($this->job->includes as $path) {
  257. if ($path) {
  258. require_once $path;
  259. }
  260. }
  261. }
  262. // Set the start time for this job.
  263. $record = new stdClass();
  264. $record->job_id = $this->job->job_id;
  265. $record->start_time = time();
  266. $record->status = 'Running';
  267. $record->pid = getmypid();
  268. drupal_write_record('tripal_jobs', $record, 'job_id');
  269. // Callback functions need the job in order to update
  270. // progress. But prior to Tripal v3 the job callback functions
  271. // only accepted a $job_id as the final argument. So, we need
  272. // to see if the callback is Tv3 compatible or older. If older
  273. // we want to still support it and pass the job_id.
  274. $arguments = $this->job->arguments;
  275. $callback = $this->job->callback;
  276. $ref = new ReflectionFunction($callback);
  277. $refparams = $ref->getParameters();
  278. if (count($refparams) > 0) {
  279. $lastparam = $refparams[count($refparams)-1];
  280. if ($lastparam->getName() == 'job_id') {
  281. $arguments[] = $this->job->job_id;
  282. }
  283. else {
  284. $arguments[] = $this;
  285. }
  286. }
  287. // Launch the job.
  288. call_user_func_array($callback, $arguments);
  289. // Set the end time for this job.
  290. $record = new stdClass();
  291. $record->job_id = $this->job->job_id;
  292. $record->end_time = time();
  293. $record->error_msg = $this->job->error_msg;
  294. $record->progress = 100;
  295. $record->status = 'Completed';
  296. $record->pid = '';
  297. drupal_write_record('tripal_jobs', $record, 'job_id');
  298. $this->load($this->job->job_id);
  299. }
  300. catch (Exception $e) {
  301. $record->end_time = time();
  302. $record->error_msg = $this->job->error_msg;
  303. $record->progress = $this->job->progress;
  304. $record->status = 'Error';
  305. $record->pid = '';
  306. drupal_write_record('tripal_jobs', $record, 'job_id');
  307. drupal_set_message('Job execution failed: ' . $e->getMessage(), 'error');
  308. }
  309. }
  310. /**
  311. * Inidcates if the job is running.
  312. *
  313. * @return
  314. * TRUE if the job is running, FALSE otherwise.
  315. */
  316. public function isRunning() {
  317. if (!$this->job) {
  318. throw new Exception('Cannot check running status as no job is associated with this object.');
  319. }
  320. $status = shell_exec('ps -p ' . escapeshellarg($this->job->pid) . ' -o pid=');
  321. if ($this->job->pid && $status) {
  322. // The job is still running.
  323. return TRUE;
  324. }
  325. // return FALSE to indicate that no jobs are currently running.
  326. return FALSE;
  327. }
  328. /**
  329. * Retrieve the job object as if from a database query.
  330. */
  331. public function getJob(){
  332. return $this->job;
  333. }
  334. /**
  335. * Retrieves the job ID.
  336. */
  337. public function getJobID(){
  338. return $this->job->job_id;
  339. }
  340. /**
  341. * Retrieves the user ID of the user that submitted the job.
  342. */
  343. public function getUID() {
  344. return $this->job->uid;
  345. }
  346. /**
  347. * Retrieves the job name.
  348. */
  349. public function getJobName() {
  350. return $this->job->job_name;
  351. }
  352. /**
  353. * Retrieves the name of the module that submitted the job.
  354. */
  355. public function getModuleName() {
  356. return $this->job->modulename;
  357. }
  358. /**
  359. * Retrieves the callback function for the job.
  360. */
  361. public function getCallback() {
  362. return $this->job->callback;
  363. }
  364. /**
  365. * Retrieves the array of arguments for the job.
  366. */
  367. public function getArguments() {
  368. return $this->job->arguments;
  369. }
  370. /**
  371. * Retrieves the current percent complete (i.e. progress) of the job.
  372. */
  373. public function getProgress() {
  374. return $this->job->progress;
  375. }
  376. /**
  377. * Sets the current percent complete of a job.
  378. *
  379. * @param $percent_done
  380. * A value between 0 and 100 indicating the percentage complete of the job.
  381. */
  382. public function setProgress($percent_done) {
  383. if (!$this->job) {
  384. throw new Exception('Cannot set progress as no job is associated with this object.');
  385. }
  386. $this->job->progress = $percent_done;
  387. $progress = sprintf("%d", $percent_done);
  388. db_update('tripal_jobs')
  389. ->fields(array(
  390. 'progress' => $progress,
  391. ))
  392. ->condition('job_id', $this->job->job_id)
  393. ->execute();
  394. }
  395. /**
  396. * Sets the total number if items to be processed.
  397. *
  398. * This should typically be called near the beginning of the loading process
  399. * to indicate the number of items that must be processed.
  400. *
  401. * @param $total_items
  402. * The total number of items to process.
  403. */
  404. public function setTotalItems($total_items) {
  405. $this->total_items = $total_items;
  406. }
  407. /**
  408. * Adds to the count of the total number of items that have been handled.
  409. *
  410. * @param $num_handled
  411. */
  412. public function addItemsHandled($num_handled) {
  413. $items_handled = $this->num_handled = $this->num_handled + $num_handled;
  414. $this->setItemsHandled($items_handled);
  415. }
  416. /**
  417. * Sets the number of items that have been processed.
  418. *
  419. * This should be called anytime the loader wants to indicate how many
  420. * items have been processed. The amount of progress will be
  421. * calculated using this number. If the amount of items handled exceeds
  422. * the interval specified then the progress is reported to the user. If
  423. * this loader is associated with a job then the job progress is also updated.
  424. *
  425. * @param $total_handled
  426. * The total number of items that have been processed.
  427. */
  428. public function setItemsHandled($total_handled) {
  429. // First set the number of items handled.
  430. $this->num_handled = $total_handled;
  431. if ($total_handled == 0) {
  432. $memory = number_format(memory_get_usage());
  433. print "Percent complete: 0%. Memory: " . $memory . " bytes.\r";
  434. return;
  435. }
  436. // Now see if we need to report to the user the percent done. A message
  437. // will be printed on the command-line if the job is run there.
  438. $percent = sprintf("%.2f", ($this->num_handled / $this->total_items) * 100);
  439. $diff = $percent - $this->prev_update;
  440. if ($diff >= $this->interval) {
  441. $memory = number_format(memory_get_usage());
  442. print "Percent complete: " . $percent . "%. Memory: " . $memory . " bytes.\r";
  443. $this->prev_update = $diff;
  444. $this->setProgress($percent);
  445. }
  446. }
  447. /**
  448. * Updates the percent interval when the job progress is updated.
  449. *
  450. * Updating the job
  451. * progress incurrs a database write which takes time and if it occurs to
  452. * frequently can slow down the loader. This should be a value between
  453. * 0 and 100 to indicate a percent interval (e.g. 1 means update the
  454. * progress every time the num_handled increases by 1%).
  455. *
  456. * @param $interval
  457. * A number between 0 and 100.
  458. */
  459. public function setInterval($interval) {
  460. $this->interval = $interval;
  461. }
  462. /**
  463. * Retrieves the status of the job.
  464. */
  465. public function getStatus() {
  466. return $this->job->status;
  467. }
  468. /**
  469. * Retrieves the time the job was submitted.
  470. */
  471. public function getSubmitTime() {
  472. return $this->job->submit_date;
  473. }
  474. /**
  475. * Retieves the time the job began execution (i.e. the start time).
  476. */
  477. public function getStartTime() {
  478. return $this->job->start_time;
  479. }
  480. /**
  481. * Retieves the time the job completed execution (i.e. the end time).
  482. */
  483. public function getEndTime() {
  484. return $this->job->end_time;
  485. }
  486. /**
  487. * Retieves the log for the job.
  488. *
  489. * @return
  490. * A large string containing the text of the job log. It contains both
  491. * status upates and errors.
  492. */
  493. public function getLog() {
  494. return $this->job->error_msg;
  495. }
  496. /**
  497. * Retrieves the process ID of the job.
  498. */
  499. public function getPID() {
  500. return $this->job->pid;
  501. }
  502. /**
  503. * Retreieves the priority that is currently set for the job.
  504. */
  505. public function getPriority() {
  506. return $this->job->priority;
  507. }
  508. /**
  509. * Get the MLock value of the job.
  510. *
  511. * The MLock value indicates if no other jobs from a give module
  512. * should be executed while this job is running.
  513. */
  514. public function getMLock() {
  515. return $this->job->mlock;
  516. }
  517. /**
  518. * Get the lock value of the job.
  519. *
  520. * The lock value indicates if no other jobs from any module
  521. * should be executed while this job is running.
  522. */
  523. public function getLock() {
  524. return $this->job->lock;
  525. }
  526. /**
  527. * Get the list of files that must be included prior to job execution.
  528. */
  529. public function getIncludes() {
  530. return $this->job->includes;
  531. }
  532. /**
  533. * Logs a message for the job.
  534. *
  535. * There is no distinction between status messages and error logs. Any
  536. * message that is intended for the user to review the status of the job
  537. * can be provided here.
  538. *
  539. * Messages that are are of severity TRIPAL_CRITICAL or TRIPAL_ERROR
  540. * are also logged to the watchdog.
  541. *
  542. * Logging works regardless if the job uses a transaction. If the
  543. * transaction must be rolled back to to an error the error messages will
  544. * persist.
  545. *
  546. * @param $message
  547. * The message to store in the log. Keep $message translatable by not
  548. * concatenating dynamic values into it! Variables in the message should
  549. * be added by using placeholder strings alongside the variables argument
  550. * to declare the value of the placeholders. See t() for documentation on
  551. * how $message and $variables interact.
  552. * @param $variables
  553. * Array of variables to replace in the message on display or NULL if
  554. * message is already translated or not possible to translate.
  555. * @param $severity
  556. * The severity of the message; one of the following values:
  557. * - TRIPAL_CRITICAL: Critical conditions.
  558. * - TRIPAL_ERROR: Error conditions.
  559. * - TRIPAL_WARNING: Warning conditions.
  560. * - TRIPAL_NOTICE: Normal but significant conditions.
  561. * - TRIPAL_INFO: (default) Informational messages.
  562. * - TRIPAL_DEBUG: Debug-level messages.
  563. */
  564. public function logMessage($message, $variables = array(), $severity = TRIPAL_INFO) {
  565. // Generate a translated message.
  566. $tmessage = t($message, $variables);
  567. // For the sake of the command-line user, print the message to the
  568. // terminal.
  569. print $tmessage . "\n";
  570. // Add this message to the job's log.
  571. $this->job->error_msg .= "\n" . $tmessage;
  572. // Report this message to watchdog or set a message.
  573. if ($severity == TRIPAL_CRITICAL or $severity == TRIPAL_ERROR) {
  574. tripal_report_error('tripal_job', $severity, $message, $variables);
  575. $this->job->status = 'Error';
  576. }
  577. }
  578. }