[Bio] / MGRASTBackend / mgrast_pipeline.pl Repository:
ViewVC logotype

View of /MGRASTBackend/mgrast_pipeline.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.5 - (download) (as text) (annotate)
Fri May 1 19:39:47 2009 UTC (10 years, 7 months ago) by jared
Branch: MAIN
CVS Tags: HEAD
Changes since 1.4: +5 -0 lines
*** empty log message ***

use FIG;
use FIG_Config;
use GenomeMeta;
use Data::Dumper;
use SGE;
use Tracer;

use MGRASTBackend::MGRASTPipeline qw( check_free_space get_jobs get_pipeline_for_job process_upload mark_job_done );

use strict;

#
# Driver: walk every job in the MG-RAST job spool directory and hand each
# one to check_job, which advances it by at most one pipeline stage.
#

# Configure Tracer (see Tracer::TSetup for the exact meaning of the
# level/category string and the "TEXT" destination).
TSetup("2 main FIG", "TEXT");

my $job_spool_dir = $FIG_Config::mgrast_jobs;

# Verify there is enough free space in the spool area. The original note
# here says the requirement is 10G; the threshold itself is enforced inside
# check_free_space, which is not visible in this file.
check_free_space($job_spool_dir);

# Get job list (an array reference of job identifiers, one per
# subdirectory name under the spool directory).
my $jobs = get_jobs($job_spool_dir);

# Setup SGE. Direct method call rather than indirect object syntax, which
# is parsed ambiguously if a sub named "new" is ever in scope.
my $sge = SGE->new;

# Get pipeline stages and check each job.
for my $job (@$jobs){
  my $job_dir = "$job_spool_dir/$job";

  Trace("Getting pipeline for $job at $job_dir\n") if T(1);
  my $stages = get_pipeline_for_job($job_dir);

  if (defined $stages){
    Trace("Stages found for job $job\n") if T(2);
    check_job($job, $job_dir, $stages, $sge);
  }
  else{
    # Confess (from Tracer) is expected to abort the run with a stack
    # trace. No "return" follows it: at file scope that is itself a fatal
    # "Can't return outside a subroutine" error.
    Confess("Stages not found for job $job.");
  }
}

#
# check_job($job_id, $job_dir, $stages, $sge)
#
# Advance one job by at most one pipeline stage.
#
#   $job_id  - job identifier; used in messages and in the GenomeMeta name
#              ("metajob_$job_id").
#   $job_dir - the job's directory; holds the marker files and meta.xml.
#   $stages  - array ref of stages in pipeline order. Each stage is an
#              array ref [ $name, $processor ], where $processor is either
#              a CODE ref or an object providing a process() method.
#   $sge     - SGE handle; passed through untouched to the processor.
#
# The job is skipped when a DELETE or DONE marker file exists, or when the
# ACTIVE or MGRAST2 marker file is missing.
#
# Otherwise the stages are scanned in order using the "status.<name>"
# metadata value:
#   "complete"                           - move on to the next stage
#   "error", "in_progress" or "running"  - leave the job alone for now
#   anything else (including unset)      - invoke the processor, then stop
#
# If every stage is complete (or there are no stages) the job is handed to
# mark_job_done. Callers should not rely on the return value.
#
sub check_job{
  my($job_id, $job_dir, $stages, $sge) = @_;

  Trace("Checking $job_id at $job_dir\n") if T(1);

  if (-f "$job_dir/DELETE"){
    Trace("Skipping job $job_id: marked as deleted\n") if T(2);
    return;
  }

  if (! -f "$job_dir/ACTIVE"){
    Trace("Skipping job $job_id as not active\n") if T(2);
    return;
  }

  if (! -f "$job_dir/MGRAST2"){
    Trace("Skipping job $job_id: it is not a mgrast2 job\n") if T(2);
    return;
  }

  if (-f "$job_dir/DONE"){
    Trace("Skipping job $job_id as done\n") if T(2);
    return;
  }

  # Direct method call rather than indirect object syntax ("new Class"),
  # which is parsed ambiguously if a sub named "new" is ever in scope.
  my $meta = GenomeMeta->new("metajob_$job_id", "$job_dir/meta.xml");

  if (!$meta){
    Confess("Could not create meta for $job_dir/meta.xml");
    return;
  }

  for my $stage (@$stages){
    my($name, $processor) = @$stage;

    # A stage that has never been touched may have no status at all;
    # normalise to the empty string so the comparisons below are well
    # defined and do not warn under -w.
    my $status = $meta->get_metadata("status.$name");
    $status = '' unless defined $status;

    next if $status eq "complete";
    return if $status eq "error" or $status eq 'in_progress' or $status eq 'running';

    #
    # Stage is not complete and not in error. Process it.
    #
    # Note that if the stage is marked as queued, we will
    # invoke the processor. This is as designed, so that an
    # SGE-aware processor can ensure the task is still queued
    # and hasn't failed in a way that it did not get marked
    # as running.
    #

    # The trailing "1" makes the eval return true only when the block ran
    # to completion. Testing that flag instead of $@ alone catches failures
    # where $@ was reset while the stack unwound (e.g. by an eval inside a
    # destructor on older perls).
    my $ok = eval {
      if (ref($processor) eq 'CODE'){
        $processor->($name, $job_id, $job_dir, $meta, $sge);
      }
      elsif (ref($processor)){
        # NOTE(review): this dumps the processor object to STDOUT on every
        # invocation; it looks like leftover debugging output. Kept as is.
        print Dumper($processor);
        $processor->process($name, $job_id, $job_dir, $meta, $sge);
      }
      else{
        warn "Unknown processor " . Dumper($processor);
      }
      1;
    };

    if (!$ok){
      my $error = $@ || "unknown error";
      print "Error processing job $job_id\n$error\n";
    }

    # Only one stage is attempted per invocation, whether or not it
    # succeeded.
    return;
  }

  #
  # This job is done.
  #

  mark_job_done($job_id, $job_dir, $meta);
}

   




MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3