[Bio] / Sprout / ERDBGenerator.pl Repository:
ViewVC logotype

Diff of /Sprout/ERDBGenerator.pl

Parent Directory | Revision Log | View Patch

revision 1.1, Tue Sep 30 15:23:55 2008 UTC revision 1.6, Mon Mar 2 22:15:04 2009 UTC
# Line 50  Line 50 
50    
51  =item groups  =item groups
52    
53  List of the table groups to load, or C<*> to load all table groups.  List of the table groups to load. A C<+> at the end of the list indicates that
54    all groups that follow the last-named group in the standard order should
55    be loaded. A C<+> by itself loads all groups in standard order.
56    
57  =back  =back
58    
# Line 95  Line 97 
97    
98  If specified, then this process will terminate after the specified number of  If specified, then this process will terminate after the specified number of
99  section load errors; otherwise, the process will keep going after a section  section load errors; otherwise, the process will keep going after a section
100  error.  error. A value of C<0> means the process will ignore all errors. A value of
101    C<1> means it will stop after the first error. The default is C<1>.
102    
103  =item phone  =item phone
104    
105  Phone number to message when the script is complete.  Phone number to message when the script is complete.
106    
 =item resume  
   
 If specified, then the group list must contain a single group. The specified  
 group and all groups after it in the group list will be processed.  
   
107  =item sections  =item sections
108    
109  Name of a file containing a list of sections to process. If C<*> is specified (the  Name of a file containing a list of sections to process. If C<*> is specified (the
# Line 131  Line 129 
129    
130  Create an event in the RSS feed when an error occurs.  Create an event in the RSS feed when an error occurs.
131    
132    =item label
133    
134    Name of this process, for display during tracing.
135    
136    =item resume
137    
138    If specified, load files that already exist will not be regenerated.
139    
140  =item workers  =item workers
141    
142  If C<0>, then this is considered to be a worker process and the sections in the  If C<0>, then this is considered to be a worker process and the sections in the
# Line 139  Line 145 
145  then the appropriate number of worker processes are generated and the sections  then the appropriate number of worker processes are generated and the sections
146  are assigned to them in a round-robin fashion.  are assigned to them in a round-robin fashion.
147    
148    =item DBD
149    
150    Name of the DBD file. If specified, the DBD must be in the main FIG directory
151    (specified in C<$FIG_Config::fig>). This option allows the use of an alternate
152    DBD during load, so that access to the database by other processes is not
153    compromised.
154    
155  =back  =back
156    
157  =cut  =cut
# Line 149  Line 162 
162                                                clear => ["", "if specified, the entire load directory will be cleared"],                                                clear => ["", "if specified, the entire load directory will be cleared"],
163                                                clean => ["", "if specified, temporary files in the load directory will be deleted"],                                                clean => ["", "if specified, temporary files in the load directory will be deleted"],
164                                                clearGroups => ["", "if specified, pre-exising load files from the groups processed will be deleted"],                                                clearGroups => ["", "if specified, pre-exising load files from the groups processed will be deleted"],
165                                                trace => ["2", "tracing level"],                                                maxErrors => ["1", "if non-zero, the maximum allowed number of section failures"],
                                               workers => ["1", "number of worker processes"],  
166                                                phone => ["", "phone number (international format) to call when load finishes"],                                                phone => ["", "phone number (international format) to call when load finishes"],
167                                                resume => ["", "if specified, the specified group and all groups that normally come after it will be loaded"],                                                trace => ["3", "tracing level"],
168                                                  workers => ["1", "number of worker processes"],
169                                                  label => ["Main", "name of this process"],
170                                                  resume => ["", "if specified, only groups and sections that do not already have load files will be processed"],
171                                                sections => ["*", "name of a file in the database's load directory containing a list of sections to process"],                                                sections => ["*", "name of a file in the database's load directory containing a list of sections to process"],
172                                                maxErrors => ["", "if specified, the maximum allowed number of section failures"],                                                DBD => ["", "if specified, the name of a DBD file in the FIG directory"],
173                                             },                                             },
174                                             "<database> <group1> <group2> ...",                                             "<database> <group1> <group2> ...",
175                                             @ARGV);                                             @ARGV);
176  # This is a list of the options that are for manager scripts only.  # This is a list of the options that are for manager scripts only.
177  my @managerOptions = qw(clear clean clearGroups resume sections);  my @managerOptions = qw(clear clean clearGroups sections);
178  # We're doing heavy pipe stuff, so we need to throw an error on a broken-pipe signal.  # We're doing heavy pipe stuff, so we need to throw an error on a broken-pipe signal.
179  local $SIG{PIPE} = sub { Confess("Broken pipe.") };  local $SIG{PIPE} = sub { Confess("Broken pipe.") };
180  # Ensure we catch errors.  # Ensure we catch errors.
181  eval {  eval {
182      # Get the parameters.      # Get the parameters.
183      my ($database, @groups) = @parameters;      my ($database, @groups) = @parameters;
184      # Connect to the database.      # Check for an alternate DBD.
185      my $erdb = ERDB::GetDatabase($database);      my $altDBD = ($options->{DBD} ? "$FIG_Config::fig/$options->{DBD}" : undef);
186      # Fix the group list.      # Connect to the database and get its load directory.
187      my @realGroups = ERDBLoadGroup::ComputeGroups($erdb, $options, \@groups);      my $erdb = ERDB::GetDatabase($database, $altDBD);
     # Get the source object and load directory for this database.  
     my $source = $erdb->GetSourceObject();  
188      my $directory = $erdb->LoadDirectory();      my $directory = $erdb->LoadDirectory();
189        # Fix the group list.
190        my @realGroups = ERDBLoadGroup::ComputeGroups($erdb, \@groups);
191      # Are we a worker or a manager?      # Are we a worker or a manager?
192      if ($options->{workers} == 0) {      if ($options->{workers} == 0) {
193          # Yes, we're a worker.          # Yes, we're a worker.
194          Trace("Worker process $$ started.") if T(2);          Trace("Worker process $options->{label} started.") if T(2);
195          LoadFromInput(\*STDIN, $erdb, $source, \@realGroups, $options);          LoadFromInput(\*STDIN, $erdb, \@realGroups, $options);
196      } else {      } else {
197          # Here we're a manager. If the user wants us to clear the directory,          # Here we're a manager. If the user wants us to clear the directory,
198          # do that first.          # do that first.
# Line 188  Line 203 
203              my @files = ERDBGenerate::GetLoadFiles($directory);              my @files = ERDBGenerate::GetLoadFiles($directory);
204              # It's worth noting if we didn't find any.              # It's worth noting if we didn't find any.
205              if (! @files) {              if (! @files) {
206                  Tracer("Load directory is already clear.") if T(2);                  Trace("Load directory is already clear.") if T(2);
207              } else {              } else {
208                  # Delete the files we found.                  # Delete the files we found.
209                  for my $file (@files) {                  for my $file (@files) {
210                      unlink "$directory/$file";                      unlink "$directory/$file";
211                      $deleteCount++;                      $deleteCount++;
212                  }                  }
213                  Tracer("$deleteCount files deleted from load directory during Clear.") if T(2);                  Trace("$deleteCount files deleted from load directory during Clear.") if T(2);
214              }              }
215          } elsif ($options->{clearGroups}) {          } elsif ($options->{clearGroups}) {
216              # Here the user only wants to clear the load files for the specified              # Here the user only wants to clear the load files for the specified
# Line 216  Line 231 
231                      # This is one of our tables, so delete the file.                      # This is one of our tables, so delete the file.
232                      unlink "$directory/$file";                      unlink "$directory/$file";
233                      $deleteCount++;                      $deleteCount++;
234                        Trace("$deleteCount files deleted.") if T(3) && $deleteCount % 100 == 0;
235                  }                  }
236              }              }
237              Trace("$deleteCount files deleted from load directory during ClearGroups.") if T(2);              Trace("$deleteCount files deleted from load directory during ClearGroups.") if T(2);
238          }          }
239            # Delete any leftover kill file if it exists.
240            my $killFileName = ERDBLoadGroup::KillFileName($erdb, $directory);
241            if (-f $killFileName) {
242                Trace("Deleting kill file $killFileName.") if T(2);
243                unlink $killFileName;
244            }
245          # Now we need to get our list of sections. Check to see if the user          # Now we need to get our list of sections. Check to see if the user
246          # supplied a section file.          # supplied a section file.
247          my $sectionFile = $options->{sections};          my $sectionFile = $options->{sections};
# Line 227  Line 249 
249              # No, so we must create one.              # No, so we must create one.
250              $sectionFile = "$directory/Sections$$.txt";              $sectionFile = "$directory/Sections$$.txt";
251              Open(\*SECTIONS, ">$sectionFile");              Open(\*SECTIONS, ">$sectionFile");
252              for my $section ($erdb->SectionList($source)) {              for my $section ($erdb->SectionList()) {
253                  print SECTIONS "$section\n";                  print SECTIONS "$section\n";
254              }              }
255              close SECTIONS;              close SECTIONS;
# Line 251  Line 273 
273          # Are we a sequential load or a multi-worker manager?          # Are we a sequential load or a multi-worker manager?
274          my $numWorkers = $options->{workers};          my $numWorkers = $options->{workers};
275          if ($numWorkers == 1) {          if ($numWorkers == 1) {
276              # We're sequential. We do all the work ourselves.              # We're sequential, so we do all the work ourselves.
277              Trace("Sequential load started.") if T(2);              Trace("Sequential load started.") if T(2);
278              LoadFromInput($ih, $erdb, $source, \@realGroups, \%workerOptions);              LoadFromInput($ih, $erdb, \@realGroups, \%workerOptions);
279          } else {          } else {
280              # Here we need to create the workers. The following array will contain              # Here we need to create the workers. The following array will contain
281              # a list of open file handles. Each one will correspond to a worker.              # a descriptor for each worker.
             # Writing to the file sends a section to a worker.  
282              my @workers = ();              my @workers = ();
283              # Compute the command line to use for the worker.              # Compute the positional parameters to use for the workers.
284              my $command = "$0 " . Tracer::UnparseOptions(\%workerOptions) .              my $commandParms = join(" ", $database, @realGroups);
285                  " $database " . join(" ", @realGroups);              my $command = $0;
             Trace("Worker command is: $command") if T(3);  
286              # Create the workers.              # Create the workers.
287              for (my $i = 0; $i < $numWorkers; $i++) {              for (my $i = 0; $i < $numWorkers; $i++) {
288                  my $oh = Open(undef, "| $command");                  my $label = "$options->{label}$i";
289                  push @workers, $oh;                  $workerOptions{label} = $label;
290                    my $commandOptions = Tracer::UnparseOptions(\%workerOptions);
291                    my $inFile = "$FIG_Config::temp/Pipe-$label.tbl";
292                    my $oh = Open(undef, ">$inFile");
293                    my $command = "$command $commandOptions $commandParms <$inFile >null &";
294                    push @workers, { handle => $oh, label => $label, command => $command };
295              }              }
296              # Now we assign sections to the workers.              # Now we assign sections to the workers.
297                my $w = 0;
298              while (! eof $ih) {              while (! eof $ih) {
299                  # Get the name of the next section.                  # Get the name of the next section.
300                  my $line = <$ih>;                  my $line = <$ih>;
301                  # Get the next worker in rotation.                  # Get the output handle for the next worker in rotation.
302                  my $worker = shift @workers;                  my $wh = $workers[$w]->{handle};
                 push @workers, $worker;  
303                  # Send this section to it.                  # Send this section to it.
304                  print $worker $line;                  print $wh $line;
305                    Trace(Tracer::Strip($line) . " sent to $workers[$w]->{label}") if T(3);
306                    # Position on the next worker.
307                    $w = ($w + 1) % $numWorkers;
308              }              }
309              # All done, wait for the workers to finish.              # All done, close the files.
310              for my $worker (@workers) {              for my $worker (@workers) {
311                  close $worker;                  close $worker->{handle};
312              }              }
313                # Now start the workers.
314                for my $worker (@workers) {
315                    my $cmd = $worker->{command};
316                    Trace("Starting: $cmd") if T(3);
317                    system($worker->{command});
318          }          }
319      }      }
320      Trace("Load manager completed.") if T(2);      Trace("Load manager completed.") if T(2);
321        }
322  };  };
323  if ($@) {  if ($@) {
324      Trace("Script failed with error: $@") if T(0);      Trace("Script failed with error: $@") if T(0);
# Line 300  Line 334 
334      }      }
335  }  }
336    
337    =head2 Internal Methods
338    
339  =head3 LoadFromInput  =head3 LoadFromInput
340    
341      LoadFromInput($ih, $erdb, \@groups, \%options);      LoadFromInput($ih, $erdb, \@groups, \%options);
# Line 318  Line 354 
354    
355  Database object containing information about the tables being loaded.  Database object containing information about the tables being loaded.
356    
 =item source  
   
 Source object used to access the data from which the load file is created.  
   
357  =item groups  =item groups
358    
359  Reference to a list of the names for the load groups to process.  Reference to a list of the names for the load groups to process.
# Line 336  Line 368 
368    
369  sub LoadFromInput {  sub LoadFromInput {
370      # Get the parameters.      # Get the parameters.
371      my ($ih, $erdb, $source, $groups, $options) = @_;      my ($ih, $erdb, $groups, $options) = @_;
     # Create a load object for each group.  
     my %loaders = map { $_ => $erdb->Loader($_, $source, $options) } @{$groups};  
372      # We'll count our errors in here.      # We'll count our errors in here.
373      my $errorCount = 0;      my $errorCount = 0;
374      my $maxErrors = $options->{maxErrors};      my $maxErrors = $options->{maxErrors};
375      # Loop through the sections.      # Compute the kill file name.
376        my $killFileName = ERDBLoadGroup::KillFileName($erdb, $erdb->LoadDirectory());
377        my $killed = 0;
378        # Slurp in the sections.
379        my @sections = ();
380      while (! eof $ih) {      while (! eof $ih) {
381          # Get this section ID.          push @sections, Tracer::GetLine($ih);
382          my ($section) = Tracer::GetLine($ih);      }
383          # Process it for each load group.      # Loop through the groups.
384          for my $group (@$groups) {          for my $group (@$groups) {
385              Trace("Processing section $section for group $group in worker $$.") if T(3);          # Create a loader for this group.
386              my $ok = $loaders{$group}->ProcessSection($section);          my $loader = $erdb->Loader($group, $options);
387              # Check to see if we've exceeded the maximum error count.          # Loop through the sections.
388              if (! $ok && $maxErrors ne '' && $errorCount++ >= $maxErrors) {          for my $section (@sections) {
389                  Confess("Error limit exceeded in database loader.");              # Only proceed if we haven't been killed.
390                if (! $killed) {
391                    # Check for a kill file.
392                    if (-f $killFileName) {
393                        # Found one, so kill ourselves.
394                        Trace("$options->{label} terminated by kill file.") if T(2);
395                        $killed = 1;
396                    } else {
397                        # No kill file, so we process the section.
398                        Trace("Processing section $section for group $group in $options->{label}.") if T(3);
399                        my $ok = $loader->ProcessSection($section);
400                        # Check to see if we've exceeded the maximum error count. We only care
401                        # if maxErrors is nonzero.
402                        if (! $ok && $maxErrors && ++$errorCount >= $maxErrors) {
403                            Trace("Error limit exceeded in database loader.") if T(0);
404                            $killed = 1;
405              }              }
406          }          }
407      }      }
     # Now we display the statistics for each group.  
     for my $group (@$groups) {  
         Trace("Statistics for group $group\n" . $loaders{$group}->DisplayStats()) if T(2);  
408      }      }
409      Trace("Processing finished for worker $$.") if T(2);          Trace("Statistics for $group in $options->{label}:\n" . $loader->DisplayStats()) if T(2);
410        }
411        Trace("Processing finished for worker $options->{label}.") if T(2);
412  }  }
413    
414  1;  1;

Legend:
Removed from v.1.1  
changed lines
  Added in v.1.6

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3