فهرست منبع

Tripal Bulk Loader: Sharing changes to increase scalability (reduce memory leaked per line of file)

laceysanderson 14 سال پیش
والد
کامیت
1ac3e5af32
2فایلهای تغییر یافته به همراه164 افزوده شده و 124 حذف شده
  1. 163 123
      tripal_bulk_loader/tripal_bulk_loader.loader.inc
  2. 1 1
      tripal_bulk_loader/tripal_bulk_loader.module

+ 163 - 123
tripal_bulk_loader/tripal_bulk_loader.loader.inc

@@ -92,10 +92,14 @@ function tripal_bulk_loader_add_loader_job_form_submit ($form, $form_state) {
  */
 function tripal_bulk_loader_load_data($nid) {
   
+  print "Memory Usage (Start): ".number_format((memory_get_usage() * 0.000000953674316), 5, '.', ',') . " Mb\n";
+  
   $node = node_load($nid);
   print "Template: ".$node->template->name." (".$node->template_id.")\n";
   print "File: ".$node->file."\n";
   
+  print "Memory Usage (After Node Load): ".number_format((memory_get_usage() * 0.000000953674316), 5, '.', ',') . " Mb\n";
+  
   // Prep Work ==================================================================================
   // Generate default values array
   $default_data = array();
@@ -133,7 +137,9 @@ function tripal_bulk_loader_load_data($nid) {
       }    
     } // end of foreach field
   } //end of foreach record
-
+  
+  print "Memory Usage (end of prep work): ".number_format((memory_get_usage() * 0.000000953674316), 5, '.', ',') . " Mb\n";
+    
   //print "\nDefault Values Array: ".print_r($default_data, TRUE)."\n";
   //print "\nField to Column Mapping: ".print_r($field2column, TRUE)."\n";
   
@@ -144,136 +150,34 @@ function tripal_bulk_loader_load_data($nid) {
   $num_lines = 0;
   $num_errors = 0;
   while (!feof($file_handle)) {
-    $line = array();
+
+    // Clear variables
+    // Was added to fix memory leak
+    unset($line);                     unset($raw_line);
+    unset($data);                     unset($data_keys);
+    unset($priority);                 unset($sql);
+    unset($result);                   
+    
     $raw_line = fgets($file_handle, 4096);
     $raw_line = trim($raw_line);
-    if (preg_match('/^\s*$/', $raw_line)) { continue; } // skips blank lines
-    $line = preg_split("/\t/", $raw_line);
+    if (empty($raw_line)) { continue; } // skips blank lines
+    $line = explode("\t", $raw_line);
     $num_lines++;
 
     $data = $default_data;
-    
-    foreach ($data as $priority => $table_data) {
-      $table = $table_data['table'];
-      $values = $table_data['values_array'];
-      
-      //print 'default values:'.print_r($values,TRUE)."\n";
-      if ($table_data['need_further_processing']) {
-        $values = tripal_bulk_loader_add_spreadsheetdata_to_values ($values, $line, $field2column[$priority]);
-        //print 'after spreadsheet values:'.print_r($values,TRUE)."\n";
-        $values = tripal_bulk_loader_add_foreignkey_to_values($values, $data, $record2priority);
-        //print 'after fk values:'.print_r($values,TRUE)."\n";
-      }
-      $values = tripal_bulk_loader_regex_tranform_values($values, $table_data, $line);
-      //print 'after regex values:'.print_r($values,TRUE)."\n";
-
-      if (!$values) {
-        $msg = $table_data['record_id'].' ('.$table_data['mode'].') Aborted due to error in previous record.';
-        watchdog('T_bulk_loader', $msg, array(), WATCHDOG_WARNING); 
-        print "ERROR: ".$msg."\n";
-        $data[$priority]['error'] = TRUE;
-      }
-      
-      $table_desc = module_invoke_all('chado_'.$table.'_schema');
-      if (preg_match('/optional/', $table_array['mode'])) {
-        // Check all db required fields are set
-        $fields = $table_desc['fields'];
-        foreach($fields as $field => $def){
-          // a field is considered missing if it cannot be null and there is no default
-          // value for it or it is of type 'serial'
-          if($def['not null'] == 1 and !array_key_exists($field,$insert_values) and !isset($def['default']) and strcmp($def['type'],serial)!=0){
-             $msg = $table_data['record_id'].' ('.$table_data['mode'].') Missing Database Required Value: '.$table.'.'.$field;
-             watchdog('T_bulk_loader', $msg, array(), WATCHDOG_NOTICE); 
-             $data[$priority]['error'] = TRUE;
-          }
-        }
-      } //end of if optional record
-      
-      // Check required fields are present
-      foreach ($table_data['required'] as $field => $required) {
-        if ($required) {
-          if (!isset($values[$field])) {
-            $msg = $table_data['record_id'].' ('.$table_data['mode'].') Missing Template Required Value: '.$table.'.'.$field;
-            watchdog('T_bulk_loader', $msg, array(), WATCHDOG_NOTICE); 
-            $data[$priority]['error'] = TRUE;            
-          }
-        }
-      }
-      
-      // add new values array into the data array
-      $data[$priority]['values_array'] = $values;
-      
-      // check if it is already inserted
-      if ($table_data['inserted']) {
-        continue;
-      }
-      
-      // if there was an error already -> don't insert
-      if ($data[$priority]['error']) {
-        continue;
-      }
-      
-      // if insert unique then check to ensure unique
-      if (preg_match('/insert_unique/',$table_data['mode'])) {
-        $unique = tripal_core_chado_select($table, array_keys($table_desc['fields']), $values, array('has_record'=>TRUE));
-        //print 'Unique?'.print_r(array('table' => $table, 'columns' => array_keys($table_desc['fields']), 'values' => $values),TRUE).' returns '.$unique."\n";
-        if ($unique > 0) {
-          continue;
-        }
-      }
-      
-        
-      if (!preg_match('/select/',$table_data['mode'])) {
-        $record = tripal_core_chado_insert($table, $values);
-        if (!$record) {
-          $msg = $table_data['record_id'].' ('.$table_data['mode'].') Unable to insert record into '.$table.' where values:'.print_r($values,TRUE);
-          watchdog('T_bulk_loader', $msg, array(), WATCHDOG_ERROR); 
-          print "ERROR: ".$msg."\n";  
-          $data[$priority]['error'] = TRUE;
-        } else {
-          //add changes back to values array
-          $data[$priority]['values_array'] = $record;
-          $values = $record;
-          
-          // if mode=insert_once then ensure we only insert it once
-          if (preg_match('/insert_once/',$table_data['mode'])) {
-            $default_data[$priority]['inserted'] = TRUE;
-          }
-          
-          // add to tripal_bulk_loader_inserted
-          $insert_record = db_fetch_object(db_query(
-            "SELECT * FROM {tripal_bulk_loader_inserted} WHERE table_inserted_into='%s' AND nid=%d",
-            $table,
-            $nid
-          ));
-          if ($insert_record) {
-            $insert_record->ids_inserted .= ',' . $values[ $table_desc['primary key'][0] ];
-            drupal_write_record('tripal_bulk_loader_inserted', $insert_record, 'tripal_bulk_loader_inserted_id');
-            //print 'Update: '.print_r($insert_record,TRUE)."\n";
-          } else {
-            $insert_record = array(
-              'nid' => $nid,
-              'table_inserted_into' => $table,
-              'table_primary_key' => $table_desc['primary key'][0],
-              'ids_inserted' => $values[ $table_desc['primary key'][0] ],
-            );
-            //print 'New: '.print_r($insert_record,TRUE)."\n";
-            $success = drupal_write_record('tripal_bulk_loader_inserted', $insert_record);
-          }//end of if insert record
-          
-        } //end of if insert was successful
-      } else {
-        $exists = tripal_core_chado_select($table, array_keys($table_desc['fields']), $values, array('has_record'=>TRUE));
-        if (!$exists) {
-          // No record on select
-          $msg = $table_data['record_id'].' ('.$table_data['mode'].') No Matching record in '.$table.' where values:'.print_r($values,TRUE);
-          watchdog('T_bulk_loader', $msg, array(), WATCHDOG_WARNING); 
-          $data[$priority]['error'] = TRUE;        
-        }
-      }
 
+    $data_keys = array_keys($data); 
+    foreach ($data_keys as $priority) {
+      process_data_array_for_line($priority, $data, $default_data, $field2column, $record2priority, $line, $nid);
     } // end of foreach table in default data array
 
+    // determine memory increase
+    $line_mem_increase = memory_get_usage() - $memory;
+    if ($num_lines != 1) { 
+      $increased_mem = $increased_mem + $line_mem_increase;
+    }
+    $memory = memory_get_usage();
+    
   } //end of foreach line of file
   
   // check that data was inserted and update job_status
@@ -283,6 +187,142 @@ function tripal_bulk_loader_load_data($nid) {
     $node->job_status = 'Data Inserted';
     drupal_write_record('node',$node,'nid');
   }
+  
+  $avg_line_increase = ( $increased_mem / $num_lines) * 0.0078125;
+  print "Average Increase in Memory per Line: ".number_format($avg_line_increase, 5, '.', ',') . " Kb\n";
+  print "Peak Memory Usage: ".number_format((memory_get_peak_usage() * 0.000000953674316), 5, '.', ',') . " Mb\n";
+  print "End Memory Usage: ".number_format((memory_get_usage() * 0.000000953674316), 5, '.', ',') . " Mb\n";
+
+}
+
+/**
+ * Processes a single template record for one line of the input file.
+ *
+ * Starting from the record's default values in $data[$priority], this merges in
+ * the spreadsheet columns and foreign keys (when the record is flagged
+ * 'need_further_processing'), applies the regex transformations, validates the
+ * required fields and then, depending on the record's mode, either inserts the
+ * record into the chado table or checks that a matching record exists.
+ * Problems are reported through watchdog() and flagged in
+ * $data[$priority]['error'].
+ *
+ * @param $priority
+ *   Key of the record to process in $data / $default_data.
+ * @param $data
+ *   Records for the current line, keyed by priority. Passed by reference:
+ *   'values_array' and 'error' of the current record are updated.
+ * @param $default_data
+ *   Template defaults, keyed by priority. Passed by reference: 'inserted' is
+ *   set on the record once it must not be inserted again.
+ * @param $field2column
+ *   Field to column mapping, indexed by priority.
+ * @param $record2priority
+ *   Passed through to tripal_bulk_loader_add_foreignkey_to_values().
+ * @param $line
+ *   The columns of the current line of the file.
+ * @param $nid
+ *   Node id of the bulk loading job; used to log the inserted ids.
+ *
+ * @return
+ *   TRUE if a record was inserted and logged in tripal_bulk_loader_inserted,
+ *   FALSE in every other case.
+ */
+function process_data_array_for_line ($priority, &$data, &$default_data, $field2column, $record2priority, $line, $nid) {
+  $table_data = $data[$priority];
+
+  $table = $table_data['table'];
+  $values = $table_data['values_array'];
+
+  //print 'default values:'.print_r($values,TRUE)."\n";
+  if ($table_data['need_further_processing']) {
+    $values = tripal_bulk_loader_add_spreadsheetdata_to_values ($values, $line, $field2column[$priority]);
+    //print 'after spreadsheet values:'.print_r($values,TRUE)."\n";
+    $values = tripal_bulk_loader_add_foreignkey_to_values($values, $data, $record2priority);
+    //print 'after fk values:'.print_r($values,TRUE)."\n";
+  }
+  $values = tripal_bulk_loader_regex_tranform_values($values, $table_data, $line);
+  //print 'after regex values:'.print_r($values,TRUE)."\n";
+
+  // An empty result from the helpers above is treated as a failure of a
+  // record this one depends on.
+  if (!$values) {
+    $msg = $table_data['record_id'].' ('.$table_data['mode'].') Aborted due to error in previous record.';
+    watchdog('T_bulk_loader', $msg, array(), WATCHDOG_WARNING);
+    print "ERROR: ".$msg."\n";
+    $data[$priority]['error'] = TRUE;
+  }
+
+  $table_desc = module_invoke_all('chado_'.$table.'_schema');
+  // The mode is read from $table_data and the fields are looked up in $values;
+  // the previously referenced $table_array / $insert_values were never defined,
+  // so this check could not work. Skipped when $values is not an array because
+  // the error has already been recorded above.
+  if (is_array($values) && preg_match('/optional/', $table_data['mode'])) {
+    // Check all db required fields are set
+    $fields = $table_desc['fields'];
+    foreach ($fields as $field => $def) {
+      // a field is considered missing if it cannot be null, has no value in
+      // this record, has no default and is not of type 'serial'
+      // (serial columns are presumably filled in by the database).
+      if (isset($def['not null']) && $def['not null'] == 1
+          && !array_key_exists($field, $values)
+          && !isset($def['default'])
+          && strcmp($def['type'], 'serial') != 0) {
+        $msg = $table_data['record_id'].' ('.$table_data['mode'].') Missing Database Required Value: '.$table.'.'.$field;
+        watchdog('T_bulk_loader', $msg, array(), WATCHDOG_NOTICE);
+        $data[$priority]['error'] = TRUE;
+      }
+    }
+  } //end of if optional record
+
+  // Check required fields are present
+  foreach ($table_data['required'] as $field => $required) {
+    if ($required && !isset($values[$field])) {
+      $msg = $table_data['record_id'].' ('.$table_data['mode'].') Missing Template Required Value: '.$table.'.'.$field;
+      watchdog('T_bulk_loader', $msg, array(), WATCHDOG_NOTICE);
+      $data[$priority]['error'] = TRUE;
+    }
+  }
+
+  // add new values array into the data array
+  $data[$priority]['values_array'] = $values;
+
+  // check if it is already inserted
+  if (!empty($table_data['inserted'])) {
+    return FALSE;
+  }
+
+  // if there was an error already -> don't insert
+  if (!empty($data[$priority]['error'])) {
+    return FALSE;
+  }
+
+  // if insert unique then check to ensure unique
+  if (preg_match('/insert_unique/',$table_data['mode'])) {
+    $unique = tripal_core_chado_select($table, array_keys($table_desc['fields']), $values, array('has_record'=>TRUE));
+    //print 'Unique?'.print_r(array('table' => $table, 'columns' => array_keys($table_desc['fields']), 'values' => $values),TRUE).' returns '.$unique."\n";
+    if ($unique > 0) {
+      // NOTE(review): this marks the record as inserted in the shared defaults,
+      // so it is skipped on every later line as well, even when those lines
+      // carry different values -- confirm this is intended for insert_unique.
+      $default_data[$priority]['inserted'] = TRUE;
+      return FALSE;
+    }
+  }
+
+  if (!preg_match('/select/',$table_data['mode'])) {
+    $record = tripal_core_chado_insert($table, $values);
+    if (!$record) {
+      $msg = $table_data['record_id'].' ('.$table_data['mode'].') Unable to insert record into '.$table.' where values:'.print_r($values,TRUE);
+      watchdog('T_bulk_loader', $msg, array(), WATCHDOG_ERROR);
+      print "ERROR: ".$msg."\n";
+      $data[$priority]['error'] = TRUE;
+    } else {
+      //add changes back to values array
+      $data[$priority]['values_array'] = $record;
+      $values = $record;
+
+      // if mode=insert_once then ensure we only insert it once
+      if (preg_match('/insert_once/',$table_data['mode'])) {
+        $default_data[$priority]['inserted'] = TRUE;
+      }
+
+      // add to tripal_bulk_loader_inserted: one row per job (nid) and table,
+      // holding a comma-separated list of the inserted primary key values
+      $primary_key = $table_desc['primary key'][0];
+      $insert_record = db_fetch_object(db_query(
+        "SELECT * FROM {tripal_bulk_loader_inserted} WHERE table_inserted_into='%s' AND nid=%d",
+        $table,
+        $nid
+      ));
+      if ($insert_record) {
+        $insert_record->ids_inserted .= ',' . $values[$primary_key];
+        drupal_write_record('tripal_bulk_loader_inserted', $insert_record, 'tripal_bulk_loader_inserted_id');
+        //print 'Update: '.print_r($insert_record,TRUE)."\n";
+      } else {
+        $insert_record = array(
+          'nid' => $nid,
+          'table_inserted_into' => $table,
+          'table_primary_key' => $primary_key,
+          'ids_inserted' => $values[$primary_key],
+        );
+        //print 'New: '.print_r($insert_record,TRUE)."\n";
+        drupal_write_record('tripal_bulk_loader_inserted', $insert_record);
+      }//end of if insert record
+
+      return TRUE;
+    } //end of if insert was successful
+  } else {
+    $exists = tripal_core_chado_select($table, array_keys($table_desc['fields']), $values, array('has_record'=>TRUE));
+    if (!$exists) {
+      // No record on select
+      $msg = $table_data['record_id'].' ('.$table_data['mode'].') No Matching record in '.$table.' where values:'.print_r($values,TRUE);
+      watchdog('T_bulk_loader', $msg, array(), WATCHDOG_WARNING);
+      $data[$priority]['error'] = TRUE;
+    }
+  }
+  return FALSE;
 }
 
 /**

+ 1 - 1
tripal_bulk_loader/tripal_bulk_loader.module

@@ -236,7 +236,7 @@ function tripal_bulk_loader_form ($node){
    $form['has_header'] = array(
     '#type' => 'radios',
     '#title' => 'File has a Header',
-    '#options' => array( 't' => 'Yes', 'f' => 'No'),
+    '#options' => array( 1 => 'Yes', 2 => 'No'),
     '#weight' => -7,
     '#default_value' => $node->file_has_header,
    );