[Bio] / FortyEightMeta / check_jobs_mg.pl Repository:
ViewVC logotype

View of /FortyEightMeta/check_jobs_mg.pl

Parent Directory | Revision Log


Revision 1.18 - (download) (as text) (annotate)
Wed Aug 27 17:57:06 2008 UTC (11 years, 5 months ago) by olson
Branch: MAIN
CVS Tags: mgrast_dev_08112011, mgrast_dev_08022011, mgrast_dev_05262011, mgrast_dev_04082011, mgrast_version_3_2, mgrast_dev_12152011, mgrast_dev_06072011, mgrast_dev_10262011, mgrast_dev_02212011, mgrast_rel_2008_0923, mgrast_release_3_0, mgrast_dev_03252011, mgrast_rel_2008_0924, mgrast_rel_2008_1110_v2, mgrast_rel_2008_0625, mgrast_release_3_0_4, mgrast_release_3_0_2, mgrast_release_3_0_3, mgrast_release_3_0_1, mgrast_dev_03312011, mgrast_release_3_1_2, mgrast_release_3_1_1, mgrast_release_3_1_0, mgrast_dev_04132011, mgrast_dev_04012011, mgrast_rel_2008_0919, mgrast_rel_2008_1110, myrast_33, mgrast_rel_2008_0917, mgrast_dev_04052011, mgrast_dev_02222011, HEAD
Changes since 1.17: +6 -0 lines
add mgrast2 check


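=head1 check_jobs_mg.pl

Check the status of the jobs in the 48-hour run queue to see if any
action should be taken.

Before scanning, the script aborts if less than 10 GB is free on the
filesystem holding the job spool directory.

Actions taken are determined based on the metadata kept in meta.xml.

We do a quick check by looking for the file ACTIVE in the job directory.
If this file does not exist, the job is not considered. Jobs whose
directory lacks an MGRAST2 marker file, or already contains a DONE file,
are skipped as well.

=cut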

    
use strict;
use FIG;
use FIG_Config;
use GenomeMeta;
use Data::Dumper;
use Tracer;
use Job48;
use Mail::Mailer;
use ClusterStage;
use SGE;
use Filesys::DfPortable;

TSetup("2 main FIG", "TEXT");

my $job_spool_dir = $FIG_Config::fortyeight_jobs;

#
# Verify we have at least $min_free_gb gigabytes of space left on the
# filesystem holding the job spool before doing any work. dfportable is
# asked for 1 GB blocks, so bavail is already expressed in GB.
#
my $min_free_gb = 10;

my $df = dfportable($job_spool_dir, 1024*1024*1024);
if (!defined($df))
{
    die "dfportable call failed on $job_spool_dir: $!";
}
if ($df->{bavail} < $min_free_gb)
{
    # Pass the path as an argument, not as part of the format string, so a
    # '%' in the directory name cannot corrupt the message.
    die sprintf("Not enough free space available (%.1f GB) in %s",
                $df->{bavail}, $job_spool_dir);
}

#
# Collect the numerically named job directories in ascending job order.
# A lexical dirhandle is used and closed right away so it cannot collide
# with any filehandle opened later in this script (Perl 5.28+ dies when a
# symbol holding an open dirhandle is also opened as a file).
#
opendir(my $spool_dh, $job_spool_dir)
    or die "Cannot open job directory $job_spool_dir: $!\n";
my @jobs = sort { $a <=> $b }
           grep { /\A\d+\z/ and -d "$job_spool_dir/$_" } readdir($spool_dh);
closedir($spool_dh);

#
# Pipeline stages in execution order. Each entry is [name => processor],
# where the processor is either a code ref or an object with a process()
# method. The name selects the status.<name> metadata key in meta.xml.
#
my @stages = ([uploaded => \&process_upload],
              [preprocess => ClusterStage->new('mg_preprocess',
                                               sge_flag => "-l mg_preprocess",
                                              )],
              [sims => ClusterStage->new('mg_sims',
                                         start_locally => 1,
                                        )],
              [check_sims => ClusterStage->new('mg_check_sims',
                                               start_locally => 1,
                                              )],
              [create_seed_org => ClusterStage->new('mg_create_seed_org',
                                                    sge_flag => "-l mg_postproc_taxa_sims",
                                                   )],
              [export => ClusterStage->new('mg_export',
                                           sge_flag => "-l mg_export",
                                          )],
              );

my $sge = SGE->new;

for my $job (@jobs)
{
    check_job($job, "$job_spool_dir/$job", \@stages, $sge);
}

#
# Examine one job directory and advance it through the pipeline.
#
# Parameters:
#   $job_id  - job identifier (the job directory's name)
#   $job_dir - full path to the job directory
#   $stages  - reference to the ordered list of [name => processor] pairs
#   $sge     - SGE handle passed through to each processor
#
# Jobs without an ACTIVE or MGRAST2 marker file, or with a DONE file, are
# skipped. Otherwise the first stage whose status.<name> metadata is not
# "complete" is handed to its processor, unless that status is "error",
# "in_progress" or "running", in which case nothing is done. If every
# stage is complete, the job is marked done.
#
sub check_job
{
    my($job_id, $job_dir, $stages, $sge) = @_;

    Trace("Checking $job_id at $job_dir\n") if T(1);

    # Cheap marker-file checks first, so skipped jobs never load meta.xml.
    if (! -f "$job_dir/ACTIVE")
    {
        Trace("Skipping job $job_id as not active\n") if T(2);
        return;
    }

    if (! -f "$job_dir/MGRAST2")
    {
        Trace("Skipping job $job_id: it is not a mgrast2 job\n") if T(2);
        return;
    }

    if (-f "$job_dir/DONE")
    {
        Trace("Skipping job $job_id as done\n") if T(2);
        return;
    }

    my $meta = GenomeMeta->new("metajob_$job_id", "$job_dir/meta.xml");

    if (!$meta)
    {
        Confess("Could not create meta for $job_dir/meta.xml");
        return;
    }

    # Walk the stage list supplied by the caller rather than a file-level
    # global, so the $stages parameter is actually honoured.
    for my $stage (@$stages)
    {
        my($name, $processor) = @$stage;

        # A stage that has never been touched has no status yet; treat it
        # as empty so the comparisons below are well-defined.
        my $status = $meta->get_metadata("status.$name");
        $status = '' unless defined($status);

        next if $status eq "complete";
        return if $status eq "error" or $status eq 'in_progress' or $status eq 'running';

        #
        # Stage is not complete and not in error. Process it.
        #
        # Note that if the stage is marked as queued, we will
        # invoke the processor. This is as designed, so that an
        # SGE-aware processor can ensure the task is still queued
        # and hasn't failed in a way that it did not get marked
        # as running.
        #
        eval {
            if (ref($processor) eq 'CODE')
            {
                $processor->($name, $job_id, $job_dir, $meta, $sge);
            }
            elsif (ref($processor))
            {
                print Dumper($processor);
                $processor->process($name, $job_id, $job_dir, $meta, $sge);
            }
            else
            {
                warn "Unknown processor " . Dumper($processor);
            }
        };
        if ($@)
        {
            # Report and move on; one failing job must not stop the scan.
            print "Error processing job $job_id\n$@\n";
        }

        # Only one stage is advanced per invocation.
        return;
    }

    #
    # Every stage is complete: this job is done.
    #
    mark_job_done($job_id, $job_dir, $meta);
}

#
# Processor for the "uploaded" stage. There is nothing to do for an
# upload, so the standard processor arguments are accepted and ignored,
# and nothing is returned (empty list / undef).
#
sub process_upload
{
    return ();
}

#
# Run "$FIG_Config::bin/mg_sims <job_dir>" synchronously for one job.
#
# Takes the standard stage-processor arguments (only $job_id and
# $job_dir are used). Returns the raw system() status: -1 if the program
# could not be started, otherwise the wait status (exit code in the high
# byte, signal number in the low 7 bits). Failures are reported with
# warn() instead of being silently discarded.
#
sub process_sims
{
    my($name, $job_id, $job_dir, $meta, $sge) = @_;

    my $cmd = "$FIG_Config::bin/mg_sims";

    # List form: $job_dir is passed as a single argument, never via a shell.
    my $rc = system($cmd, $job_dir);

    if ($rc == -1)
    {
        warn "Could not run $cmd for job $job_id: $!\n";
    }
    elsif ($rc & 127)
    {
        warn sprintf("%s for job %s died with signal %d\n", $cmd, $job_id, $rc & 127);
    }
    elsif ($rc != 0)
    {
        warn sprintf("%s for job %s exited with status %d\n", $cmd, $job_id, $rc >> 8);
    }

    return $rc;
}

   

#
# Mark a job as finished and notify its owner.
#
# Writes the current epoch time to $job_dir/DONE (check_job skips jobs
# that have this file), sets status.final to "complete" in the job's
# metadata, and, when the Job48 object has a user with an email address,
# mails that user a completion notice.
#
# Parameters:
#   $job_id  - job identifier
#   $job_dir - full path to the job directory
#   $meta    - metadata object for the job (must support set_metadata)
#   $req     - unused; accepted for backward compatibility
#
sub mark_job_done
{
    my($job_id, $job_dir, $meta, $req) = @_;

    # Lexical handle + 3-arg open instead of the bareword D: a bareword can
    # collide with a same-named dirhandle elsewhere in package main, which
    # is fatal as of Perl 5.28.
    my $done_file = "$job_dir/DONE";
    if (open(my $done_fh, '>', $done_file))
    {
        print {$done_fh} time() . "\n";
        close($done_fh) or warn "Error closing $done_file: $!\n";
    }
    else
    {
        warn "Error opening $done_file: $!\n";
    }

    my $job = Job48->new($job_id);

    my $userobj = $job->getUserObject();

    print "setting meta $meta\n";
    $meta->set_metadata("status.final", "complete");
    print "setting meta $meta .. done\n";

    return unless $userobj;

    my $email = $userobj->email();
    if (!defined($email) || $email eq '')
    {
        warn "No email address for the owner of job $job_id; not sending notification\n";
        return;
    }

    # Build "First Last <addr>" only from name parts that are present;
    # joining two empty parts would otherwise give a bogus " <addr>".
    my $name = join(" ", grep { defined($_) && $_ ne '' }
                             $userobj->firstname(), $userobj->lastname());
    my $full = $name ne '' ? "$name <$email>" : $email;
    print "send email to $full\n";

    my $mail = Mail::Mailer->new();
    $mail->open({
        To => $full,
        From => 'Metagenome RAST server <mg-rast@mcs.anl.gov>',
        Subject => "MG-RAST job completed"
        });

    my $gname = $job->genome_name;
    my $entry = $FIG_Config::fortyeight_home;
    $entry = "http://metagenomics.nmpdr.org/" if !defined($entry) || $entry eq '';
    print $mail "The annotation job that you submitted for $gname has completed.\n";
    print $mail "It is available for browsing at $entry as job number $job_id.\n";
    $mail->close();
}

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3