[Bio] / FortyEight / batch_rast.pl Repository:
ViewVC logotype

Annotation of /FortyEight/batch_rast.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.12 - (view) (download) (as text)

1 : olson 1.1
2 :     use strict;
3 :     use Carp;
4 :     use Job48;
5 :     use FIG_Config;
6 :     use FIG;
7 : olson 1.3 use Getopt::Long;
8 : olson 1.1
9 :     #
10 :     # Run a jobdirectory in one shot. For batch offload to a remote cluster that
11 :     # doesn't have our scheduler, etc available.
12 :     #
13 :    
14 :     #
15 :     # Stages are as follows; for now this is a copy and paste exercise from
16 :     # FortyEight/check_jobs.pl. Use caution, don't run with scissors.
17 :     #
18 :    
19 :     #
20 :     # upload
21 :     # rp
22 :     # Check status of keep_genecalls, then qc
23 :     # Check status of correction, then correction
24 :     # preprocess_sims
25 :     # sims
26 :     # bbhs
27 :     # auto_assign
28 :     # glue_contigs
29 :     # pchs
30 :     # scenario
31 :     # export
32 :     #
33 :    
34 : olson 1.3 my $parallel = 1;
35 : olson 1.4 my @phase;
36 : olson 1.2
37 : olson 1.4 my $usage = "Usage: $0 [--parallel N] -phase N [--phase N ..] jobdir\n";
38 : olson 1.3
39 :     if (!GetOptions("parallel=i" => \$parallel,
40 : olson 1.4 "phase=i" => \@phase))
41 : olson 1.3 {
42 :     die $usage;
43 :     }
44 :    
45 :     @ARGV == 1 or die $usage;
46 : olson 1.1
47 : olson 1.4 @phase > 0 or die $usage;
48 :     my %phase = map { $_ => 1 } @phase;
49 :    
50 : olson 1.1 my $job_dir = shift;
51 :    
52 : olson 1.12 #
53 :     # Only write process startup log if we're not doing sims
54 :     # or we're not in the SGE context. Otherwise we are
55 :     # flaying the NFS locking system badly.
56 :     #
57 :     my $log_subprocesses;
58 :     if (!$phase{3} || ($ENV{SGE_TASK_ID} eq '' || $ENV{SGE_TASK_ID} < 2))
59 :     {
60 :     $log_subprocesses++;
61 :     }
62 :    
63 : olson 1.11 if (-f "$job_dir/CANCEL")
64 :     {
65 :     die "Job exiting due to earlier CANCEL\n";
66 :     }
67 :    
68 : olson 1.1 my $job = new Job48($job_dir);
69 :    
70 : olson 1.2 my $sims_data_dir = $FIG_Config::rast_sims_data;
71 :    
72 :     if (!defined($sims_data_dir))
73 :     {
74 :     $sims_data_dir = $FIG_Config::fortyeight_data;
75 :     }
76 :    
77 :     my $sims_nr = "$sims_data_dir/nr";
78 :     my $sims_peg_synonyms = "$sims_data_dir/peg.synonyms";
79 :     my $sims_keep_count = 300;
80 :    
81 : olson 1.7 my $job48 = new Job48($job_dir);
82 :     my $meta = $job48->meta;
83 :    
84 :     my $host = `hostname`;
85 :     chomp $host;
86 : olson 1.12 $meta->add_log_entry($0, "Running phases @phase on $host") if $log_subprocesses;
87 : olson 1.7
88 : olson 1.4 #
89 :     # Emulate execution of SGE parallel environment via the
90 :     # --parallel N argument.
91 :     #
92 : olson 1.3 if ($parallel > 1)
93 :     {
94 :     $ENV{PE} = 'cluster';
95 :     $ENV{NSLOTS} = $parallel;
96 :     }
97 : olson 1.2
98 : olson 1.5 if ($phase{1})
99 : olson 1.3 {
100 :     &do_upload($job);
101 :     &do_rp($job);
102 :     }
103 :    
104 : olson 1.5 if ($phase{2})
105 : olson 1.3 {
106 :     &do_qc($job);
107 :     &do_correction($job);
108 :     &do_sims_preprocess($job);
109 :     }
110 :    
111 : olson 1.5 if ($phase{3})
112 : olson 1.3 {
113 : olson 1.4 #
114 :     # If running inside a SGE task array job, execute
115 :     # our task. Otherwise run all of them.
116 :     #
117 :     if ($ENV{SGE_TASK_ID})
118 :     {
119 :     &run("$FIG_Config::bin/rp_compute_sims", $job->dir);
120 :     }
121 :     else
122 :     {
123 :     &do_sims($job);
124 :     }
125 : olson 1.3 }
126 :    
127 : olson 1.5 if ($phase{4})
128 : olson 1.3 {
129 : olson 1.6 &do_sims_postprocess($job);
130 : olson 1.3 &do_bbhs($job);
131 :     &do_auto_assign($job);
132 :     &do_glue_contigs($job);
133 :     &do_pchs($job);
134 :     &do_scenario($job);
135 :     &do_export($job);
136 : olson 1.8 &mark_job_user_complete($job);
137 : olson 1.3 }
138 : olson 1.1
139 :     sub do_upload
140 :     {
141 :     my($job) = @_;
142 :     return;
143 :     }
144 :    
145 :     sub do_rp
146 :     {
147 :     my($job) = @_;
148 :     &run("$FIG_Config::bin/rp_rapid_propagation", $job->dir);
149 :     }
150 :    
151 :     sub do_qc
152 :     {
153 :     my($job) = @_;
154 :    
155 :     if ($job->meta->get_metadata("keep_genecalls"))
156 :     {
157 :     $job->meta->add_log_entry($0, "keep_genecalls is enabled: marking qc as complete");
158 :     $job->meta->set_metadata("status.qc", "complete");
159 :     return;
160 :     }
161 :    
162 :     &run("$FIG_Config::bin/rp_quality_check", $job->dir);
163 :     }
164 :    
165 :     sub do_correction
166 :     {
167 :     my($job) = @_;
168 :    
169 :     if ($job->meta->get_metadata("keep_genecalls"))
170 :     {
171 :     $job->meta->add_log_entry($0, "keep_genecalls is enabled: marking correction as complete");
172 :     $job->meta->set_metadata("status.correction", "complete");
173 :     return;
174 :     }
175 :    
176 :     my $correction_list = $job->meta->get_metadata("correction.request");
177 : olson 1.3
178 :     if (ref($correction_list))
179 :     {
180 :     my $correction_str = join(",", @$correction_list);
181 :     &run("$FIG_Config::bin/rp_correction", $job->dir, $correction_str);
182 :     }
183 : olson 1.1 }
184 :    
185 :     sub do_sims_preprocess
186 :     {
187 :     my($job) = @_;
188 : olson 1.2
189 :     &run("$FIG_Config::bin/rp_preprocess_sims", $job->dir, $sims_nr, $sims_peg_synonyms);
190 :    
191 : olson 1.1 }
192 :    
193 :     sub do_sims
194 :     {
195 :     my($job) = @_;
196 : olson 1.2
197 :     if (!open(CHUNK, "<", $job->dir. "/sims.job/chunk.out"))
198 :     {
199 :     die "Error opening $job_dir/sims.job/chunk.out: $!";
200 :     }
201 :    
202 :     #
203 :     # Extract created task ids
204 :     #
205 :    
206 :     my($task_start, $task_end);
207 :     while (<CHUNK>)
208 :     {
209 :     print;
210 :     chomp;
211 :     if (/^tasks\s+(\d+)\s+(\d+)/)
212 :     {
213 :     $task_start = $1;
214 :     $task_end = $2;
215 :     }
216 :     }
217 :     close(CHUNK);
218 :    
219 :     if (!defined($task_start))
220 :     {
221 :     die "Tasks not found";
222 :     }
223 :    
224 : olson 1.6 for my $task ($task_start .. $task_end)
225 :     {
226 :     $ENV{SGE_TASK_ID} = $task;
227 :     &run("$FIG_Config::bin/rp_compute_sims", $job->dir);
228 :     }
229 :     }
230 :    
231 :     sub do_sims_postprocess
232 :     {
233 :     my($job) = @_;
234 :    
235 : olson 1.2 my $sims_nr_len = $sims_nr;
236 :     if (-f "$sims_nr-len.btree")
237 :     {
238 :     $sims_nr_len = "$sims_nr-len.btree";
239 :     }
240 :    
241 : olson 1.6 &run("$FIG_Config::bin/rp_postproc_sims", $job->dir, $sims_nr_len, $sims_peg_synonyms, $sims_keep_count);
242 : olson 1.1 }
243 :    
244 :     sub do_bbhs
245 :     {
246 :     my($job) = @_;
247 : olson 1.2 &run("$FIG_Config::bin/rp_compute_bbhs", $job->dir);
248 : olson 1.1 }
249 :    
250 :     sub do_auto_assign
251 :     {
252 :     my($job) = @_;
253 : olson 1.2 &run("$FIG_Config::bin/rp_auto_assign", $job->dir);
254 : olson 1.1 }
255 :    
256 :     sub do_glue_contigs
257 :     {
258 :     my($job) = @_;
259 : olson 1.2 &run("$FIG_Config::bin/rp_glue_contigs", $job->dir);
260 : olson 1.1 }
261 :    
262 :     sub do_pchs
263 :     {
264 :     my($job) = @_;
265 : olson 1.2 &run("$FIG_Config::bin/rp_compute_pchs", $job->dir);
266 : olson 1.1 }
267 :    
268 :     sub do_scenario
269 :     {
270 :     my($job) = @_;
271 : olson 1.2 &run("$FIG_Config::bin/rp_scenarios", $job->dir);
272 : olson 1.1 }
273 :    
274 :     sub do_export
275 :     {
276 :     my($job) = @_;
277 : olson 1.2 &run("$FIG_Config::bin/rp_write_exports", $job->dir);
278 : olson 1.1 }
279 :    
280 : olson 1.8 sub mark_job_user_complete
281 :     {
282 :     my($job) = @_;
283 :    
284 :     my $job_dir = $job->dir;
285 :     my $meta = $job->meta;
286 :     my $job_id = $job->id;
287 :    
288 :     system("$FIG_Config::bin/send_job_completion_email", $job_dir);
289 :    
290 :     $meta->set_metadata("status.final", "complete");
291 :    
292 :     #
293 :     # If the job is a SEED candidate, send VV email.
294 :     #
295 :    
296 :     if ($meta->get_metadata("import.suggested") or
297 :     $meta->get_metadata("import.candidate"))
298 :     {
299 :     my $gname = $job->genome_name;
300 :     my $mail = Mail::Mailer->new();
301 :     $mail->open({
302 :     To => 'Veronika Vonstein <veronika@thefig.info>, Robert Olson<olson@mcs.anl.gov>, Andreas Wilke<wilke@mcs.anl.gov>',
303 :     From => 'Annotation Server <rast@mcs.anl.gov>',
304 :     Subject => "RAST job $job_id marked for SEED inclusion",
305 :     });
306 :    
307 :     print $mail <<END;
308 :     RAST job #$job_id ($gname) was submitted for inclusion into the SEED, and has finished its processing.
309 :     END
310 :     $mail->close();
311 :    
312 :     #
313 :     # We also mark the job as ACTIVE again so that the
314 :     # normal post-seed-acceptance pipeline stages may execute.
315 :     #
316 :     open(F, ">$job_dir/ACTIVE");
317 :     close(F);
318 :     }
319 :     else
320 :     {
321 :     #
322 :     # Otherwise it is completely done.
323 :     #
324 :     &mark_job_done($job);
325 :     }
326 :     }
327 :    
328 :     sub mark_job_done
329 :     {
330 :     my($job) = @_;
331 :    
332 :     #
333 :     # If we spooled the job out onto the lustre disk, we need to
334 :     # spool it back.
335 :     #
336 :    
337 :     my $meta = $job->meta;
338 :     my $job_dir = $job->dir;
339 :    
340 :     if ($meta->get_metadata("lustre_required"))
341 :     {
342 :     &run("$FIG_Config::bin/rp_lustre_finish", $job_dir);
343 :     }
344 :     if (open(D, ">$job_dir/DONE"))
345 :     {
346 :     print D time . "\n";
347 :     close(D);
348 :     }
349 :     else
350 :     {
351 :     warn "Error opening $job_dir/DONE: $!\n";
352 :     }
353 :    
354 :     unlink("$job_dir/ACTIVE");
355 :     }
356 : olson 1.1
357 :     sub run
358 :     {
359 :     my(@cmd) = @_;
360 :    
361 : olson 1.9 my $cmd_str = join(" ", @cmd);
362 :     print "Start: $cmd_str\n";
363 : olson 1.12 $meta->add_log_entry($0, ['Start', $cmd_str]) if $log_subprocesses;
364 : olson 1.1 my $rc = system(@cmd);
365 :     if ($rc != 0)
366 :     {
367 : olson 1.9 $meta->add_log_entry($0, ['Failed', $rc, $cmd_str]);
368 : olson 1.12 print STDERR "Failed: $rc $cmd_str\n";
369 :     if (open(FH, ">", "$job_dir/CANCEL"))
370 : olson 1.11 {
371 :     print FH "Cancel job due to error in $0 @cmd\n";
372 :     close(FH);
373 :     }
374 :     #
375 :     # Attempt to qdel any other parts of this job that are queued or running.
376 : olson 1.12 # Only if we are running in the SGE environment.
377 : olson 1.11 #
378 : olson 1.12 if ($ENV{SGE_ARCH} ne '')
379 : olson 1.11 {
380 : olson 1.12 my @jobs;
381 :    
382 :     for my $k ($meta->get_metadata_keys())
383 : olson 1.11 {
384 : olson 1.12 if ($k =~ /p_.*\.sge_job_id/)
385 : olson 1.11 {
386 : olson 1.12 my $job_id = $meta->get_metadata($k);
387 :     #
388 :     # Don't qdel this job.
389 :     #
390 :     if ($job_id =~ /^\d+$/ && $job_id != $ENV{JOB_ID})
391 :     {
392 :     push(@jobs, $job_id);
393 :     }
394 :    
395 : olson 1.11 }
396 :     }
397 : olson 1.12 if (@jobs)
398 :     {
399 :     my $rc2 = system("qdel", @jobs);
400 :     print "qdel @jobs returned $rc2\n";
401 :     $meta->add_log_entry($0, "Qdel @jobs due to failure returned status $rc2") if $log_subprocesses;
402 :     }
403 : olson 1.11 }
404 :    
405 : olson 1.9 confess "Cmd failed with rc=$rc: $cmd_str\n";
406 : olson 1.1 }
407 : olson 1.12 $meta->add_log_entry($0, ['Done', $cmd_str]) if $log_subprocesses;
408 : olson 1.9 print "Done: $cmd_str\n";
409 : olson 1.1 }

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3