[Bio] / FortyEight / import_pipeline.pl Repository:
ViewVC logotype

View of /FortyEight/import_pipeline.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.1 - (download) (as text) (annotate)
Wed Sep 5 20:59:03 2007 UTC (12 years, 2 months ago) by olson
Branch: MAIN
CVS Tags: mgrast_dev_08112011, rast_rel_2009_05_18, mgrast_dev_08022011, rast_rel_2014_0912, rast_rel_2008_06_18, rast_rel_2008_06_16, mgrast_dev_05262011, rast_rel_2008_12_18, mgrast_dev_04082011, rast_rel_2008_07_21, rast_rel_2010_0928, rast_2008_0924, mgrast_version_3_2, mgrast_dev_12152011, rast_rel_2008_04_23, mgrast_dev_06072011, rast_rel_2008_09_30, rast_rel_2009_0925, rast_rel_2010_0526, rast_rel_2014_0729, mgrast_dev_02212011, rast_rel_2010_1206, mgrast_release_3_0, mgrast_dev_03252011, rast_rel_2010_0118, mgrast_rel_2008_0924, mgrast_rel_2008_1110_v2, rast_rel_2009_02_05, rast_rel_2011_0119, mgrast_rel_2008_0625, mgrast_release_3_0_4, mgrast_release_3_0_2, mgrast_release_3_0_3, mgrast_release_3_0_1, mgrast_dev_03312011, mgrast_release_3_1_2, mgrast_release_3_1_1, mgrast_release_3_1_0, mgrast_dev_04132011, rast_rel_2008_10_09, mgrast_dev_04012011, rast_release_2008_09_29, mgrast_rel_2008_0806, mgrast_rel_2008_0923, mgrast_rel_2008_0919, rast_rel_2009_07_09, rast_rel_2010_0827, mgrast_rel_2008_1110, myrast_33, rast_rel_2011_0928, rast_rel_2008_09_29, mgrast_rel_2008_0917, rast_rel_2008_10_29, mgrast_dev_04052011, mgrast_dev_02222011, rast_rel_2009_03_26, mgrast_dev_10262011, rast_rel_2008_11_24, rast_rel_2008_08_07, HEAD
Rollup of RAST->SEED import stuff.


#
# Pipeline for processing RAST -> SEED import jobs
#


use strict;
use Job48;
use ClusterStage;
use ImportJob;
use SGE;
use Tracer;

TSetup("2 main FIG", "TEXT");

my $job_spool_dir = $FIG_Config::import_jobs;
if ($job_spool_dir eq '' or ! -d $job_spool_dir)
{
    die "Job directory not defined or missing\n";
}

my @jobs = ImportJob::all_jobs();

print "Jobs=" . Dumper(\@jobs);
@jobs = grep { -f $_->dir . "/ACTIVE" } @jobs;
if (@jobs > 1)
{
    die "there shoudl only be one active job\n";
}

my @stages = ([build_nr => ClusterStage->new('imp_build_nr',
					     no_localdb => 1,
					     sge_flag => ["-l", "smp"],
					    )],
	      [prepare_sims => ClusterStage->new('imp_prepare_sims',
						 #no_localdb => 1,
						 start_locally => 1,
						)],
	      [submit_tl_sims => ClusterStage->new('imp_submit_tl_sims',
						   no_localdb => 1,
						   sge_flag => ["-l", "timelogic"])],
	      [check_tl_sim_status => ClusterStage->new('imp_check_tl_sim_status',
							no_localdb => 1,
							sge_flag => ["-l", "timelogic"])],
	      [finish_tl_sims => ClusterStage->new('imp_finish_tl_sims',
						   start_locally => 1)],
	       
	      );

my $sge = new SGE;

for my $job (@jobs)
{
    check_job($job, $job->dir, \@stages, $sge);
}

sub check_job
{
    my($job, $job_dir, $stages, $sge) = @_;

    my $job_id = $job->id;
    
    Trace("Checking $job_id at $job_dir\n") if T(1);

    if (! -f "$job_dir/ACTIVE")
    {
	Trace("Skipping job $job_id as not active\n") if T(2);
	return;
    }

    if (-f "$job_dir/DONE")
    {
	Trace("Skipping job $job_id as done\n") if T(2);
	return;
    }

    my $meta = $job->meta;

    if (!$meta)
    {
	Confess("Could not create meta for $job_dir/meta.xml");
	return;
    }

    for my $stage (@stages)
    {
	my($name, $processor) = @$stage;

	my $status = $meta->get_metadata("status.$name");

	next if $status eq "complete";
	return if $status eq "error";

	#
	# Stage is not complete and not in error. Process it.
	#
	# Note that if the stage is marked as queued or running, we will
	# invoke the processor. This as designed, so that an
	# SGE-aware processor can ensure the task is still queued
	# and hasn't failed in a way that it did not get marked
	# as running.
	#

	eval {
	    if (ref($processor) eq 'CODE')
	    {
		&$processor($name, $job_id, $job_dir, $meta, $sge);
	    }
	    elsif (ref($processor))
	    {
		print Dumper($processor);
		$processor->process($name, $job_id, $job_dir, $meta, $sge);
	    }
	    else
	    {
		warn "Unknown processor " . Dumper($processor);
	    }
	};
	if ($@)
	{
	    print "Error processing job $job_id\n$@\n";
	}
	return;
    }

    #
    # This job is done.
    #

    mark_job_done( $job_id, $job_dir, $meta);
}

sub mark_job_done
{
    my($job_id, $job_dir, $meta, $req) = @_;

    if (open(D, ">$job_dir/DONE"))
    {
	print D time . "\n";
	close(D);
    }
    else
    {
	warn "Error opening $job_dir/DONE: $!\n";
    }

    my $job = new ImportJob($job_id);

    print "setting meta $meta\n";
    $meta->set_metadata("status.final","complete");
    print "setting meta $meta .. done\n";
}

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3