[Bio] / FortyEight / check_jobs_ross.pl Repository:
ViewVC logotype

View of /FortyEight/check_jobs_ross.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.1 - (download) (as text) (annotate)
Wed Dec 19 23:10:56 2007 UTC (11 years, 11 months ago) by overbeek
Branch: MAIN
CVS Tags: mgrast_dev_08112011, rast_rel_2009_05_18, mgrast_dev_08022011, rast_rel_2014_0912, rast_rel_2008_06_18, rast_rel_2008_06_16, mgrast_dev_05262011, rast_rel_2008_12_18, mgrast_dev_04082011, rast_rel_2008_07_21, rast_rel_2010_0928, rast_2008_0924, mgrast_version_3_2, mgrast_dev_12152011, rast_rel_2008_04_23, mgrast_dev_06072011, rast_rel_2008_09_30, rast_rel_2009_0925, rast_rel_2010_0526, rast_rel_2014_0729, mgrast_dev_02212011, rast_rel_2010_1206, mgrast_release_3_0, mgrast_dev_03252011, rast_rel_2010_0118, mgrast_rel_2008_0924, mgrast_rel_2008_1110_v2, rast_rel_2009_02_05, rast_rel_2011_0119, mgrast_rel_2008_0625, mgrast_release_3_0_4, mgrast_release_3_0_2, mgrast_release_3_0_3, mgrast_release_3_0_1, mgrast_dev_03312011, mgrast_release_3_1_2, mgrast_release_3_1_1, mgrast_release_3_1_0, mgrast_dev_04132011, rast_rel_2008_10_09, mgrast_dev_04012011, rast_release_2008_09_29, mgrast_rel_2008_0806, mgrast_rel_2008_0923, mgrast_rel_2008_0919, rast_rel_2009_07_09, rast_rel_2010_0827, mgrast_rel_2008_1110, myrast_33, rast_rel_2011_0928, rast_rel_2008_09_29, mgrast_rel_2008_0917, rast_rel_2008_10_29, mgrast_dev_04052011, mgrast_dev_02222011, rast_rel_2009_03_26, mgrast_dev_10262011, rast_rel_2008_11_24, rast_rel_2008_08_07, HEAD
this is a hacked check_jobs for the ross RAST


=head1 check_jobs.pl

Check the status of the jobs in the 48-hour run queue to see if any 
action should be taken.

Actions taken are determined based on the metadata kept in meta.xml.

We do a quick check by looking for the file ACTIVE in the job directory.
If this file does not exist, the job should not be considered.

=cut

    
use strict;
use FIG;
use FIG_Config;
use GenomeMeta;
use Data::Dumper;
use Tracer;
use Job48;
use Mail::Mailer;
use Mantis;
use Filesys::DfPortable;

TSetup("2 main FIG", "TEXT");

# Spool directory holding one numerically-named subdirectory per job.
my $job_spool_dir = $FIG_Config::fortyeight_jobs;

#
# Verify we have at least 10G of space left.
#
# dfportable is called with a block size of 1 GB, so the bavail figure
# compared and printed below is expressed in gigabytes.
#

my $df = dfportable($job_spool_dir, 1024*1024*1024);
if (!defined($df))
{
    die "dfportable call failed on $job_spool_dir: $!";
}
if ($df->{bavail} < 10)
{
    # The directory is passed as a %s argument rather than interpolated into
    # the format string, so a '%' in the path cannot corrupt the message.
    die sprintf("Not enough free space available (%.1f GB) in %s", $df->{bavail}, $job_spool_dir);
}

#my $sims_data_dir = "/scratch/48-hour/Data";
#my $sims_data_dir = "/vol/48-hour/Data";

my $sims_data_dir = $FIG_Config::fortyeight_data;

# Inputs handed to the similarity chunking / postprocessing commands
# built in process_sims.
my $sims_nr = "$sims_data_dir/nr";
my $sims_peg_synonyms = "$sims_data_dir/peg.synonyms";
my $sims_keep_count = 300;

opendir(D, $job_spool_dir) or  die "Cannot open job directory $job_spool_dir: $!\n";

# Snapshot of the SGE queue keyed by SGE job id. read_qstat returns nothing
# (so this is undef) when qstat could not be started.
my $qstat = read_qstat();
#print Dumper($qstat);
#exit;

# Job directories are the all-numeric entries of the spool directory;
# they are processed in ascending numeric order.
my @jobs = sort { $a <=> $b } grep { /^\d+$/ and -d "$job_spool_dir/$_" } readdir(D);

# The listing has been read in full, so release the directory handle.
closedir(D);

for my $job (@jobs)
{
    check_job($job, "$job_spool_dir/$job");
}

#
# Examine one job directory and advance it by at most one pipeline stage.
#
# $job_id  - numeric job id (the directory name in the spool).
# $job_dir - full path to the job directory.
#
# The stages are consulted in a fixed order; the first stage whose
# status.* metadata value is not "complete" is either started (via its
# process_* routine) or, if it is marked "error", reported via flag_error.
# In either case we return and look at the job again on the next run.
#
sub check_job
{
    my($job_id, $job_dir) = @_;
    Trace("Checking $job_id at $job_dir\n") if T(1);

    # Only jobs flagged ACTIVE and not yet flagged DONE are considered.
    unless (-f "$job_dir/ACTIVE")
    {
        Trace("Skipping job $job_id as not active\n") if T(2);
        return;
    }
    if (-f "$job_dir/DONE")
    {
        Trace("Skipping job $job_id as done\n") if T(2);
        return;
    }

    # The qsub -e / -o options built by the process_* routines point here.
    mkdir "$job_dir/sge_output" unless -d "$job_dir/sge_output";

    my $genome = &FIG::file_head("$job_dir/GENOME_ID", 1);
    unless ($genome)
    {
        Trace("Skipping job $job_id: no GENOME_ID\n");
        return;
    }
    chomp $genome;
    print "Genome is $genome\n";

    my $meta = GenomeMeta->new($genome, "$job_dir/meta.xml");
    unless ($meta)
    {
        Confess("Could not create meta for $genome $job_dir/meta.xml");
        return;
    }

    #
    # Helper for the common stage shape: read the stage's status once;
    # if it is "complete" report that nothing was pending (false),
    # otherwise flag the error or invoke the handler and report true so
    # that the caller stops processing this job for now.
    #
    my $stage_pending = sub {
        my($status_key, $stage, $handler) = @_;

        my $status = $meta->get_metadata($status_key);
        return 0 if $status eq "complete";

        if ($status eq "error")
        {
            flag_error($genome, $job_id, $job_dir, $meta, $stage);
        }
        else
        {
            $handler->($genome, $job_id, $job_dir, $meta);
        }
        return 1;
    };

    #
    # Now walk through the stages of life for a genome dir.
    #

    if ($meta->get_metadata("status.uploaded") ne "complete")
    {
        process_upload();
        return;
    }

    #
    # Rapid propagation: schedule it unless it has errored. Whatever
    # happens, an incomplete RP stops us here.
    #
    if ($meta->get_metadata("status.rp") ne "complete")
    {
        if ($meta->get_metadata("status.rp") eq "error")
        {
            flag_error($genome, $job_id, $job_dir, $meta, "rp");
        }
        else
        {
            process_rp($genome, $job_id, $job_dir, $meta);
        }
        return;
    }

    #
    # With keep_genecalls enabled the QC and correction phases are not
    # run; they are simply marked complete.
    #
    my $keep_genecalls = $meta->get_metadata("keep_genecalls");

    if ($meta->get_metadata("status.qc") ne "complete")
    {
        if ($keep_genecalls)
        {
            $meta->add_log_entry($0, "keep_genecalls is enabled: marking qc as complete");
            $meta->set_metadata("status.qc", "complete");
        }
        elsif ($meta->get_metadata("status.qc") eq "error")
        {
            flag_error($genome, $job_id, $job_dir, $meta, "qc");
        }
        else
        {
            process_qc($genome, $job_id, $job_dir, $meta);
        }
        return;
    }

    #
    # Correction phase. A status of 'requires_intervention' is left
    # untouched: nothing is scheduled and nothing is flagged.
    #
    my $cor_status = $meta->get_metadata("status.correction");

    if ($cor_status ne 'complete')
    {
        if ($keep_genecalls)
        {
            $meta->add_log_entry($0, "keep_genecalls is enabled: marking correction as complete");
            $meta->set_metadata("status.correction", "complete");
        }
        elsif ($cor_status eq 'error')
        {
            flag_error($genome, $job_id, $job_dir, $meta, "correction");
        }
        elsif ($cor_status ne 'requires_intervention')
        {
            my $req = $meta->get_metadata("correction.request");
            process_correction($genome, $job_id, $job_dir, $meta, $req);
        }
        return;
    }

    #
    # (The user-interaction "stoplight" check that used to sit here is
    # currently disabled.)
    #

    #
    # The stock computation stages, in execution order.
    #
    my @stock_stages = (
        [ "status.sims",         "sims",         \&process_sims         ],
        [ "status.bbhs",         "bbhs",         \&process_bbhs         ],
        [ "status.auto_assign",  "auto_assign",  \&process_auto_assign  ],
        [ "status.glue_contigs", "glue_contigs", \&process_glue_contigs ],
        [ "status.pchs",         "pchs",         \&process_pchs         ],
        [ "status.scenario",     "scenario",     \&process_scenario     ],
    );

    for my $entry (@stock_stages)
    {
        return if $stage_pending->(@$entry);
    }

    #
    # End of the stock processing stages. Anything beyond this point is
    # triggered only if the genome is marked for inclusion into the SEED.
    #

    if ($meta->get_metadata("status.final") ne "complete")
    {
        mark_job_user_complete($genome, $job_id, $job_dir, $meta);
    }

    #
    # JGI candidates flow straight past; everything else needs a closer
    # look at its seed submission status.
    #

    unless ($meta->get_metadata("submit.JGI"))
    {
        #
        # Not even a candidate for seed inclusion: completely done.
        #
        if (!$meta->get_metadata("submit.suggested") and !$meta->get_metadata("submit.candidate"))
        {
            print "Job not a candidate, marking as done\n";
            mark_job_done($genome, $job_id, $job_dir, $meta);
            return;
        }

        #
        # A candidate that has been rejected (submit.never): completely done.
        #
        if ($meta->get_metadata("submit.never"))
        {
            print "Job rejected, marking as done\n";
            mark_job_done($genome, $job_id, $job_dir, $meta);
            return;
        }

        #
        # Not yet approved: come back and check again later.
        #
        if (!$meta->get_metadata("submit.seed") and !$meta->get_metadata("submit.nmpdr"))
        {
            print "Job not yet checked, returning\n";
            return;
        }

        #
        # An approved candidate; carry on processing.
        #
        print "Continuing\n";
    }

    #
    # Glimmer and Critica calls are made only for JGI-teach inclusion.
    #

    if ($meta->get_metadata("submit.JGI"))
    {
        return if $stage_pending->("status.glimmer", "glimmer", \&process_glimmer);
        return if $stage_pending->("status.critica", "critica", \&process_critica);
    }

    return if $stage_pending->("status.pfam", "pfam", \&process_pfam);

    #
    # (The cello and phobius stages are currently disabled.)
    #

    #
    # This job is done.
    #

    mark_job_done($genome, $job_id, $job_dir, $meta);
}

#
# Flag an error.
#
# This will send an email to the user notifying them that their job had
# an error; it copies the rast list in order to alert them that
# such an error occurred.
#
# Parameters:
#   $genome  - genome id of the job.
#   $job_id  - job number.
#   $job_dir - job directory; an ERROR file is written there and the
#              ACTIVE flag file is removed.
#   $meta    - GenomeMeta object for the job; genome.error and the
#              genome.error_notification_* keys are set on it.
#   $stage   - name of the failed stage as passed by check_job
#              (e.g. 'rp', 'qc', 'sims', 'glue_contigs').
#
# Stages without a specific message get a generic one.
#
sub flag_error
{
    my($genome, $job_id, $job_dir, $meta, $stage) = @_;

    my $msg;
    if ($stage eq 'rp')
    {
        #
        # Hunt down some more details on the error.
        #

        my $err = $meta->get_metadata("rp.error");

        if ($err =~ /raw genome directory.*does not exist/ or
            $err =~ /Unformatted contigs file.*does not exists/)
        {
            $msg = "An error occurred during the upload of your data.";
        }
        elsif ($err =~ /reformat command failed/)
        {
            my $f = &FIG::file_read("$job_dir/rp.errors/reformat_contigs_split.stderr");
            if ($f =~ /File does not appear to be in FASTA/)
            {
                $msg = "An error occurred during the parsing of your input file.";
            }
            else
            {
                $msg = "An error occurred during the upload of your data.";
            }
        }
        elsif ($err =~ /rapid_propagation command failed/ or
               $err =~ /rapid_propagation did not create any features/)
        {
            $msg = "An error occurred during the annotation of your data.";
        }
        elsif (-f "$job_dir/rp.errors/find_neighbors_using_figfams.stderr")
        {
            my $ff = &FIG::file_read("$job_dir/rp.errors/find_neighbors_using_figfams.stderr");
            if ($ff =~ /Could not find any features of sufficient length/)
            {
                $msg = "RAST processing could not determine the phylogenetic neighborhood of your genome.\n";
                $msg .= "This may mean the genome was a fragment too small for RAST processing to be effective.";
            }
        }
    }
    elsif ($stage eq 'qc')
    {
        $msg = "An error occurred during the quality check phase of your genome's analysis.";
    }
    elsif ($stage eq 'sims')
    {
        $msg = "An error occurred during the similarity computation phase of your genome's analysis.";
    }
    elsif ($stage eq 'bbhs')
    {
        $msg = "An error occurred during the BBH computation phase of your genome's analysis.";
    }
    elsif ($stage eq 'auto_assign')
    {
        $msg = "An error occurred during the automated assignment phase of your genome's analysis.";
    }
    elsif ($stage eq 'glue_contigs')
    {
        # check_job passes the stage as "glue_contigs"; the previous
        # spelling here ('glue_contings') could never match.
        $msg = "An error occurred during the postprocessing phase of your genome's analysis.";
    }
    elsif ($stage eq 'pchs')
    {
        $msg = "An error occurred during the coupling computation phase of your genome's analysis.";
    }
    elsif ($stage eq 'scenario')
    {
        $msg = "An error occurred during the scenario computation phase of your genome's analysis.";
    }

    # Fallback for stages (or rp failure modes) with no specific text.
    if (!$msg)
    {
        $msg = "An error occurred during the analysis of your genome.";
    }

    #
    # Use the mantis info if there to figure out what server this is.
    #

    my $server_info;
    if (my $mi = $FIG_Config::mantis_info)
    {
        my $system = $mi->{system};
        my $server = $mi->{server_value};
        $server_info = " in the $system $server server"
    }
    else
    {
        $server_info = "";
    }
    
    my $genome_name = &FIG::file_head("$job_dir/GENOME", 1);
    chomp $genome_name;
    my $body = <<END;
This message is regarding the RAST processing of your genome $genome_name, job number $job_id$server_info.

$msg

RAST developers will be investigating the cause of the error and possibly contacting
you for more information. 
END

    # Record the message in the job directory; failure to write the ERROR
    # file is deliberately not fatal.
    if (open(my $err_fh, ">", "$job_dir/ERROR"))
    {
        print $err_fh "$msg\n";
        close($err_fh);
    }
    $meta->set_metadata("genome.error", $msg);

    # Removing ACTIVE makes check_job skip this job from now on.
    unlink("$job_dir/ACTIVE");

    my $job = Job48->new($job_id);
    my $userobj = $job->getUserObject();
    my($email, $name);
    
    if ($userobj)
    {
        $email = $userobj->eMail();
        $name = join(" " , $userobj->firstName(), $userobj->lastName());
    }

    #
    # if we are configured for Mantis integration, notify Mantis.
    # But only if we are not the batch user.
    #

    my($bug_id, $bug_url);
    if ($FIG_Config::mantis_info and $job->user() ne 'batch')
    {
        # A failure to file the bug must not prevent the user notification,
        # so it is trapped and only warned about.
        eval {
            my $mantis = Mantis->new($FIG_Config::mantis_info);
            
            ($bug_id, $bug_url) = $mantis->report_bug(stage => $stage,
                                                     genome => $genome,
                                                     genome_name => $genome_name,
                                                     job_id => $job_id,
                                                     job_dir => $job_dir,
                                                     user_email => $email,
                                                     user_name => $name,
                                                     meta => $meta,
                                                     msg => $msg);

            $body .= "\nBug report number $bug_id has been filed in the RAST bugtracking system for this error.\n";
            $body .= "It may be viewed at the url $bug_url\n";
        };
        if ($@)
        {
            warn "Exception while reporting Mantis bug:\n$@\n";
        }
    }

    # Send the notification at most once per job, and only if we have
    # an address to send it to.
    if ($meta->get_metadata("genome.error_notification_sent") ne "yes")
    {
        if ($email)
        {
            my $full = $name ? "$name <$email>" : $email;
            
            my $mail = Mail::Mailer->new();
            $mail->open({
                To => $full,
                Cc => 'Annotation Server <rast@mcs.anl.gov>',
                From => 'Annotation Server <rast@mcs.anl.gov>',
                Subject => "RAST annotation server error on job $job_id",
                });

            print $mail $body;
            $mail->close();
            $meta->set_metadata("genome.error_notification_sent", "yes");
            $meta->set_metadata("genome.error_notification_time", time);
            $meta->set_metadata("genome.error_notification_sent_address", $email);
        }
    }
}

    


#
# Placeholder for the upload stage. A job whose upload is not complete
# cannot be advanced from here, so this intentionally does nothing and
# returns an empty result.
#
sub process_upload
{
    return ();
}

=head2 process_rp_old

Older variant of the rapid propagation stage handler (check_job calls
process_rp instead). It starts rapid propagation with a direct qsub, or,
if a run was already started, checks its status against the qstat
snapshot taken at startup.

=cut

sub process_rp_old
{
    my($genome, $job, $job_dir, $meta) = @_;

    #
    # See if we have started running RP here.
    #

    if ($meta->get_metadata("rp.running") eq 'yes')
    {
        Trace("RP is running for $job") if T(1);

        #
        # Is the job now done?
        #
        my $sge_id = $meta->get_metadata("rp.sge_job_id");

        if ($sge_id)
        {
            #
            # Without a queue snapshot (read_qstat failed) every job would
            # look "missing" and be treated as finished; leave the state
            # alone and try again on the next run instead.
            #
            if (!defined($qstat))
            {
                warn "No queue status available; not updating RP state for job $job\n";
                return;
            }

            my $stat = $qstat->{$sge_id};
            if ($stat)
            {
                # Mirror any changed qstat fields into the job metadata.
                for my $k (keys %$stat)
                {
                    my $mdk = "rp.sge_status.$k";
                    my $cur = $meta->get_metadata($mdk);
                    if ($stat->{$k} ne $cur)
                    {
                        $meta->set_metadata($mdk, $stat->{$k});
                    }
                }
            }
            else
            {
                $stat = { status => 'missing' };
            }

            if ($stat->{status} eq 'r')
            {
                # Still running; queue/host changes were already recorded
                # in the metadata by the loop above.
            }
            elsif ($stat->{status} eq 'qw')
            {
                $meta->set_metadata('status.rp', 'queued');
            }
            else
            {
                Trace("RP is done") if T(1);

                $meta->set_metadata("rp.running", "no");

                #
                # Need to determine if run succeeded. We say it did if a
                # genome dir got copied.
                #

                if (-d "$job_dir/rp/$genome")
                {
                    $meta->set_metadata("status.rp", "complete");
                }
                else
                {
                    $meta->set_metadata("status.rp", "error");
                }
            }
        }

        return;
    }
    elsif ($meta->get_metadata('status.rp') eq 'queued')
    {
        # NOTE(review): this branch only traces and then falls through to
        # the submission below, as in the original code - confirm intended.
        Trace("RP queued") if T(1);
    }

    #
    # Otherwise, set up for run and submit.
    #

    my $tmpdir = "/scratch/tmprp.$$";
    my $meta_file = $meta->get_file();

    my $errdir = "$job_dir/rp.errors";
    &FIG::verify_dir($errdir);

    &FIG::verify_dir("$job_dir/rp");
    my $cmd = "qsub -N rp_$job -e $job_dir/sge_output -o $job_dir/sge_output -v PATH -l fig_resource -b yes $FIG_Config::bin/rapid_propagation --errdir $errdir --meta $meta_file --tmpdir $tmpdir $job_dir/raw/$genome $job_dir/rp/$genome";
    $meta->add_log_entry($0, $cmd);

    my $qsub_fh;
    if (!open($qsub_fh, "-|", $cmd))
    {
        # Build the message once so $! cannot be clobbered in between, and
        # log before calling Confess (as the other failure paths do):
        # Confess is expected to abort, so a log entry placed after it
        # would never be written - TODO confirm against Tracer.
        my $failure = "Qsub failed for job $job genome $genome in $job_dir: $!";
        $meta->add_log_entry($0, $failure);
        Confess($failure);
        return;
    }

    # qsub reports the new job as "Your job <id> ...".
    my $sge_job_id;
    while (my $line = <$qsub_fh>)
    {
        if ($line =~ /Your\s+job\s+(\d+)/)
        {
            $sge_job_id = $1;
        }
    }
    if (!close($qsub_fh))
    {
        $meta->add_log_entry($0, "Qsub close failed: $!");
        Confess("Qsub close failed: $!");
    }

    if (!$sge_job_id)
    {
        $meta->add_log_entry($0, "did not get job id from qsub");
        Confess("did not get job id from qsub");
    }

    Trace("Submitted, job id is $sge_job_id") if T(1);

    $meta->set_metadata("rp.sge_job_id", $sge_job_id);
    $meta->set_metadata("rp.start_time", time);
    $meta->set_metadata("rp.running", "yes");
    $meta->set_metadata("status.rp", "queued");
}
    
#
# Run "qstat -f -s prs" and parse its output into a snapshot of the queue.
#
# Returns a hash reference keyed by SGE job id. Each value is a hash with
# the keys id, status, user, date, time and name, plus queue and host when
# the job line appeared beneath a "queue@host" header line. Jobs listed
# after a "FINISHED JOBS" marker are given the status "done".
#
# Returns nothing (undef in scalar context) if qstat could not be started.
#
sub read_qstat
{
    my $qstat_fh;
    if (!open($qstat_fh, "-|", "qstat  -f -s prs"))
    {
        warn "Could not read queue status: $!\n";
        return;
    }

    my $qstat = {};
    my $finished;
    my $queue;
    my $host;
    while (my $line = <$qstat_fh>)
    {
        if ($line =~ /FINISHED JOBS/)
        {
            # Everything from here on is a finished job with no queue/host.
            $finished++;
            undef $queue;
            undef $host;
            next;
        }
        if ($line =~ /^([^\@]+)\@(\S+)/)
        {
            # "queue@host ..." header: applies to the job lines that follow.
            $queue = $1;
            $host = $2;
        }
        elsif ($line =~ /^----/)
        {
            # Separator line ends the current queue section.
            undef $queue;
            undef $host;
        }

        if ($line =~ /^\s+(\d+)\s+(.*)/)
        {
            my $jobid = $1;
            my $rest = $2;

            # The first field after the job id and anything after the time
            # are not used (presumably priority and slots - TODO confirm
            # against the qstat output format).
            my(undef, $job, $user, $status, $date, $time) = split(/\s+/, $rest);
            $status = "done" if $finished;
            my $ent = { id => $jobid, status => $status, user => $user, date => $date, time => $time, name => $job };

            $ent->{queue} = $queue if $queue;
            $ent->{host} = $host if $host;

            $qstat->{$jobid} = $ent;
        }
    }

    # Close the pipe so the qstat child is reaped. A failure here is only
    # reported; whatever was parsed is still returned.
    if (!close($qstat_fh))
    {
        warn "qstat did not exit cleanly (status $?): $!\n";
    }

    return $qstat;
}

#
# Start the similarity computation for a job.
#
# Three steps are performed:
#   1. rp_chunk_sims is run to create the chunked input; its output is
#      echoed and scanned for a "tasks <start> <end>" line.
#   2. A task-array job of rp_compute_sims covering those tasks is submitted.
#   3. An rp_postproc_sims job is submitted, held (-hold_jid) on the
#      task-array job.
#
# Any failure marks the sims stage as errored in the metadata. If only the
# postprocessing submission fails, the already-submitted task array is
# removed with qdel.
#
sub process_sims
{
    my($genome, $job, $job_dir, $meta) = @_;

    # Already submitted. Dead SGE jobs are not detected here.
    return if $meta->get_metadata("sims.running") eq 'yes';

    my $cmd = "$FIG_Config::bin/rp_chunk_sims $job_dir/rp/$genome/Features/peg/fasta " .
              "$sims_nr $sims_peg_synonyms $job_dir/sims.job";

    #
    # Step 1: create the chunked input.
    #

    $meta->add_log_entry($0, ["chunk", $cmd]);
    my $chunk_fh;
    unless (open($chunk_fh, "$cmd |"))
    {
        warn "$cmd failed: $!\n";
        $meta->add_log_entry($0, ["chunk_failed", $!]);
        $meta->set_metadata("sims.running", "no");
        $meta->set_metadata("status.sims", "error");
        return;
    }

    #
    # Pick the range of created task ids out of the chunker's output.
    #

    my($task_start, $task_end);
    while (<$chunk_fh>)
    {
        print;
        chomp;
        ($task_start, $task_end) = ($1, $2) if /^tasks\s+(\d+)\s+(\d+)/;
    }
    close($chunk_fh);

    unless (defined($task_start))
    {
        warn "$cmd did not return a task";
        $meta->add_log_entry($0, "chunk_no_task");
        $meta->set_metadata("sims.running", "no");
        $meta->set_metadata("status.sims", "error");
        return;
    }

    #
    # Step 2: submit the task array.
    #

    my $sims_opts = "-e $job_dir/sge_output -o $job_dir/sge_output " .
                    "-N rp_s_$job -v PATH -b yes -t $task_start-$task_end";
    my $sims_cmd  = "$FIG_Config::bin/rp_compute_sims $job_dir";

    my $sge_job_id = eval { sge_submit($meta, $sims_opts, $sims_cmd) };
    if ($@)
    {
        $meta->set_metadata("sims.running", "no");
        $meta->set_metadata("status.sims", "error");
        $meta->add_log_entry($0, ["sge submit failed", $@]);
        warn "submit failed: $@\n";
        return;
    }

    #
    # Step 3: submit the postprocessing job, held on the sims run.
    #

    my $pp_opts = "-e $job_dir/sge_output -o $job_dir/sge_output " .
                  "-N rp_ps_$job -v PATH -b yes -hold_jid $sge_job_id -l fig_resource";
    my $pp_cmd  = "$FIG_Config::bin/rp_postproc_sims $job_dir $sims_nr $sims_peg_synonyms $sims_keep_count";

    my $pp_sge_id = eval { sge_submit($meta, $pp_opts, $pp_cmd) };
    if ($@)
    {
        $meta->set_metadata("sims.running", "no");
        $meta->set_metadata("status.sims", "error");
        $meta->add_log_entry($0, ["sge postprocess submit failed", $@]);
        warn "submit failed: $@\n";
        system("qdel", $sge_job_id);
        return;
    }

    # Both jobs are queued: record the state and the SGE ids.
    $meta->set_metadata("sims.running", "yes");
    $meta->set_metadata("status.sims", "queued");

    $meta->set_metadata("sims.sge_job_id", $sge_job_id);
    $meta->set_metadata("sims.sge_postproc_job_id", $pp_sge_id);
    $meta->add_log_entry($0, ["submitted sims job", $sge_job_id]);
    $meta->add_log_entry($0, ["submitted postprocess job", $pp_sge_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Submit the BBH computation (rp_compute_bbhs) for a job, unless the
# metadata says a submission is already running.
#
# On success the stage is marked running/queued and the SGE job id is
# recorded; if the submission dies the stage is marked as errored.
#
sub process_bbhs
{
    my($genome, $job, $job_dir, $meta) = @_;

    # Already submitted. Dead SGE jobs are not detected here.
    return if $meta->get_metadata("bbhs.running") eq 'yes';

    my $sge_opts = "-e $job_dir/sge_output -o $job_dir/sge_output " .
                   "-N rp_bbh_$job -v PATH -b yes -l fig_resource";
    my $sge_cmd  = "$FIG_Config::bin/rp_compute_bbhs $job_dir";

    my $sge_job_id = eval { sge_submit($meta, $sge_opts, $sge_cmd) };
    if ($@)
    {
        $meta->set_metadata("bbhs.running", "no");
        $meta->set_metadata("status.bbhs", "error");
        $meta->add_log_entry($0, ["bbhs sge submit failed", $@]);
        warn "submit failed: $@\n";
        return;
    }

    # Queued: record the state and the SGE id.
    $meta->set_metadata("bbhs.running", "yes");
    $meta->set_metadata("status.bbhs", "queued");

    $meta->set_metadata("bbhs.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted bbhs job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Submit the automated assignment step (rp_auto_assign) for a job, unless
# the metadata says a submission is already running.
#
# On success the stage is marked running/queued and the SGE job id is
# recorded; if the submission dies the stage is marked as errored.
#
sub process_auto_assign
{
    my($genome, $job, $job_dir, $meta) = @_;

    # Already submitted. Dead SGE jobs are not detected here.
    return if $meta->get_metadata("auto_assign.running") eq 'yes';

    my $sge_opts = "-e $job_dir/sge_output -o $job_dir/sge_output " .
                   "-N rp_aa_$job -v PATH -b yes -l fig_resource";
    my $sge_cmd  = "$FIG_Config::bin/rp_auto_assign $job_dir";

    my $sge_job_id = eval { sge_submit($meta, $sge_opts, $sge_cmd) };
    if ($@)
    {
        $meta->set_metadata("auto_assign.running", "no");
        $meta->set_metadata("status.auto_assign", "error");
        $meta->add_log_entry($0, ["auto_assign sge submit failed", $@]);
        warn "submit failed: $@\n";
        return;
    }

    # Queued: record the state and the SGE id.
    $meta->set_metadata("auto_assign.running", "yes");
    $meta->set_metadata("status.auto_assign", "queued");

    $meta->set_metadata("auto_assign.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted auto_assign job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Submit the contig gluing step (rp_glue_contigs) for a job, unless the
# metadata says a submission is already running.
#
# On success the stage is marked running/queued and the SGE job id is
# recorded; if the submission dies the stage is marked as errored.
#
sub process_glue_contigs
{
    my($genome, $job, $job_dir, $meta) = @_;

    # Already submitted. Dead SGE jobs are not detected here.
    return if $meta->get_metadata("glue_contigs.running") eq 'yes';

    my $sge_opts = "-e $job_dir/sge_output -o $job_dir/sge_output " .
                   "-N rp_glue_$job -v PATH -b yes -l fig_resource";
    my $sge_cmd  = "$FIG_Config::bin/rp_glue_contigs $job_dir";

    my $sge_job_id = eval { sge_submit($meta, $sge_opts, $sge_cmd) };
    if ($@)
    {
        $meta->set_metadata("glue_contigs.running", "no");
        $meta->set_metadata("status.glue_contigs", "error");
        $meta->add_log_entry($0, ["glue_contigs sge submit failed", $@]);
        warn "submit failed: $@\n";
        return;
    }

    # Queued: record the state and the SGE id.
    $meta->set_metadata("glue_contigs.running", "yes");
    $meta->set_metadata("status.glue_contigs", "queued");

    $meta->set_metadata("glue_contigs.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted glue_contigs job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Submit the PCH computation (rp_compute_pchs) for a job, unless the
# metadata says a submission is already running.
#
# On success the stage is marked running/queued and the SGE job id is
# recorded; if the submission dies the stage is marked as errored.
#
sub process_pchs
{
    my($genome, $job, $job_dir, $meta) = @_;

    # Already submitted. Dead SGE jobs are not detected here.
    return if $meta->get_metadata("pchs.running") eq 'yes';

    my $sge_opts = "-e $job_dir/sge_output -o $job_dir/sge_output " .
                   "-N rp_pch_$job -v PATH -b yes -l fig_resource";
    my $sge_cmd  = "$FIG_Config::bin/rp_compute_pchs $job_dir";

    my $sge_job_id = eval { sge_submit($meta, $sge_opts, $sge_cmd) };
    if ($@)
    {
        $meta->set_metadata("pchs.running", "no");
        $meta->set_metadata("status.pchs", "error");
        $meta->add_log_entry($0, ["pchs sge submit failed", $@]);
        warn "submit failed: $@\n";
        return;
    }

    # Queued: record the state and the SGE id.
    $meta->set_metadata("pchs.running", "yes");
    $meta->set_metadata("status.pchs", "queued");

    $meta->set_metadata("pchs.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted pchs job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Submit the scenario computation (rp_scenarios) for a job, unless the
# metadata says a submission is already running.
#
# On success the stage is marked running/queued and the SGE job id is
# recorded; if the submission dies the stage is marked as errored.
#
sub process_scenario
{
    my($genome, $job, $job_dir, $meta) = @_;

    # Already submitted. Dead SGE jobs are not detected here.
    return if $meta->get_metadata("scenario.running") eq 'yes';

    my $sge_opts = "-e $job_dir/sge_output -o $job_dir/sge_output " .
                   "-N rp_sc_$job -v PATH -b yes -l fig_resource";
    my $sge_cmd  = "$FIG_Config::bin/rp_scenarios $job_dir";

    my $sge_job_id = eval { sge_submit($meta, $sge_opts, $sge_cmd) };
    if ($@)
    {
        $meta->set_metadata("scenario.running", "no");
        $meta->set_metadata("status.scenario", "error");
        $meta->add_log_entry($0, ["scenario sge submit failed", $@]);
        warn "submit failed: $@\n";
        return;
    }

    # Queued: record the state and the SGE id.
    $meta->set_metadata("scenario.running", "yes");
    $meta->set_metadata("status.scenario", "queued");

    $meta->set_metadata("scenario.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted scenario job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Submit the Glimmer step (rp_glimmer) for a job, unless the metadata
# says a submission is already running.
#
# On success the stage is marked running/queued and the SGE job id is
# recorded; if the submission dies the stage is marked as errored.
#
sub process_glimmer
{
    my($genome, $job, $job_dir, $meta) = @_;

    # Already submitted. Dead SGE jobs are not detected here.
    return if $meta->get_metadata("glimmer.running") eq 'yes';

    my $sge_opts = "-e $job_dir/sge_output -o $job_dir/sge_output " .
                   "-N rp_gl_$job -v PATH -b yes -l fig_resource";
    my $sge_cmd  = "$FIG_Config::bin/rp_glimmer $job_dir";

    my $sge_job_id = eval { sge_submit($meta, $sge_opts, $sge_cmd) };
    if ($@)
    {
        $meta->set_metadata("glimmer.running", "no");
        $meta->set_metadata("status.glimmer", "error");
        $meta->add_log_entry($0, ["glimmer sge submit failed", $@]);
        warn "submit failed: $@\n";
        return;
    }

    # Queued: record the state and the SGE id.
    $meta->set_metadata("glimmer.running", "yes");
    $meta->set_metadata("status.glimmer", "queued");

    $meta->set_metadata("glimmer.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted glimmer job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Submit the Critica step (rp_critica) for a job, unless the metadata
# says a submission is already running.
#
# On success the stage is marked running/queued and the SGE job id is
# recorded; if the submission dies the stage is marked as errored.
#
sub process_critica
{
    my($genome, $job, $job_dir, $meta) = @_;

    # Already submitted. Dead SGE jobs are not detected here.
    return if $meta->get_metadata("critica.running") eq 'yes';

    my $sge_opts = "-e $job_dir/sge_output -o $job_dir/sge_output " .
                   "-N rp_cr_$job -v PATH -b yes -l fig_resource";
    my $sge_cmd  = "$FIG_Config::bin/rp_critica $job_dir";

    my $sge_job_id = eval { sge_submit($meta, $sge_opts, $sge_cmd) };
    if ($@)
    {
        $meta->set_metadata("critica.running", "no");
        $meta->set_metadata("status.critica", "error");
        $meta->add_log_entry($0, ["critica sge submit failed", $@]);
        warn "submit failed: $@\n";
        return;
    }

    # Queued: record the state and the SGE id.
    $meta->set_metadata("critica.running", "yes");
    $meta->set_metadata("status.critica", "queued");

    $meta->set_metadata("critica.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted critica job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Submit the PFAM attribute generation step (rp_PFAM_attribute_generation)
# for a job, unless the metadata says a submission is already running.
#
# On success the stage is marked running/queued and the SGE job id is
# recorded; if the submission dies the stage is marked as errored.
#
sub process_pfam
{
    my($genome, $job, $job_dir, $meta) = @_;

    # Already submitted. Dead SGE jobs are not detected here.
    return if $meta->get_metadata("pfam.running") eq 'yes';

    my $sge_opts = "-e $job_dir/sge_output -o $job_dir/sge_output " .
                   "-N rp_pf_$job -v PATH -b yes -l fig_resource";
    my $sge_cmd  = "$FIG_Config::bin/rp_PFAM_attribute_generation $job_dir";

    my $sge_job_id = eval { sge_submit($meta, $sge_opts, $sge_cmd) };
    if ($@)
    {
        $meta->set_metadata("pfam.running", "no");
        $meta->set_metadata("status.pfam", "error");
        $meta->add_log_entry($0, ["pfam sge submit failed", $@]);
        warn "submit failed: $@\n";
        return;
    }

    # Queued: record the state and the SGE id.
    $meta->set_metadata("pfam.running", "yes");
    $meta->set_metadata("status.pfam", "queued");

    $meta->set_metadata("pfam.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted pfam job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

=head2 process_rp

Submit the rapid propagation step (rp_rapid_propagation) for a job,
unless the metadata says a submission is already running. On success the
stage is marked running/queued and the SGE job id is recorded; if the
submission dies the stage is marked as errored.

=cut

sub process_rp
{
    my($genome, $job, $job_dir, $meta) = @_;

    # Already submitted. Dead SGE jobs are not detected here.
    return if $meta->get_metadata("rp.running") eq 'yes';

    my $sge_opts = "-e $job_dir/sge_output -o $job_dir/sge_output " .
                   "-N rp_$job -v PATH -b yes -l fig_resource";
    my $sge_cmd  = "$FIG_Config::bin/rp_rapid_propagation $job_dir";

    my $sge_job_id = eval { sge_submit($meta, $sge_opts, $sge_cmd) };
    if ($@)
    {
        $meta->set_metadata("rp.running", "no");
        $meta->set_metadata("status.rp", "error");
        $meta->add_log_entry($0, ["rp sge submit failed", $@]);
        warn "submit failed: $@\n";
        return;
    }

    # Queued: record the state and the SGE id.
    $meta->set_metadata("rp.running", "yes");
    $meta->set_metadata("status.rp", "queued");

    $meta->set_metadata("rp.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted rp job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Determine if we can set the stoplight value to complete. This is the case
# if the qc.Embedded and qc.RNA_overlaps counts are both zero.
#
# If we don't automatically set the stoplight to complete, and we
# haven't done so yet, send a notification email to the user.
#
sub check_qc_status_for_intervention
{
    my($genome, $job_id, $job_dir, $meta) = @_;

    #
    # Parameters: genome id, job number, job directory, job metadata object.
    # $job_dir is not used by this routine.
    #
    # Each qc value is read as an array reference whose element [1] is the
    # count; a missing value is treated as zero.
    #
    my $val = $meta->get_metadata('qc.Embedded');
    my $num_embed =  $val ? $val->[1]  : 0;

    $val = $meta->get_metadata('qc.RNA_overlaps');
    my $num_overlaps =  $val ? $val->[1]  : 0;

    if ($num_embed == 0 && $num_overlaps == 0)
    {
        $meta->set_metadata("stoplight.acceptedby", "pipeline_automatic");
        $meta->set_metadata("stoplight.timestamp", time);
        $meta->set_metadata("status.stoplight", "complete");
        print "Automatically accepting quality on $job_id $genome\n";
        return;
    }

    #
    # User intervention is needed. Send the notification once only; the
    # qc.email_notification_sent flag records that it has gone out.
    #
    if ($meta->get_metadata("qc.email_notification_sent") ne "yes")
    {
        my $job = Job48->new($job_id);
        my $userobj = $job->getUserObject();

        if ($userobj)
        {
            my $email = $userobj->eMail();

            #
            # Join only the name parts that are present. Joining two empty
            # parts would give " ", which is true and would produce a
            # "  <address>" recipient instead of the bare address.
            #
            my $name = join(" ", grep { defined($_) && $_ ne '' }
                                 $userobj->firstName(), $userobj->lastName());

            my $full = $name ne '' ? "$name <$email>" : $email;
            print "send notification email to $full\n";

            my $mail = Mail::Mailer->new();
            $mail->open({
                To => $full,
                From => 'Annotation Server <rast@mcs.anl.gov>',
                Subject => "RAST annotation server job needs attention"
                });

            my $gname = $job->genome_name;
            my $entry = $FIG_Config::fortyeight_home;
            # Fall back to the public server URL when none is configured.
            $entry = "http://www.nmpdr.org/anno-server/" if !defined($entry) || $entry eq '';
            print $mail "The annotation job that you submitted for $gname needs user input before it can proceed further.\n";
            print $mail "You may query its status at $entry as job number $job_id.\n";
            $mail->close();
            $meta->set_metadata("qc.email_notification_sent", "yes");
            $meta->set_metadata("qc.email_notification_sent_address", $email);
        }
    }
}
#
# Submit the quality check step for a job to SGE.
#
# Parameters: genome id, job number, job directory, job metadata object.
# $genome is not used by this routine.
#
# Nothing is done when the qc.running metadata value is already 'yes'.
# If the submit dies, qc.running is set to 'no', status.qc to 'error',
# and the error is logged and warned about. Otherwise qc.running becomes
# 'yes', status.qc becomes 'queued', and the id returned by sge_submit
# is stored in qc.sge_job_id.
#
sub process_qc
{
    my($genome, $job, $job_dir, $meta) = @_;

    if ($meta->get_metadata("qc.running") eq 'yes')
    {
        #
        # We're already running. We might check for dead SGE jobs here,
        # but that is skipped for now.
        #
        return;
    }

    #
    # Submit.
    #

    my $sge_job_id;

    eval {
        $sge_job_id = sge_submit($meta,
                                 "-e $job_dir/sge_output -o $job_dir/sge_output " .
                                 "-N rp_qc_$job -v PATH -b yes -l fig_resource",
                                 "$FIG_Config::bin/rp_quality_check $job_dir");
    };

    #
    # Copy $@ immediately: it is a global, and any eval executed inside the
    # metadata calls below would reset it before the error gets reported.
    #
    my $submit_error = $@;
    if ($submit_error)
    {
        $meta->set_metadata("qc.running", "no");
        $meta->set_metadata("status.qc", "error");
        $meta->add_log_entry($0, ["qc sge submit failed", $submit_error]);
        warn "submit failed: $submit_error\n";
        return;
    }

    $meta->set_metadata("qc.running", "yes");
    $meta->set_metadata("status.qc", "queued");

    $meta->set_metadata("qc.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted qc job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Submit the correction step for a job to SGE.
#
# Parameters: genome id, job number, job directory, job metadata object,
# and a reference to the list of correction requests. The requests are
# joined with commas and passed to rp_correction as one quoted argument.
# $genome is not used by this routine.
#
# Nothing is done when the correction.running metadata value is already
# 'yes'. If the submit dies, correction.running is set to 'no',
# status.correction to 'error', and the error is logged and warned about.
# Otherwise correction.running becomes 'yes', status.correction becomes
# 'queued', and the id returned by sge_submit is stored in
# correction.sge_job_id.
#
sub process_correction
{
    my($genome, $job, $job_dir, $meta, $req) = @_;

    my $sge_job_id;

    if ($meta->get_metadata("correction.running") eq 'yes')
    {
        #
        # We're already running. We might check for dead SGE jobs here,
        # but that is skipped for now.
        #
        return;
    }

    my $req_str = join(",", @$req);

    #
    # The request string is wrapped in single quotes on a shell command
    # line, so escape any single quote it contains ('\'' closes the quoted
    # string, adds a literal quote, and reopens it). Strings without
    # quotes are passed through unchanged.
    #
    (my $quoted_req = $req_str) =~ s/'/'\\''/g;

    eval {
        $sge_job_id = sge_submit($meta,
                                 "-e $job_dir/sge_output -o $job_dir/sge_output " .
                                 "-N rp_cor_$job -v PATH -b yes -l fig_resource",
                                 "$FIG_Config::bin/rp_correction $job_dir '$quoted_req'");
    };

    #
    # Copy $@ immediately: it is a global, and any eval executed inside the
    # metadata calls below would reset it before the error gets reported.
    #
    my $submit_error = $@;
    if ($submit_error)
    {
        $meta->set_metadata("correction.running", "no");
        $meta->set_metadata("status.correction", "error");
        $meta->add_log_entry($0, ["correction sge submit failed", $submit_error]);
        warn "submit failed: $submit_error\n";
        return;
    }

    $meta->set_metadata("correction.running", "yes");
    $meta->set_metadata("status.correction", "queued");

    $meta->set_metadata("correction.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted correction job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Mark the job as complete as far as the user is concerned.
#
# It is still active in the pipeline until it either is processed for
# SEED inclusion, or marked to not be included.
#
sub mark_job_user_complete
{
    my($genome, $job_id, $job_dir, $meta, $req) = @_;

    #
    # Parameters: genome id, job number, job directory, job metadata
    # object, request. Only $job_id and $meta are used by this routine.
    #
    # Emails the submitting user (when the job has a user object), sets
    # status.final to 'complete', and sends a second notice when the job
    # metadata has submit.suggested or submit.candidate set.
    #

    my $job = Job48->new($job_id);

    my $userobj = $job->getUserObject();

    if ($userobj)
    {
        my $email = $userobj->eMail();

        #
        # Join only the name parts that are present. Joining two empty
        # parts would give " ", which is true and would produce a
        # "  <address>" recipient instead of the bare address.
        #
        my $name = join(" ", grep { defined($_) && $_ ne '' }
                             $userobj->firstName(), $userobj->lastName());

        my $full = $name ne '' ? "$name <$email>" : $email;
        print "send email to $full\n";

        my $mail = Mail::Mailer->new();
        $mail->open({
            To => $full,
            From => 'Annotation Server <rast@mcs.anl.gov>',
            Subject => "RAST annotation server job completed"
            });

        my $gname = $job->genome_name;
        my $entry = $FIG_Config::fortyeight_home;
        # Fall back to the public server URL when none is configured.
        $entry = "http://www.nmpdr.org/anno-server/" if !defined($entry) || $entry eq '';
        print $mail "The annotation job that you submitted for $gname has completed.\n";
        print $mail "It is available for browsing at $entry as job number $job_id.\n";
        $mail->close();
    }

    $meta->set_metadata("status.final", "complete");

    #
    # If the job is a SEED candidate, send VV email.
    #

    if ($meta->get_metadata("submit.suggested") or
        $meta->get_metadata("submit.candidate"))
    {
        my $gname = $job->genome_name;
        my $mail = Mail::Mailer->new();
        $mail->open({
            To => 'Veronika Vonstein <veronika@thefig.info>, Robert Olson<olson@mcs.anl.gov>',
            From => 'Annotation Server <rast@mcs.anl.gov>',
            Subject => "RAST job $job_id marked for SEED inclusion",
        });

        print $mail <<END;
RAST job #$job_id ($gname) was submitted for inclusion into the SEED, and has finished its processing.
END
        $mail->close();
    }
}

#
# Mark the job as utterly done.
#
sub mark_job_done
{
    my($genome, $job_id, $job_dir, $meta, $req) = @_;

    #
    # Only $job_dir is used: the current time is written to the DONE file
    # in the job directory and the ACTIVE file there is removed. Failures
    # are reported with warn; ACTIVE is removed even when DONE could not
    # be written. Returns the value of unlink for the ACTIVE file.
    #

    my $done_file = "$job_dir/DONE";

    # Three-argument open with a lexical handle: the path is never parsed
    # for a mode, and no global filehandle is left behind.
    if (open(my $done_fh, '>', $done_file))
    {
        print {$done_fh} time() . "\n";

        # Buffered write errors only surface when the handle is closed.
        close($done_fh) or warn "Error writing $done_file: $!\n";
    }
    else
    {
        warn "Error opening $job_dir/DONE: $!\n";
    }

    my $active_file = "$job_dir/ACTIVE";
    my $removed = unlink($active_file);
    if (!$removed)
    {
        # Save the reason before the -e test can change $!. A file that
        # was already absent is not an error.
        my $why = "$!";
        warn "Error removing $active_file: $why\n" if -e $active_file;
    }

    return $removed;
}

#
# Run qsub and return the id of the submitted job.
#
# Parameters: the job metadata object (used for logging), a string of qsub
# options, and the command string to submit.
#
# The qsub command line and everything qsub prints (stdout and stderr) are
# added to the metadata log; each output line is also echoed to stdout.
# The id is taken from the "Your job N" or "Your job-array N" line.
#
# Dies if qsub cannot be started, if closing the pipe fails (which
# includes qsub exiting with a non-zero status), or if no job id was
# found in the output.
#
sub sge_submit
{
    my($meta, $sge_args, $cmd) = @_;

    my $sge_cmd = "qsub $sge_args $cmd";

    $meta->add_log_entry($0, $sge_cmd);

    #
    # A single command string is still handed to the shell, which is
    # needed for the 2>&1 redirection. The handle is lexical rather than
    # a global bareword.
    #
    open(my $qsub_fh, '-|', "$sge_cmd 2>&1")
        or die "Qsub failed: $!";

    my $sge_job_id;
    my $submit_output = '';

    # Read into a lexical so the caller's $_ is left alone.
    while (my $line = <$qsub_fh>)
    {
        $submit_output .= $line;
        print "Qsub: $line";
        if ($line =~ /Your\s+job(?:-array)?\s+(\d+)/)
        {
            $sge_job_id = $1;
        }
    }
    $meta->add_log_entry($0, ["qsub_output", $submit_output]);

    if (!close($qsub_fh))
    {
        #
        # For a piped open, close also fails when the command exited
        # non-zero; in that case $! is 0 and the reason is in $?.
        #
        my($errno, $errstr, $wait_status) = ($! + 0, "$!", $?);
        die "Qsub close failed: $errstr" if $errno;
        die "Qsub close failed: qsub killed by signal " . ($wait_status & 127)
            if $wait_status & 127;
        die "Qsub close failed: qsub exited with status " . ($wait_status >> 8);
    }

    if (!$sge_job_id)
    {
        die "did not get job id from qsub";
    }

    return $sge_job_id;
}

#
# Submit the CELLO attribute generation step for a job to SGE.
#
# Parameters: genome id, job number, job directory, job metadata object.
# $genome is not used by this routine.
#
# Nothing is done when the cello.running metadata value is already 'yes'.
# If the submit dies, cello.running is set to 'no', status.cello to
# 'error', and the error is logged and warned about. Otherwise
# cello.running becomes 'yes', status.cello becomes 'queued', and the id
# returned by sge_submit is stored in cello.sge_job_id.
#
sub process_cello
{
    my($genome, $job, $job_dir, $meta) = @_;

    if ($meta->get_metadata("cello.running") eq 'yes')
    {
        #
        # We're already running. We might check for dead SGE jobs here,
        # but that is skipped for now.
        #
        return;
    }

    #
    # Submit.
    #

    my $sge_job_id;

    eval {
        $sge_job_id = sge_submit($meta,
                                 "-e $job_dir/sge_output -o $job_dir/sge_output " .
                                 "-N rp_cl_$job -v PATH -b yes",
                                 "$FIG_Config::bin/rp_CELLO_attribute_generation $job_dir");
    };

    #
    # Copy $@ immediately: it is a global, and any eval executed inside the
    # metadata calls below would reset it before the error gets reported.
    #
    my $submit_error = $@;
    if ($submit_error)
    {
        $meta->set_metadata("cello.running", "no");
        $meta->set_metadata("status.cello", "error");
        $meta->add_log_entry($0, ["cello sge submit failed", $submit_error]);
        warn "submit failed: $submit_error\n";
        return;
    }

    $meta->set_metadata("cello.running", "yes");
    $meta->set_metadata("status.cello", "queued");

    $meta->set_metadata("cello.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted cello job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

#
# Submit the PHOBIUS attribute generation step for a job to SGE.
#
# Parameters: genome id, job number, job directory, job metadata object.
# $genome is not used by this routine.
#
# Nothing is done when the phobius.running metadata value is already
# 'yes'. If the submit dies, phobius.running is set to 'no',
# status.phobius to 'error', and the error is logged and warned about.
# Otherwise phobius.running becomes 'yes', status.phobius becomes
# 'queued', and the id returned by sge_submit is stored in
# phobius.sge_job_id.
#
sub process_phobius
{
    my($genome, $job, $job_dir, $meta) = @_;

    if ($meta->get_metadata("phobius.running") eq 'yes')
    {
        #
        # We're already running. We might check for dead SGE jobs here,
        # but that is skipped for now.
        #
        return;
    }

    #
    # Submit.
    #

    my $sge_job_id;

    eval {
        $sge_job_id = sge_submit($meta,
                                 "-e $job_dir/sge_output -o $job_dir/sge_output " .
                                 "-N rp_ph_$job -v PATH -b yes",
                                 "$FIG_Config::bin/rp_PHOBIUS_attribute_generation $job_dir");
    };

    #
    # Copy $@ immediately: it is a global, and any eval executed inside the
    # metadata calls below would reset it before the error gets reported.
    #
    my $submit_error = $@;
    if ($submit_error)
    {
        $meta->set_metadata("phobius.running", "no");
        $meta->set_metadata("status.phobius", "error");
        $meta->add_log_entry($0, ["phobius sge submit failed", $submit_error]);
        warn "submit failed: $submit_error\n";
        return;
    }

    $meta->set_metadata("phobius.running", "yes");
    $meta->set_metadata("status.phobius", "queued");

    $meta->set_metadata("phobius.sge_job_id", $sge_job_id);
    $meta->add_log_entry($0, ["submitted phobius job", $sge_job_id]);
    Trace("Submitted, job id is $sge_job_id") if T(1);
}

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3