use strict;
use warnings;
use Carp;
use Job48;
use FIG_Config;
use FIG;
use Getopt::Long;

#
# Run a job directory in one shot. For batch offload to a remote cluster that
# doesn't have our scheduler, etc. available.
#
# Stages are as follows; for now this is a copy and paste exercise from
# FortyEight/check_jobs.pl. Use caution, don't run with scissors.
#
#   upload
#   rp
#   Check status of keep_genecalls, then qc
#   Check status of correction, then correction
#   preprocess_sims
#   sims
#   bbhs
#   auto_assign
#   glue_contigs
#   pchs
#   scenario
#   export
#
# The stages are grouped into phases selected with --phase (repeatable):
#
#   1: upload, rp
#   2: qc, correction, preprocess_sims
#   3: sims (a single task if SGE_TASK_ID is set, otherwise every task)
#   4: postprocess sims, bbhs, auto_assign, glue_contigs, pchs, scenario,
#      export, and the final completion bookkeeping
#

my $parallel = 1;
my @phase;

my $usage = "Usage: $0 [--parallel N] -phase N [--phase N ..] jobdir\n";

if (!GetOptions("parallel=i" => \$parallel,
                "phase=i"    => \@phase)) {
    die $usage;
}

@ARGV == 1 or die $usage;
@phase > 0 or die $usage;

my %phase = map { $_ => 1 } @phase;

my $job_dir = shift;

# A CANCEL marker is written by run() when any stage fails; never resume
# past it.
if (-f "$job_dir/CANCEL") {
    die "Job exiting due to earlier CANCEL\n";
}

my $job = Job48->new($job_dir);
defined($job) or die "Could not create a Job48 object for $job_dir\n";

# Prefer the RAST-specific sims data directory; fall back to the general one.
my $sims_data_dir = $FIG_Config::rast_sims_data;
if (!defined($sims_data_dir)) {
    $sims_data_dir = $FIG_Config::fortyeight_data;
}

my $sims_nr           = "$sims_data_dir/nr";
my $sims_peg_synonyms = "$sims_data_dir/peg.synonyms";
my $sims_keep_count   = 300;

# NOTE(review): a second Job48 object is built for the same directory and its
# meta handle is the one used by run(); the stage subs use $job->meta instead.
# Kept as-is to preserve behavior -- confirm whether the two can be merged.
my $job48 = Job48->new($job_dir);
defined($job48) or die "Could not create a Job48 object for $job_dir\n";
my $meta = $job48->meta;

my $host = `hostname`;
chomp $host;
$meta->add_log_entry($0, "Running phases @phase on $host");

#
# Emulate execution of SGE parallel environment via the
# --parallel N argument.
#
if ($parallel > 1) {
    $ENV{PE}     = 'cluster';
    $ENV{NSLOTS} = $parallel;
}

if ($phase{1}) {
    do_upload($job);
    do_rp($job);
}

if ($phase{2}) {
    do_qc($job);
    do_correction($job);
    do_sims_preprocess($job);
}

if ($phase{3}) {
    #
    # If running inside a SGE task array job, execute
    # our task. Otherwise run all of them.
    #
    if ($ENV{SGE_TASK_ID}) {
        run("$FIG_Config::bin/rp_compute_sims", $job->dir);
    }
    else {
        do_sims($job);
    }
}

if ($phase{4}) {
    do_sims_postprocess($job);
    do_bbhs($job);
    do_auto_assign($job);
    do_glue_contigs($job);
    do_pchs($job);
    do_scenario($job);
    do_export($job);
    mark_job_user_complete($job);
}

#
# Upload stage. Nothing to do here; present so the phase list mirrors the
# stage list above.
#
sub do_upload {
    my($job) = @_;
    return;
}

#
# Run rp_rapid_propagation on the job directory.
#
sub do_rp {
    my($job) = @_;
    run("$FIG_Config::bin/rp_rapid_propagation", $job->dir);
}

#
# Run rp_quality_check, unless the job has keep_genecalls set, in which case
# status.qc is simply marked complete.
#
sub do_qc {
    my($job) = @_;

    if ($job->meta->get_metadata("keep_genecalls")) {
        $job->meta->add_log_entry($0, "keep_genecalls is enabled: marking qc as complete");
        $job->meta->set_metadata("status.qc", "complete");
        return;
    }

    run("$FIG_Config::bin/rp_quality_check", $job->dir);
}

#
# Run rp_correction with the comma-joined correction.request list, unless the
# job has keep_genecalls set, in which case status.correction is marked
# complete. Does nothing when correction.request is not a reference.
#
sub do_correction {
    my($job) = @_;

    if ($job->meta->get_metadata("keep_genecalls")) {
        $job->meta->add_log_entry($0, "keep_genecalls is enabled: marking correction as complete");
        $job->meta->set_metadata("status.correction", "complete");
        return;
    }

    my $correction_list = $job->meta->get_metadata("correction.request");

    if (ref($correction_list)) {
        my $correction_str = join(",", @$correction_list);
        run("$FIG_Config::bin/rp_correction", $job->dir, $correction_str);
    }
}

#
# Run rp_preprocess_sims against the configured NR and peg.synonyms files.
#
sub do_sims_preprocess {
    my($job) = @_;
    run("$FIG_Config::bin/rp_preprocess_sims", $job->dir, $sims_nr, $sims_peg_synonyms);
}

#
# Run every similarity task for the job sequentially.
#
# The task range is read from the "tasks <start> <end>" line of
# sims.job/chunk.out; each line of that file is echoed to stdout. Dies if the
# file cannot be opened or contains no tasks line.
#
sub do_sims {
    my($job) = @_;

    my $chunk_file = $job->dir . "/sims.job/chunk.out";

    open(my $chunk_fh, "<", $chunk_file)
        or die "Error opening $chunk_file: $!";

    #
    # Extract created task ids
    #
    my($task_start, $task_end);
    while (my $line = <$chunk_fh>) {
        print $line;
        chomp $line;
        if ($line =~ /^tasks\s+(\d+)\s+(\d+)/) {
            $task_start = $1;
            $task_end   = $2;
        }
    }
    close($chunk_fh);

    if (!defined($task_start)) {
        die "Tasks not found";
    }

    for my $task ($task_start .. $task_end) {
        # rp_compute_sims is told which task to run through SGE_TASK_ID, just
        # as in the task-array branch of phase 3. Localised so the value does
        # not leak into the environment of later stages.
        local $ENV{SGE_TASK_ID} = $task;
        run("$FIG_Config::bin/rp_compute_sims", $job->dir);
    }
}

#
# Run rp_postproc_sims. Uses "<nr>-len.btree" when that file exists, otherwise
# the NR path itself.
#
sub do_sims_postprocess {
    my($job) = @_;

    my $sims_nr_len = $sims_nr;
    if (-f "$sims_nr-len.btree") {
        $sims_nr_len = "$sims_nr-len.btree";
    }

    run("$FIG_Config::bin/rp_postproc_sims", $job->dir, $sims_nr_len, $sims_peg_synonyms, $sims_keep_count);
}

# Run rp_compute_bbhs on the job directory.
sub do_bbhs {
    my($job) = @_;
    run("$FIG_Config::bin/rp_compute_bbhs", $job->dir);
}

# Run rp_auto_assign on the job directory.
sub do_auto_assign {
    my($job) = @_;
    run("$FIG_Config::bin/rp_auto_assign", $job->dir);
}

# Run rp_glue_contigs on the job directory.
sub do_glue_contigs {
    my($job) = @_;
    run("$FIG_Config::bin/rp_glue_contigs", $job->dir);
}

# Run rp_compute_pchs on the job directory.
sub do_pchs {
    my($job) = @_;
    run("$FIG_Config::bin/rp_compute_pchs", $job->dir);
}

# Run rp_scenarios on the job directory.
sub do_scenario {
    my($job) = @_;
    run("$FIG_Config::bin/rp_scenarios", $job->dir);
}

# Run rp_write_exports on the job directory.
sub do_export {
    my($job) = @_;
    run("$FIG_Config::bin/rp_write_exports", $job->dir);
}

#
# Final bookkeeping once all user-visible stages have finished.
#
# Sends the completion e-mail, sets status.final to "complete", then either
# notifies the curators and re-marks the job ACTIVE (when import.suggested or
# import.candidate is set) or marks the job done.
#
sub mark_job_user_complete {
    my($job) = @_;

    my $job_dir = $job->dir;
    my $meta    = $job->meta;
    my $job_id  = $job->id;

    # Best effort: a failed notification is reported but does not stop the job.
    my $email_rc = system("$FIG_Config::bin/send_job_completion_email", $job_dir);
    if ($email_rc != 0) {
        warn "send_job_completion_email $job_dir failed with rc=$email_rc\n";
    }

    $meta->set_metadata("status.final", "complete");

    #
    # If the job is a SEED candidate, send VV email.
    #
    if ($meta->get_metadata("import.suggested") or
        $meta->get_metadata("import.candidate")) {
        my $gname = $job->genome_name;

        # Mail::Mailer is not loaded at the top of this file; make sure it is
        # available before use (no-op if another module already loaded it).
        require Mail::Mailer;

        # NOTE(review): the recipient and sender strings carry display names
        # only; the addresses appear to have been lost from this copy of the
        # file and must be restored before this mail can be delivered.
        my $mail = Mail::Mailer->new();
        $mail->open({
            To      => 'Veronika Vonstein , Robert Olson, Andreas Wilke',
            From    => 'Annotation Server ',
            Subject => "RAST job $job_id marked for SEED inclusion",
        });

        # NOTE(review): the original message body was not recoverable from
        # this copy of the file; the wording below is a placeholder -- confirm
        # against the canonical source.
        print $mail <<END;
RAST job #$job_id ($gname) has completed and is marked for SEED inclusion.
END
        $mail->close();

        #
        # We also mark the job as ACTIVE again so that the
        # normal post-seed-acceptance pipeline stages may execute.
        #
        if (open(my $active_fh, ">", "$job_dir/ACTIVE")) {
            close($active_fh);
        }
        else {
            warn "Error opening $job_dir/ACTIVE: $!\n";
        }
    }
    else {
        #
        # Otherwise it is completely done.
        #
        mark_job_done($job);
    }
}

#
# Mark the job as finished: run rp_lustre_finish when lustre_required is set,
# write the current time to DONE, and remove the ACTIVE marker.
#
sub mark_job_done {
    my($job) = @_;

    #
    # If we spooled the job out onto the lustre disk, we need to
    # spool it back.
    #
    my $meta    = $job->meta;
    my $job_dir = $job->dir;

    if ($meta->get_metadata("lustre_required")) {
        run("$FIG_Config::bin/rp_lustre_finish", $job_dir);
    }

    if (open(my $done_fh, ">", "$job_dir/DONE")) {
        print $done_fh time . "\n";
        close($done_fh) or warn "Error closing $job_dir/DONE: $!\n";
    }
    else {
        warn "Error opening $job_dir/DONE: $!\n";
    }

    unlink("$job_dir/ACTIVE");
}

#
# Run an external command (list form, no shell) and log its start and end
# in the job metadata.
#
# On a non-zero status: logs the failure, writes a CANCEL marker into the job
# directory, attempts to qdel every other job id recorded under a
# ph_*.sge_job_id metadata key, then dies with a stack trace via confess.
#
# Uses the file-level $meta and $job_dir.
#
sub run {
    my(@cmd) = @_;

    my $cmd_str = join(" ", @cmd);
    print "Start: $cmd_str\n";
    $meta->add_log_entry($0, ['Start', $cmd_str]);

    my $rc = system(@cmd);
    if ($rc != 0) {
        $meta->add_log_entry($0, ['Failed', $rc, $cmd_str]);

        if (open(my $cancel_fh, ">", "$job_dir/CANCEL")) {
            print $cancel_fh "Cancel job due to error in $0 @cmd\n";
            close($cancel_fh) or warn "Error closing $job_dir/CANCEL: $!\n";
        }

        #
        # Attempt to qdel any other parts of this job that are queued or running.
        #
        my $own_id = $ENV{JOB_ID};
        my @jobs;
        for my $key ($meta->get_metadata_keys()) {
            next unless $key =~ /ph_.*\.sge_job_id/;

            my $queued_id = $meta->get_metadata($key);
            next unless defined($queued_id) && $queued_id =~ /^\d+$/;

            #
            # Don't qdel this job.
            #
            next if defined($own_id) && $own_id =~ /^\d+$/ && $queued_id == $own_id;

            push(@jobs, $queued_id);
        }

        if (@jobs) {
            my $rc2 = system("qdel", @jobs);
            print "qdel @jobs returned $rc2\n";
            $meta->add_log_entry($0, "Qdel @jobs due to failure returned status $rc2");
        }

        confess "Cmd failed with rc=$rc: $cmd_str\n";
    }

    $meta->add_log_entry($0, ['Done', $cmd_str]);
    print "Done: $cmd_str\n";
}