[Bio] / FortyEight / batch_rast.pl Repository:
ViewVC logotype

View of /FortyEight/batch_rast.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.8 - (download) (as text) (annotate)
Tue Jan 19 17:16:32 2010 UTC (9 years, 10 months ago) by olson
Branch: MAIN
Changes since 1.7: +77 -0 lines
add job completion code

use strict;
use Carp;
use Job48;
use FIG_Config;
use FIG;
use Getopt::Long;

#
# Run a job directory in one shot, for batch offload to a remote cluster that
# doesn't have our scheduler etc. available.
#

#
# Stages are as follows; for now this is a copy and paste exercise from
# FortyEight/check_jobs.pl. Use caution, don't run with scissors.
#

#
# upload
# rp
# Check status of keep_genecalls, then qc
# Check status of correction, then correction
# preprocess_sims
# sims
# bbhs
# auto_assign
# glue_contigs
# pchs
# scenario
# export
#

#
# Command-line handling and job setup.
#
#   --parallel N   value exported to the stages as NSLOTS (default 1)
#   --phase N      phase to run; may be given more than once, at least
#                  one is required
#   jobdir         the job directory to process
#

my $parallel = 1;
my @phase;

my $usage = "Usage: $0 [--parallel N] --phase N [--phase N ..] jobdir\n";

if (!GetOptions("parallel=i" => \$parallel,
                "phase=i" => \@phase))
{
    die $usage;
}

@ARGV == 1 or die $usage;

@phase > 0 or die $usage;
my %phase = map { $_ => 1 } @phase;

#
# This script only acts on phases 1 through 4. Anything else used to be
# dropped without a word; say so, but keep going so existing callers
# are not broken.
#
for my $unknown (grep { $_ < 1 || $_ > 4 } @phase)
{
    warn "Ignoring unknown phase $unknown (valid phases are 1-4)\n";
}

my $job_dir = shift;

my $job = Job48->new($job_dir);
defined($job) or die "Could not create job object for $job_dir\n";

#
# Location of the similarity data; fall back to the general
# FortyEight data directory if no RAST-specific one is configured.
#
my $sims_data_dir = $FIG_Config::rast_sims_data;

if (!defined($sims_data_dir))
{
    $sims_data_dir = $FIG_Config::fortyeight_data;
}

my $sims_nr = "$sims_data_dir/nr";
my $sims_peg_synonyms = "$sims_data_dir/peg.synonyms";
my $sims_keep_count = 300;

#
# NOTE(review): this is a second Job48 object on the same directory, used
# only to obtain $meta for the log entry below. Presumably $job could be
# reused instead - confirm against Job48 before merging the two.
#
my $job48 = Job48->new($job_dir);
defined($job48) or die "Could not create job object for $job_dir\n";
my $meta = $job48->meta;

my $host = `hostname`;
chomp $host;
$meta->add_log_entry($0, "Running phases @phase on $host");

#
# Emulate execution of SGE parallel environment via the
# --parallel N argument.
#
if ($parallel > 1)
{
    $ENV{PE} = 'cluster';
    $ENV{NSLOTS} = $parallel;
}

#
# Phase dispatch. Each requested phase runs its stages in order; a failing
# stage aborts the script because run() dies on a non-zero status.
#

if ($phase{1})
{
    do_upload($job);
    do_rp($job);
}

if ($phase{2})
{
    do_qc($job);
    do_correction($job);
    do_sims_preprocess($job);
}

if ($phase{3})
{
    #
    # If running inside a SGE task array job, execute
    # our task. Otherwise run all of them.
    #
    # Grid Engine sets SGE_TASK_ID to the literal string "undefined" for
    # non-array jobs, so a plain truth test is not enough: only a positive
    # integer identifies an array task.
    #
    my $sge_task = $ENV{SGE_TASK_ID};
    if (defined($sge_task) && $sge_task =~ /\A[1-9]\d*\z/)
    {
        run("$FIG_Config::bin/rp_compute_sims", $job->dir);
    }
    else
    {
        do_sims($job);
    }
}

if ($phase{4})
{
    do_sims_postprocess($job);
    do_bbhs($job);
    do_auto_assign($job);
    do_glue_contigs($job);
    do_pchs($job);
    do_scenario($job);
    do_export($job);
    mark_job_user_complete($job);
}

#
# Upload stage. Nothing to do in batch mode; kept as a placeholder so
# the stage list stays complete.
#
sub do_upload
{
    my $job = shift;

    return;
}

#
# Rapid propagation stage: runs rp_rapid_propagation on the job directory.
#
sub do_rp
{
    my $job = shift;

    my $tool = "$FIG_Config::bin/rp_rapid_propagation";
    run($tool, $job->dir);
}

#
# Quality check stage. When the job metadata has keep_genecalls set the
# stage is not run; it is logged and marked complete instead. Otherwise
# rp_quality_check is run on the job directory.
#
sub do_qc
{
    my $job = shift;

    my $keep_calls = $job->meta->get_metadata("keep_genecalls");

    return run("$FIG_Config::bin/rp_quality_check", $job->dir)
        unless $keep_calls;

    $job->meta->add_log_entry($0, "keep_genecalls is enabled: marking qc as complete");
    $job->meta->set_metadata("status.qc", "complete");
    return;
}

#
# Correction stage. Skipped (logged and marked complete) when
# keep_genecalls is set. Otherwise, if the correction.request metadata
# holds a reference, its elements are joined with commas and handed to
# rp_correction together with the job directory.
#
sub do_correction
{
    my $job = shift;

    my $keep_calls = $job->meta->get_metadata("keep_genecalls");
    if ($keep_calls)
    {
        $job->meta->add_log_entry($0, "keep_genecalls is enabled: marking correction as complete");
        $job->meta->set_metadata("status.correction", "complete");
        return;
    }

    my $requested = $job->meta->get_metadata("correction.request");

    if (ref($requested))
    {
        run("$FIG_Config::bin/rp_correction", $job->dir, join(",", @{$requested}));
    }
}

#
# Similarity preprocessing stage: runs rp_preprocess_sims with the job
# directory, the NR path and the peg.synonyms path.
#
sub do_sims_preprocess
{
    my $job = shift;

    my @preprocess_args = ($job->dir, $sims_nr, $sims_peg_synonyms);
    run("$FIG_Config::bin/rp_preprocess_sims", @preprocess_args);
}

#
# Similarity computation stage, run serially.
#
# Reads <jobdir>/sims.job/chunk.out, echoing every line to stdout, and
# takes the task range from the last line of the form
# "tasks <start> <end>". rp_compute_sims is then run once per task id
# with SGE_TASK_ID set to that id.
#
# Dies if chunk.out cannot be opened or contains no tasks line; run()
# dies if any task fails.
#
sub do_sims
{
    my($job) = @_;

    # Build the path once so the error message names the file we really
    # tried to open.
    my $chunk_file = $job->dir . "/sims.job/chunk.out";

    open(my $chunk_fh, "<", $chunk_file)
        or die "Error opening $chunk_file: $!";

    #
    # Extract created task ids
    #

    my($task_start, $task_end);
    while (my $line = <$chunk_fh>)
    {
        print $line;
        if ($line =~ /^tasks\s+(\d+)\s+(\d+)/)
        {
            ($task_start, $task_end) = ($1, $2);
        }
    }
    close($chunk_fh);

    if (!defined($task_start))
    {
        die "Tasks not found";
    }

    for my $task ($task_start .. $task_end)
    {
        # Localized so the emulated task id is visible to this child only
        # and does not leak into the stages that run after the sims.
        local $ENV{SGE_TASK_ID} = $task;
        run("$FIG_Config::bin/rp_compute_sims", $job->dir);
    }
}

#
# Similarity postprocessing stage: runs rp_postproc_sims. The length
# source passed to it is "<nr>-len.btree" when that file exists,
# otherwise the NR path itself.
#
sub do_sims_postprocess
{
    my $job = shift;

    my $btree = "$sims_nr-len.btree";
    my $length_source = (-f $btree) ? $btree : $sims_nr;

    run("$FIG_Config::bin/rp_postproc_sims",
        $job->dir, $length_source, $sims_peg_synonyms, $sims_keep_count);
}

#
# BBH stage: runs rp_compute_bbhs on the job directory.
#
sub do_bbhs
{
    my $job = shift;

    my $tool = "$FIG_Config::bin/rp_compute_bbhs";
    run($tool, $job->dir);
}

#
# Auto-assign stage: runs rp_auto_assign on the job directory.
#
sub do_auto_assign
{
    my $job = shift;

    my $tool = "$FIG_Config::bin/rp_auto_assign";
    run($tool, $job->dir);
}

#
# Glue-contigs stage: runs rp_glue_contigs on the job directory.
#
sub do_glue_contigs
{
    my $job = shift;

    my $tool = "$FIG_Config::bin/rp_glue_contigs";
    run($tool, $job->dir);
}

#
# PCH stage: runs rp_compute_pchs on the job directory.
#
sub do_pchs
{
    my $job = shift;

    my $tool = "$FIG_Config::bin/rp_compute_pchs";
    run($tool, $job->dir);
}

#
# Scenario stage: runs rp_scenarios on the job directory.
#
sub do_scenario
{
    my $job = shift;

    my $tool = "$FIG_Config::bin/rp_scenarios";
    run($tool, $job->dir);
}

#
# Export stage: runs rp_write_exports on the job directory.
#
sub do_export
{
    my $job = shift;

    my $tool = "$FIG_Config::bin/rp_write_exports";
    run($tool, $job->dir);
}

#
# Mark a job as complete from the user's point of view.
#
# Runs send_job_completion_email for the job directory (best effort: a
# failure is reported but does not stop processing) and sets the
# status.final metadata to "complete".
#
# If the job has import.suggested or import.candidate set, a notification
# mail is sent and an ACTIVE marker file is created in the job directory;
# otherwise the job is finished off via mark_job_done().
#
sub mark_job_user_complete
{
    my($job) = @_;

    my $job_dir = $job->dir;
    my $meta = $job->meta;
    my $job_id = $job->id;

    my $rc = system("$FIG_Config::bin/send_job_completion_email", $job_dir);
    if ($rc != 0)
    {
        warn "send_job_completion_email failed with rc=$rc for $job_dir\n";
    }

    $meta->set_metadata("status.final", "complete");

    #
    # If the job is a SEED candidate, send VV email.
    #

    if ($meta->get_metadata("import.suggested") or
        $meta->get_metadata("import.candidate"))
    {
        my $gname = $job->genome_name;

        # This file never loads Mail::Mailer itself; load it on demand so
        # the call below does not depend on some other module having
        # pulled it in.
        require Mail::Mailer;

        my $mail = Mail::Mailer->new();
        $mail->open({
            To => 'Veronika Vonstein <veronika@thefig.info>, Robert Olson<olson@mcs.anl.gov>, Andreas Wilke<wilke@mcs.anl.gov>',
            From => 'Annotation Server <rast@mcs.anl.gov>',
            Subject => "RAST job $job_id marked for SEED inclusion",
        });

        print $mail <<END;
RAST job #$job_id ($gname) was submitted for inclusion into the SEED, and has finished its processing.
END
        $mail->close();

        #
        # We also mark the job as ACTIVE again so that the
        # normal post-seed-acceptance pipeline stages may execute.
        #
        if (open(my $active_fh, ">", "$job_dir/ACTIVE"))
        {
            close($active_fh);
        }
        else
        {
            warn "Error opening $job_dir/ACTIVE: $!\n";
        }
    }
    else
    {
        #
        # Otherwise it is completely done.
        #
        mark_job_done($job);
    }
}

#
# Mark a job as completely done.
#
# Runs rp_lustre_finish first when the lustre_required metadata is set,
# then writes the current epoch time to <jobdir>/DONE and removes
# <jobdir>/ACTIVE. Problems with the marker files are reported with
# warn; only a failing rp_lustre_finish is fatal (via run()).
#
sub mark_job_done
{
    my($job) = @_;

    #
    # If we spooled the job out onto the lustre disk, we need to
    # spool it back.
    #

    my $meta = $job->meta;
    my $job_dir = $job->dir;

    if ($meta->get_metadata("lustre_required"))
    {
        run("$FIG_Config::bin/rp_lustre_finish", $job_dir);
    }

    my $done_file = "$job_dir/DONE";
    if (open(my $done_fh, ">", $done_file))
    {
        print $done_fh time() . "\n";
        # Buffered write errors only show up at close time.
        close($done_fh) or warn "Error closing $done_file: $!\n";
    }
    else
    {
        warn "Error opening $done_file: $!\n";
    }

    # A missing ACTIVE file is fine; only complain when one exists and
    # could not be removed.
    my $active_file = "$job_dir/ACTIVE";
    if (-e $active_file && !unlink($active_file))
    {
        warn "Error removing $active_file: $!\n";
    }
}

#
# Run an external command, bracketing it with Start:/Done: lines on
# stdout.
#
# Arguments: the command and its arguments, passed to system() as a list.
#
# Dies (via confess, so with a stack trace) if no command is given or if
# the command does not finish with a zero status. The message carries the
# raw status from system() plus a decoded reason: exec failure, the
# terminating signal, or the exit status.
#
sub run
{
    my(@cmd) = @_;

    @cmd or confess "run called with no command\n";

    print "Start: @cmd\n";
    my $rc = system(@cmd);
    if ($rc != 0)
    {
        my $why;
        if ($rc == -1)
        {
            # The command could not be started at all; $! says why.
            $why = "failed to execute: $!";
        }
        elsif ($rc & 127)
        {
            $why = sprintf("died with signal %d", $rc & 127);
        }
        else
        {
            $why = sprintf("exit status %d", $rc >> 8);
        }
        confess "Cmd failed with rc=$rc ($why): @cmd\n";
    }
    print "Done: @cmd\n";
}

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3