[Bio] / FortyEight / batch_rast.pl Repository:
ViewVC logotype

Annotation of /FortyEight/batch_rast.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.15 - (view) (download) (as text)

1 : olson 1.1
2 :     use strict;
3 :     use Carp;
4 :     use Job48;
5 :     use FIG_Config;
6 :     use FIG;
7 : olson 1.3 use Getopt::Long;
8 : olson 1.15 use File::Basename;
9 :     use JobError qw(flag_error);
10 : olson 1.1 #
11 :     # Run a jobdirectory in one shot. For batch offload to a remote cluster that
12 :     # doesn't have our scheduler, etc available.
13 :     #
14 :    
15 :     #
16 :     # Stages are as follows; for now this is a copy and paste exercise from
17 :     # FortyEight/check_jobs.pl. Use caution, don't run with scissors.
18 :     #
19 :    
20 :     #
21 :     # upload
22 :     # rp
23 :     # Check status of keep_genecalls, then qc
24 :     # Check status of correction, then correction
25 :     # preprocess_sims
26 :     # sims
27 :     # bbhs
28 :     # auto_assign
29 :     # glue_contigs
30 :     # pchs
31 :     # scenario
32 :     # export
33 :     #
34 :    
35 : olson 1.3 my $parallel = 1;
36 : olson 1.4 my @phase;
37 : olson 1.2
38 : olson 1.4 my $usage = "Usage: $0 [--parallel N] -phase N [--phase N ..] jobdir\n";
39 : olson 1.3
40 :     if (!GetOptions("parallel=i" => \$parallel,
41 : olson 1.14 "phase=s" => \@phase))
42 : olson 1.3 {
43 :     die $usage;
44 :     }
45 :    
46 :     @ARGV == 1 or die $usage;
47 : olson 1.1
48 : olson 1.4 @phase > 0 or die $usage;
49 :     my %phase = map { $_ => 1 } @phase;
50 :    
51 : olson 1.1 my $job_dir = shift;
52 :    
53 : olson 1.12 #
54 :     # Only write process startup log if we're not doing sims
55 :     # or we're not in the SGE context. Otherwise we are
56 :     # flaying the NFS locking system badly.
57 :     #
58 :     my $log_subprocesses;
59 :     if (!$phase{3} || ($ENV{SGE_TASK_ID} eq '' || $ENV{SGE_TASK_ID} < 2))
60 :     {
61 :     $log_subprocesses++;
62 :     }
63 :    
64 : olson 1.11 if (-f "$job_dir/CANCEL")
65 :     {
66 :     die "Job exiting due to earlier CANCEL\n";
67 :     }
68 :    
69 : olson 1.1 my $job = new Job48($job_dir);
70 :    
71 : olson 1.2 my $sims_data_dir = $FIG_Config::rast_sims_data;
72 :    
73 :     if (!defined($sims_data_dir))
74 :     {
75 :     $sims_data_dir = $FIG_Config::fortyeight_data;
76 :     }
77 :    
78 :     my $sims_nr = "$sims_data_dir/nr";
79 :     my $sims_peg_synonyms = "$sims_data_dir/peg.synonyms";
80 :     my $sims_keep_count = 300;
81 :    
82 : olson 1.7 my $job48 = new Job48($job_dir);
83 :     my $meta = $job48->meta;
84 :    
85 :     my $host = `hostname`;
86 :     chomp $host;
87 : olson 1.12 $meta->add_log_entry($0, "Running phases @phase on $host") if $log_subprocesses;
88 : olson 1.7
89 : olson 1.4 #
90 :     # Emulate execution of SGE parallel environment via the
91 :     # --parallel N argument.
92 :     #
93 : olson 1.3 if ($parallel > 1)
94 :     {
95 :     $ENV{PE} = 'cluster';
96 :     $ENV{NSLOTS} = $parallel;
97 :     }
98 : olson 1.2
99 : olson 1.5 if ($phase{1})
100 : olson 1.3 {
101 :     &do_upload($job);
102 :     &do_rp($job);
103 :     }
104 :    
105 : olson 1.5 if ($phase{2})
106 : olson 1.3 {
107 :     &do_qc($job);
108 :     &do_correction($job);
109 :     &do_sims_preprocess($job);
110 : olson 1.13 #
111 :     # After we've preprocessed we know how many tasks we actually need.
112 :     # If our SGE job has more than that, we can prune it away.
113 :     #
114 :     # We can determine that because the metadata flag
115 :     # sge_job.p_3_<jobid>.tasks will be set to the number of tasks.
116 :     #
117 : olson 1.3 }
118 :    
119 : olson 1.5 if ($phase{3})
120 : olson 1.3 {
121 : olson 1.4 #
122 :     # If running inside a SGE task array job, execute
123 :     # our task. Otherwise run all of them.
124 :     #
125 :     if ($ENV{SGE_TASK_ID})
126 :     {
127 :     &run("$FIG_Config::bin/rp_compute_sims", $job->dir);
128 :     }
129 :     else
130 :     {
131 :     &do_sims($job);
132 :     }
133 : olson 1.3 }
134 :    
135 : olson 1.14 if ($phase{S})
136 :     {
137 :     #
138 :     # We are to submit a slurm batch job to compute the sims
139 :     # and await its completion.
140 :     #
141 :     # Read the task list to determine the number of tasks to submit.
142 :     #
143 :    
144 :     my $ntasks = 0;
145 :     if (open(TL, "<", "$job_dir/sims.job/task.list"))
146 :     {
147 :     while (<TL>)
148 :     {
149 :     $ntasks++;
150 :     }
151 :     close(TL);
152 :     }
153 :     else
154 :     {
155 : olson 1.15 fail($job_dir, "Cannot open sims task list $job_dir/sims.job/task.list: $!");
156 : olson 1.14 }
157 :     my @cmd = ("/disks/patric-common/slurm/bin/sbatch",
158 :     "--parsable",
159 : olson 1.15 "-M", "maas",
160 :     "-C", "sim",
161 : olson 1.14 "-D", "/tmp",
162 : olson 1.15 "--export", "NONE,PATH=/disks/rast/bin:/vol/rast-prod/FIGdisk/FIG/bin:/bin:/usr/bin",
163 :     "--job-name", "R-" . $job->id,
164 : olson 1.14 "-a", "1-$ntasks",
165 :     "-A", "rast",
166 : olson 1.15 "--mem", "6G",
167 :     "--wrap", "standalone-sims -- -1 " . $job->id,
168 :     "--cpus-per-task", 4,
169 : olson 1.14 "-n", 1);
170 :     print STDERR "Submit with @cmd\n";
171 : olson 1.15 open(SUB, "-|", @cmd) or fail($job_dir, "Cannot run submit @cmd: $!");
172 : olson 1.14 my $batch_job;
173 : olson 1.15 my $cluster;
174 : olson 1.14 while (<SUB>)
175 :     {
176 :     print STDERR "Submit output: $_";
177 : olson 1.15 if (/^(\d+)(;(\S+))?/)
178 : olson 1.14 {
179 :     $batch_job = $1;
180 : olson 1.15 $cluster = $3;
181 : olson 1.14 }
182 :     }
183 :     close(SUB);
184 :     if (!$batch_job)
185 :     {
186 : olson 1.15 fail($job_dir, "Unable to get batch job id from @cmd");
187 : olson 1.14 }
188 :    
189 :     #
190 :     # Await completion.
191 :     #
192 : olson 1.15 my @cluster = ("-M", $cluster) if $cluster;
193 : olson 1.14 while (1)
194 :     {
195 :     my $n = 0;
196 :     print STERR "Check queue for $batch_job\n";
197 : olson 1.15 open(QCHK, "-|", "/disks/patric-common/slurm/bin/squeue", "--noheader", "-j", $batch_job, @cluster) or fail($job_dir, "Cannot run squeue --noheader: $!");
198 : olson 1.14 while (<QCHK>)
199 :     {
200 :     print STDERR $_;
201 : olson 1.15 if (/^\s+\d/)
202 :     {
203 :     $n++;
204 :     }
205 : olson 1.14 }
206 : olson 1.15 my $rc = close(QCHK);
207 :    
208 :     if (!$rc)
209 :     {
210 :     print STDERR "squeue close failed: rc=$rc child_error=$?\n";
211 :     }
212 :     elsif ($n == 0)
213 : olson 1.14 {
214 :     print STDERR "Job $batch_job complete\n";
215 :     last;
216 :     }
217 :     sleep(60);
218 :     }
219 :     }
220 :    
221 : olson 1.5 if ($phase{4})
222 : olson 1.3 {
223 : olson 1.6 &do_sims_postprocess($job);
224 : olson 1.3 &do_bbhs($job);
225 :     &do_auto_assign($job);
226 :     &do_glue_contigs($job);
227 :     &do_pchs($job);
228 : olson 1.14 # &do_scenario($job);
229 : olson 1.3 &do_export($job);
230 : olson 1.8 &mark_job_user_complete($job);
231 : olson 1.3 }
232 : olson 1.1
233 :     sub do_upload
234 :     {
235 :     my($job) = @_;
236 :     return;
237 :     }
238 :    
239 :     sub do_rp
240 :     {
241 :     my($job) = @_;
242 :     &run("$FIG_Config::bin/rp_rapid_propagation", $job->dir);
243 :     }
244 :    
245 :     sub do_qc
246 :     {
247 :     my($job) = @_;
248 :    
249 :     if ($job->meta->get_metadata("keep_genecalls"))
250 :     {
251 :     $job->meta->add_log_entry($0, "keep_genecalls is enabled: marking qc as complete");
252 :     $job->meta->set_metadata("status.qc", "complete");
253 :     return;
254 :     }
255 :    
256 :     &run("$FIG_Config::bin/rp_quality_check", $job->dir);
257 :     }
258 :    
259 :     sub do_correction
260 :     {
261 :     my($job) = @_;
262 :    
263 :     if ($job->meta->get_metadata("keep_genecalls"))
264 :     {
265 :     $job->meta->add_log_entry($0, "keep_genecalls is enabled: marking correction as complete");
266 :     $job->meta->set_metadata("status.correction", "complete");
267 :     return;
268 :     }
269 :    
270 :     my $correction_list = $job->meta->get_metadata("correction.request");
271 : olson 1.3
272 :     if (ref($correction_list))
273 :     {
274 :     my $correction_str = join(",", @$correction_list);
275 :     &run("$FIG_Config::bin/rp_correction", $job->dir, $correction_str);
276 :     }
277 : olson 1.1 }
278 :    
279 :     sub do_sims_preprocess
280 :     {
281 :     my($job) = @_;
282 : olson 1.2
283 :     &run("$FIG_Config::bin/rp_preprocess_sims", $job->dir, $sims_nr, $sims_peg_synonyms);
284 :    
285 : olson 1.1 }
286 :    
287 :     sub do_sims
288 :     {
289 :     my($job) = @_;
290 : olson 1.2
291 :     if (!open(CHUNK, "<", $job->dir. "/sims.job/chunk.out"))
292 :     {
293 : olson 1.15 fail($job_dir, "Error opening $job_dir/sims.job/chunk.out: $!");
294 : olson 1.2 }
295 :    
296 :     #
297 :     # Extract created task ids
298 :     #
299 :    
300 :     my($task_start, $task_end);
301 :     while (<CHUNK>)
302 :     {
303 :     print;
304 :     chomp;
305 :     if (/^tasks\s+(\d+)\s+(\d+)/)
306 :     {
307 :     $task_start = $1;
308 :     $task_end = $2;
309 :     }
310 :     }
311 :     close(CHUNK);
312 :    
313 :     if (!defined($task_start))
314 :     {
315 : olson 1.15 fail($job_dir, "Tasks not found");
316 : olson 1.2 }
317 :    
318 : olson 1.6 for my $task ($task_start .. $task_end)
319 :     {
320 :     $ENV{SGE_TASK_ID} = $task;
321 :     &run("$FIG_Config::bin/rp_compute_sims", $job->dir);
322 :     }
323 :     }
324 :    
325 :     sub do_sims_postprocess
326 :     {
327 :     my($job) = @_;
328 :    
329 : olson 1.2 my $sims_nr_len = $sims_nr;
330 :     if (-f "$sims_nr-len.btree")
331 :     {
332 :     $sims_nr_len = "$sims_nr-len.btree";
333 :     }
334 :    
335 : olson 1.6 &run("$FIG_Config::bin/rp_postproc_sims", $job->dir, $sims_nr_len, $sims_peg_synonyms, $sims_keep_count);
336 : olson 1.1 }
337 :    
338 :     sub do_bbhs
339 :     {
340 :     my($job) = @_;
341 : olson 1.2 &run("$FIG_Config::bin/rp_compute_bbhs", $job->dir);
342 : olson 1.1 }
343 :    
344 :     sub do_auto_assign
345 :     {
346 :     my($job) = @_;
347 : olson 1.2 &run("$FIG_Config::bin/rp_auto_assign", $job->dir);
348 : olson 1.1 }
349 :    
350 :     sub do_glue_contigs
351 :     {
352 :     my($job) = @_;
353 : olson 1.2 &run("$FIG_Config::bin/rp_glue_contigs", $job->dir);
354 : olson 1.1 }
355 :    
356 :     sub do_pchs
357 :     {
358 :     my($job) = @_;
359 : olson 1.2 &run("$FIG_Config::bin/rp_compute_pchs", $job->dir);
360 : olson 1.1 }
361 :    
362 :     sub do_scenario
363 :     {
364 :     my($job) = @_;
365 : olson 1.2 &run("$FIG_Config::bin/rp_scenarios", $job->dir);
366 : olson 1.1 }
367 :    
368 :     sub do_export
369 :     {
370 :     my($job) = @_;
371 : olson 1.2 &run("$FIG_Config::bin/rp_write_exports", $job->dir);
372 : olson 1.1 }
373 :    
374 : olson 1.8 sub mark_job_user_complete
375 :     {
376 :     my($job) = @_;
377 :    
378 :     my $job_dir = $job->dir;
379 :     my $meta = $job->meta;
380 :     my $job_id = $job->id;
381 :    
382 :     system("$FIG_Config::bin/send_job_completion_email", $job_dir);
383 :    
384 :     $meta->set_metadata("status.final", "complete");
385 :    
386 :     #
387 :     # If the job is a SEED candidate, send VV email.
388 :     #
389 :    
390 :     if ($meta->get_metadata("import.suggested") or
391 :     $meta->get_metadata("import.candidate"))
392 :     {
393 :     my $gname = $job->genome_name;
394 :     my $mail = Mail::Mailer->new();
395 :     $mail->open({
396 :     To => 'Veronika Vonstein <veronika@thefig.info>, Robert Olson<olson@mcs.anl.gov>, Andreas Wilke<wilke@mcs.anl.gov>',
397 :     From => 'Annotation Server <rast@mcs.anl.gov>',
398 :     Subject => "RAST job $job_id marked for SEED inclusion",
399 :     });
400 :    
401 :     print $mail <<END;
402 :     RAST job #$job_id ($gname) was submitted for inclusion into the SEED, and has finished its processing.
403 :     END
404 :     $mail->close();
405 :    
406 :     #
407 :     # We also mark the job as ACTIVE again so that the
408 :     # normal post-seed-acceptance pipeline stages may execute.
409 :     #
410 :     open(F, ">$job_dir/ACTIVE");
411 :     close(F);
412 :     }
413 :     else
414 :     {
415 :     #
416 :     # Otherwise it is completely done.
417 :     #
418 :     &mark_job_done($job);
419 :     }
420 :     }
421 :    
422 :     sub mark_job_done
423 :     {
424 :     my($job) = @_;
425 :    
426 :     #
427 :     # If we spooled the job out onto the lustre disk, we need to
428 :     # spool it back.
429 :     #
430 :    
431 :     my $meta = $job->meta;
432 :     my $job_dir = $job->dir;
433 :    
434 :     if ($meta->get_metadata("lustre_required"))
435 :     {
436 :     &run("$FIG_Config::bin/rp_lustre_finish", $job_dir);
437 :     }
438 :     if (open(D, ">$job_dir/DONE"))
439 :     {
440 :     print D time . "\n";
441 :     close(D);
442 :     }
443 :     else
444 :     {
445 :     warn "Error opening $job_dir/DONE: $!\n";
446 :     }
447 :    
448 :     unlink("$job_dir/ACTIVE");
449 :     }
450 : olson 1.1
451 :     sub run
452 :     {
453 :     my(@cmd) = @_;
454 :    
455 : olson 1.9 my $cmd_str = join(" ", @cmd);
456 :     print "Start: $cmd_str\n";
457 : olson 1.12 $meta->add_log_entry($0, ['Start', $cmd_str]) if $log_subprocesses;
458 : olson 1.1 my $rc = system(@cmd);
459 :     if ($rc != 0)
460 :     {
461 : olson 1.9 $meta->add_log_entry($0, ['Failed', $rc, $cmd_str]);
462 : olson 1.12 print STDERR "Failed: $rc $cmd_str\n";
463 :     if (open(FH, ">", "$job_dir/CANCEL"))
464 : olson 1.11 {
465 :     print FH "Cancel job due to error in $0 @cmd\n";
466 :     close(FH);
467 :     }
468 :     #
469 :     # Attempt to qdel any other parts of this job that are queued or running.
470 : olson 1.12 # Only if we are running in the SGE environment.
471 : olson 1.11 #
472 : olson 1.12 if ($ENV{SGE_ARCH} ne '')
473 : olson 1.11 {
474 : olson 1.12 my @jobs;
475 :    
476 :     for my $k ($meta->get_metadata_keys())
477 : olson 1.11 {
478 : olson 1.12 if ($k =~ /p_.*\.sge_job_id/)
479 : olson 1.11 {
480 : olson 1.12 my $job_id = $meta->get_metadata($k);
481 :     #
482 :     # Don't qdel this job.
483 :     #
484 :     if ($job_id =~ /^\d+$/ && $job_id != $ENV{JOB_ID})
485 :     {
486 :     push(@jobs, $job_id);
487 :     }
488 :    
489 : olson 1.11 }
490 :     }
491 : olson 1.12 if (@jobs)
492 :     {
493 :     my $rc2 = system("qdel", @jobs);
494 :     print "qdel @jobs returned $rc2\n";
495 :     $meta->add_log_entry($0, "Qdel @jobs due to failure returned status $rc2") if $log_subprocesses;
496 :     }
497 : olson 1.11 }
498 :    
499 : olson 1.9 confess "Cmd failed with rc=$rc: $cmd_str\n";
500 : olson 1.1 }
501 : olson 1.12 $meta->add_log_entry($0, ['Done', $cmd_str]) if $log_subprocesses;
502 : olson 1.9 print "Done: $cmd_str\n";
503 : olson 1.1 }
504 : olson 1.15
505 :     #
506 :     # Use JobError to mark error and quit.
507 :     #
508 :     sub fail
509 :     {
510 :     my($job_dir, $msg) = @_;
511 :    
512 :     my $job_id = basename($job_dir);
513 :     my $genome = &FIG::file_head("$job_dir/GENOME_ID", 1);
514 :     chomp $genome;
515 :    
516 :     my $meta = GenomeMeta->new($genome, "$job_dir/meta.xml");
517 :     flag_error($genome, $job_id, $job_dir, $meta, undef, $msg);
518 :     die "Failing with error: $msg";
519 :     }

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3