[Bio] / Sprout / ERDBGenerator.pl Repository:
ViewVC logotype

Annotation of /Sprout/ERDBGenerator.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.1 - (view) (download) (as text)

1 : parrello 1.1 #!/usr/bin/perl -w
2 :    
3 :     #
4 :     # Copyright (c) 2003-2006 University of Chicago and Fellowship
5 :     # for Interpretations of Genomes. All Rights Reserved.
6 :     #
7 :     # This file is part of the SEED Toolkit.
8 :     #
9 :     # The SEED Toolkit is free software. You can redistribute
10 :     # it and/or modify it under the terms of the SEED Toolkit
11 :     # Public License.
12 :     #
13 :     # You should have received a copy of the SEED Toolkit Public License
14 :     # along with this program; if not write to the University of Chicago
15 :     # at info@ci.uchicago.edu or the Fellowship for Interpretation of
16 :     # Genomes at veronika@thefig.info or download a copy from
17 :     # http://www.theseed.org/LICENSE.TXT.
18 :     #
19 :    
20 :     use strict;
21 :     use Tracer;
22 :     use ERDB;
23 :     use ERDBGenerate;
24 :     use ERDBLoadGroup;
25 :    
26 :     =head1 ERDBGenerator Script
27 :    
28 :     ERDBGenerator [options] database group1 group2 ...
29 :    
30 :     Generate ERDB table load files
31 :    
32 :     =head2 Introduction
33 :    
34 :     This script manages the generation of load files for an ERDB database. It can
35 :     either function as a worker process that reads section IDs from the standard
36 :     input and generates the load files for each, or it can function as a management
37 :     process that starts a bunch of workers and gives them work.
38 :    
39 :     The positional parameters include a list of load groups to process and the name
40 :     of the database.
41 :    
42 :     =head2 Positional Parameters
43 :    
44 :     =over 4
45 :    
46 :     =item database
47 :    
48 :     Name of the ERDB database. This should be the class name for the [[ErdbPm]]
49 :     subclass used to access the database.
50 :    
51 :     =item groups
52 :    
53 :     List of the table groups to load, or C<*> to load all table groups.
54 :    
55 :     =back
56 :    
57 :     =head2 Command-Line Options
58 :    
59 :     =over 4
60 :    
61 :     =item background
62 :    
63 :     Save the standard and error output to files. The files will be created
64 :     in the FIG temporary directory and will be named C<err>I<User>C<.log> and
65 :     C<out>I<User>C<.log>, respectively, where I<User> is the value of the
66 :     B<user> option below.
67 :    
68 :     =item clean
69 :    
70 :     Remove temporary files from the load directory. Use this option with care,
71 :     since it will crash if a worker process is still running.
72 :    
73 :     =item clear
74 :    
75 :     If specified, all generated files in the load directory with a C<dt>X suffix
76 :     will be erased. This restores the load directory to a pristine, pre-loading state.
77 :    
78 :     =item clearGroups
79 :    
80 :     If specified, all generated files related to each specified group will be
81 :     erased prior to any further processing. This is useful if a single group
82 :     needs to be reloaded and we don't want to be confused by files leftover
83 :     from previous loads.
84 :    
85 :     =item forked
86 :    
87 :     If specified, then the trace file will not be erased during initialization.
88 :     This prevents the worker processes from stepping on each other's trace output.
89 :    
90 :     =item help
91 :    
92 :     Display this command's parameters and options.
93 :    
94 :     =item maxErrors
95 :    
96 :     If specified, then this process will terminate after the specified number of
97 :     section load errors; otherwise, the process will keep going after a section
98 :     error.
99 :    
100 :     =item phone
101 :    
102 :     Phone number to message when the script is complete.
103 :    
104 :     =item resume
105 :    
106 :     If specified, then the group list must contain a single group. The specified
107 :     group and all groups after it in the group list will be processed.
108 :    
109 :     =item sections
110 :    
111 :     Name of a file containing a list of sections to process. If C<*> is specified (the
112 :     default), all sections are processed. This option is ignored if C<workers> is
113 :     C<0>. In that case, the list of sections is taken from the standard input. When
114 :     a file name is specified, if it is not an absolute file name, it is presumed to
115 :     be in the database's default load directory.
116 :    
117 :     =item sql
118 :    
119 :     If specified, turns on tracing of SQL activity.
120 :    
121 :     =item trace
122 :    
123 :     Specifies the tracing level. The higher the tracing level, the more messages
124 :     will appear in the trace log. Use E to specify emergency tracing.
125 :    
126 :     =item user
127 :    
128 :     Name suffix to be used for log files. If omitted, the PID is used.
129 :    
130 :     =item warn
131 :    
132 :     Create an event in the RSS feed when an error occurs.
133 :    
134 :     =item workers
135 :    
136 :     If C<0>, then this is considered to be a worker process and the sections in the
137 :     standard input are processed. If C<1>, then all sections are processed without
138 :     any parallelism and the standard input is ignored. If it is any other number,
139 :     then the appropriate number of worker processes are generated and the sections
140 :     are assigned to them in a round-robin fashion.
141 :    
142 :     =back
143 :    
144 :     =cut
145 :    
# Main script body.
#
# Parses the command line, then acts either as a worker (workers == 0: read
# section IDs from STDIN and process them) or as a manager (optionally clear
# old load files, build the section list, and either process every section
# itself or farm the sections out to piped worker processes).

# Get the command-line options and parameters.
my ($options, @parameters) = StandardSetup([qw(ERDBLoadGroup ERDBGenerate ERDB Stats) ],
    {
        clear => ["", "if specified, the entire load directory will be cleared"],
        clean => ["", "if specified, temporary files in the load directory will be deleted"],
        clearGroups => ["", "if specified, pre-exising load files from the groups processed will be deleted"],
        trace => ["2", "tracing level"],
        workers => ["1", "number of worker processes"],
        phone => ["", "phone number (international format) to call when load finishes"],
        resume => ["", "if specified, the specified group and all groups that normally come after it will be loaded"],
        sections => ["*", "name of a file in the database's load directory containing a list of sections to process"],
        maxErrors => ["", "if specified, the maximum allowed number of section failures"],
    },
    "<database> <group1> <group2> ...",
    @ARGV);
# This is a list of the options that are for manager scripts only. They are
# stripped from the option set handed to the workers.
# NOTE(review): "clean" is accepted here but nothing in this script acts on
# it -- confirm whether it is handled elsewhere.
my @managerOptions = qw(clear clean clearGroups resume sections);
# We're doing heavy pipe stuff, so we need to throw an error on a broken-pipe signal.
local $SIG{PIPE} = sub { Confess("Broken pipe.") };
# Insure we catch errors.
eval {
    # Get the parameters.
    my ($database, @groups) = @parameters;
    # Connect to the database.
    my $erdb = ERDB::GetDatabase($database);
    # Fix the group list.
    my @realGroups = ERDBLoadGroup::ComputeGroups($erdb, $options, \@groups);
    # Get the source object and load directory for this database.
    my $source = $erdb->GetSourceObject();
    my $directory = $erdb->LoadDirectory();
    # Are we a worker or a manager?
    if ($options->{workers} == 0) {
        # Yes, we're a worker. The sections come from the standard input.
        Trace("Worker process $$ started.") if T(2);
        LoadFromInput(\*STDIN, $erdb, $source, \@realGroups, $options);
    } else {
        # Here we're a manager. If the user wants us to clear the directory,
        # do that first.
        if ($options->{clear}) {
            # Count the number of files actually deleted.
            my $deleteCount = 0;
            # Get a list of the applicable file names.
            my @files = ERDBGenerate::GetLoadFiles($directory);
            # It's worth noting if we didn't find any.
            if (! @files) {
                Trace("Load directory is already clear.") if T(2);
            } else {
                # Delete the files we found. Only successful deletions are
                # counted; failures are traced so the count stays honest.
                for my $file (@files) {
                    if (unlink "$directory/$file") {
                        $deleteCount++;
                    } else {
                        Trace("Could not delete $directory/$file: $!") if T(1);
                    }
                }
                Trace("$deleteCount files deleted from load directory during Clear.") if T(2);
            }
        } elsif ($options->{clearGroups}) {
            # Here the user only wants to clear the load files for the specified
            # groups. This operation requires significantly greater care. Get
            # the hash of groups to table names.
            my $groupHash = ERDBLoadGroup::GetGroupHash($erdb);
            # Get a list of the load files in this directory.
            my @files = ERDBGenerate::GetLoadFiles($directory);
            # Get a hash of all the tables to be deleted.
            my %tables = map { $_ => 1 } map { @{$groupHash->{$_}} } @realGroups;
            # We'll count the number of files deleted in here.
            my $deleteCount = 0;
            # Loop through all the files in the directory.
            for my $file (@files) {
                # Extract the relevant table name from the file.
                my ($table) = ERDBGenerate::ParseFileName($file);
                if ($tables{$table}) {
                    # This is one of our tables, so delete the file.
                    if (unlink "$directory/$file") {
                        $deleteCount++;
                    } else {
                        Trace("Could not delete $directory/$file: $!") if T(1);
                    }
                }
            }
            Trace("$deleteCount files deleted from load directory during ClearGroups.") if T(2);
        }
        # Now we need to get our list of sections. Check to see if the user
        # supplied a section file.
        my $sectionFile = $options->{sections};
        if ($sectionFile eq "*") {
            # No, so we must create one. The PID keeps concurrent managers
            # from overwriting each other's section lists.
            $sectionFile = "$directory/Sections$$.txt";
            my $sectionsOut = Open(undef, ">$sectionFile");
            for my $section ($erdb->SectionList($source)) {
                print {$sectionsOut} "$section\n";
            }
            close $sectionsOut;
        } elsif ($sectionFile =~ m#^\w#) {
            # Yes, but it doesn't have a directory name, so add one.
            $sectionFile = "$directory/$sectionFile";
        }
        # Compute the options to be used for worker processes (or ourselves if
        # we're sequential).
        my %workerOptions = %{$options};
        # Get rid of the manager-only options.
        for my $optionID (@managerOptions) {
            delete $workerOptions{$optionID};
        }
        # Insure the worker knows what it is.
        $workerOptions{workers} = 0;
        $workerOptions{forked} = 1;
        $workerOptions{background} = 1;
        # Prepare to read the section file.
        my $ih = Open(undef, "<$sectionFile");
        # Are we a sequential load or a multi-worker manager?
        my $numWorkers = $options->{workers};
        if ($numWorkers == 1) {
            # We're sequential. We do all the work ourselves.
            Trace("Sequential load started.") if T(2);
            LoadFromInput($ih, $erdb, $source, \@realGroups, \%workerOptions);
        } else {
            # Here we need to create the workers. The following array will contain
            # a list of open file handles. Each one will correspond to a worker.
            # Writing to the file sends a section to a worker.
            my @workers = ();
            # Compute the command line to use for the worker.
            my $command = "$0 " . Tracer::UnparseOptions(\%workerOptions) .
                          " $database " . join(" ", @realGroups);
            Trace("Worker command is: $command") if T(3);
            # Create the workers.
            for (my $i = 0; $i < $numWorkers; $i++) {
                my $oh = Open(undef, "| $command");
                push @workers, $oh;
            }
            # Now we assign sections to the workers.
            while (! eof $ih) {
                # Get the name of the next section.
                my $line = <$ih>;
                # Get the next worker in rotation (round-robin: take from the
                # front, put back at the end).
                my $worker = shift @workers;
                push @workers, $worker;
                # Send this section to it.
                print {$worker} $line;
            }
            # All done, wait for the workers to finish. Closing a pipe waits
            # for the child and returns false if it exited with a non-zero
            # status, so report that instead of silently ignoring it.
            for my $worker (@workers) {
                if (! close $worker) {
                    Trace("Worker pipe did not close cleanly (status $?): $!") if T(0);
                }
            }
        }
        # We are finished with the section list.
        close $ih;
    }
    Trace("Load manager completed.") if T(2);
};
if ($@) {
    Trace("Script failed with error: $@") if T(0);
} else {
    Trace("Script complete.") if T(2);
}
# Notify the user by phone if requested, whether or not the load succeeded.
if ($options->{phone}) {
    my $msgID = Tracer::SendSMS($options->{phone}, "ERDBGenerator has ended.");
    if ($msgID) {
        Trace("Phone message sent with ID $msgID.") if T(2);
    } else {
        Trace("Phone message not sent.") if T(2);
    }
}
302 :    
303 :     =head3 LoadFromInput
304 :    
305 :     LoadFromInput($ih, $erdb, $source, \@groups, \%options);
306 :    
307 :     Load one or more sections of data for the specified table groups. The IDs
308 :     of the data sections will be read from the specified input stream. The groups
309 :     will be loaded in the order specified, once per section.
310 :    
311 :     =over 4
312 :    
313 :     =item ih
314 :    
315 :     File handle for the input stream containing the list of sections to process.
316 :    
317 :     =item erdb
318 :    
319 :     Database object containing information about the tables being loaded.
320 :    
321 :     =item source
322 :    
323 :     Source object used to access the data from which the load file is created.
324 :    
325 :     =item groups
326 :    
327 :     Reference to a list of the names for the load groups to process.
328 :    
329 :     =item options
330 :    
331 :     Reference to a hash of the options passed in from the command line.
332 :    
333 :     =back
334 :    
335 :     =cut
336 :    
sub LoadFromInput {
    # Unpack the arguments: section input handle, database object, data
    # source object, list of group names, and the option hash.
    my ($ih, $erdb, $source, $groups, $options) = @_;
    # Build one loader per load group, keyed by group name.
    my %loaderFor = map {
        my $name = $_;
        $name => $erdb->Loader($name, $source, $options)
    } @{$groups};
    # Section failures are tallied here and compared against the limit. An
    # empty-string limit means no limit is enforced.
    my $limit = $options->{maxErrors};
    my $failures = 0;
    # Read section IDs until the input stream is exhausted.
    until (eof $ih) {
        # The section ID is the first field of the next input line.
        my ($sectionID) = Tracer::GetLine($ih);
        # Run this section through every group, in the order given.
        GROUP: for my $name (@{$groups}) {
            if (T(3)) {
                Trace("Processing section $sectionID for group $name in worker $$.");
            }
            my $loader = $loaderFor{$name};
            # A successful section needs no further attention.
            next GROUP if $loader->ProcessSection($sectionID);
            # Failures only matter when a limit was supplied.
            next GROUP if $limit eq '';
            # Give up once the count of earlier failures has reached the limit.
            if ($failures++ >= $limit) {
                Confess("Error limit exceeded in database loader.");
            }
        }
    }
    # Report the statistics gathered by each group's loader.
    for my $name (@{$groups}) {
        if (T(2)) {
            Trace("Statistics for group $name\n" . $loaderFor{$name}->DisplayStats());
        }
    }
    Trace("Processing finished for worker $$.") if T(2);
}
365 :    
366 :     1;

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3