[Bio] / MGRASTBackend / MGRASTPipeline.pm Repository:
ViewVC logotype

View of /MGRASTBackend/MGRASTPipeline.pm

Parent Directory | Revision Log


Revision 1.3 - (download) (as text) (annotate)
Thu Apr 30 19:18:03 2009 UTC (10 years, 7 months ago) by jared
Branch: MAIN
Changes since 1.2: +4 -4 lines
*** empty log message ***

package MGRASTBackend::MGRASTPipeline;

use ClusterStage;
use Filesys::DfPortable;
use Mail::Mailer;
use Job48;
use FIG;

use strict;
use warnings;

use base qw( Exporter );
our @EXPORT = qw ( check_free_space get_jobs get_pipeline_for_job process_upload mark_job_done);

1;


########################################
#
#  Basic methods
#
#########################################

# check_free_space($job_spool_dir [, $min_free_gb])
#
# Dies unless the filesystem holding $job_spool_dir has at least
# $min_free_gb gigabytes available.
#
#   $job_spool_dir - directory whose filesystem is checked
#   $min_free_gb   - optional threshold in GB; defaults to 10 when omitted
#
# Returns nothing on success.  Dies if dfportable() returns undef, or if the
# reported available space is below the threshold.
sub check_free_space {
  my ($job_spool_dir, $min_free_gb) = @_;
  $min_free_gb = 10 unless defined $min_free_gb;

  # Asking dfportable for a 1 GB block size makes {bavail} a figure in GB.
  my $df = dfportable($job_spool_dir, 1024*1024*1024);
  if (!defined($df)){
    die "dfportable call failed on $job_spool_dir: $!";
  }
  if ($df->{bavail} < $min_free_gb){
    # The directory is passed as a %s argument instead of being interpolated
    # into the format, so a '%' in the path cannot be read as a conversion.
    die sprintf "Not enough free space available (%.1f GB) in %s", $df->{bavail}, $job_spool_dir;
  }
  return;
}

# get_jobs($job_spool_dir)
#
# Lists the job directories found directly under $job_spool_dir.  An entry
# counts as a job when its name consists solely of digits and it is a
# directory.
#
# Returns an array ref of the job names, sorted numerically ascending.
# Dies if $job_spool_dir cannot be opened.
sub get_jobs {
  my ($job_spool_dir) = @_;

  # A lexical handle (instead of the global bareword D) cannot collide with
  # other users of that name and is released deterministically below.
  opendir(my $dh, $job_spool_dir) or  die "Cannot open job directory $job_spool_dir: $!\n";

  # \A...\z rather than ^...$ so a name with a trailing newline is rejected.
  my @jobs = sort { $a <=> $b } grep { /\A\d+\z/ and -d "$job_spool_dir/$_" } readdir($dh);
  closedir($dh);

  return \@jobs;
}

# get_pipeline_for_job($job_dir)
#
# Builds the stage list for the job stored in $job_dir.
#
# If $job_dir contains no "Pipeline" file, the "default" pipeline is used.
# Otherwise the first line of that file names the pipeline; the name is
# looked up with pipeline() and the matching builder is invoked.
#
# Returns whatever the pipeline builder returns (an array ref of stages for
# the builders in this module), or undef when the Pipeline file is empty or
# names a pipeline that is not registered.
sub get_pipeline_for_job {
  my ($job_dir) = @_;

  if (! -f "$job_dir/Pipeline") {
    return pipeline("default")->();
  }

  my $pipe = FIG::file_head("$job_dir/Pipeline", 1);

  # NOTE(review): file_head presumably yields undef for an empty file -
  # confirm.  Guarding here avoids operating on an undefined name below.
  # 'return undef' (not a bare return) is kept so list-context callers still
  # receive a single undef, as before.
  return undef unless defined $pipe;

  # Strip surrounding whitespace, including a trailing newline or CR, so a
  # Pipeline file with stray whitespace still selects the named pipeline.
  $pipe =~ s/\A\s+//;
  $pipe =~ s/\s+\z//;

  # Look the builder up once and reuse the result.
  my $builder = pipeline($pipe);
  return ref($builder) eq 'CODE' ? $builder->() : undef;
}

########################################
#
#  Pipelines
#
#########################################

# pipeline($name)
#
# Maps a pipeline name to the subroutine that builds its stage list.
# Recognised names are "default", "other" and "alt_export".
#
# Returns a code ref for a recognised name, undef otherwise.
sub pipeline {
  my ($name) = @_;

  my %builder_for = (
    default    => \&default_process,
    other      => \&other_process,
    alt_export => \&alt_export_process,
  );

  # A missing key reads as undef, which is exactly the "unknown" result.
  return $builder_for{$name};
}

# default_process()
#
# Builds the stage list for the "default" pipeline.
#
# Each entry is a two-element array ref [ stage_name, handler ].  The handler
# is a code ref for the "uploaded" stage and a freshly constructed
# ClusterStage object for every other stage; the options after the stage
# program name are passed straight through to ClusterStage->new.
#
# Returns an array ref holding the entries in the order listed below.
sub default_process {
  my @stages;

  push @stages, [ uploaded        => \&process_upload ];
  push @stages, [ preprocess      => ClusterStage->new('mg_preprocess', sge_flag => "-l mg_preprocess") ];
  push @stages, [ sims            => ClusterStage->new('mg_sims', start_locally => 1) ];
  push @stages, [ check_sims      => ClusterStage->new('mg_check_sims', start_locally => 1) ];
  push @stages, [ create_seed_org => ClusterStage->new('mg_create_seed_org', sge_flag => "-l mg_postproc_taxa_sims") ];
  push @stages, [ export          => ClusterStage->new('mg_export', sge_flag => "-l mg_export") ];

  return \@stages;
}

# alt_export_process()
#
# Builds the stage list for the "alt_export" pipeline.  The first five
# stages match those of default_process; the "export" stage instead uses
# 'mg_sim_to_gff_and_gbk' with start_locally set.
#
# Each entry is a two-element array ref [ stage_name, handler ], where the
# handler is a code ref ("uploaded") or a freshly constructed ClusterStage.
#
# Returns an array ref holding the entries in the order listed below.
sub alt_export_process {
  my @stages;

  push @stages, [ uploaded        => \&process_upload ];
  push @stages, [ preprocess      => ClusterStage->new('mg_preprocess', sge_flag => "-l mg_preprocess") ];
  push @stages, [ sims            => ClusterStage->new('mg_sims', start_locally => 1) ];
  push @stages, [ check_sims      => ClusterStage->new('mg_check_sims', start_locally => 1) ];
  push @stages, [ create_seed_org => ClusterStage->new('mg_create_seed_org', sge_flag => "-l mg_postproc_taxa_sims") ];
  push @stages, [ export          => ClusterStage->new('mg_sim_to_gff_and_gbk', start_locally => 1) ];

  return \@stages;
}

# other_process()
#
# Builds the stage list for the "other" pipeline.  As written, the stages
# are the same as those produced by alt_export_process: the "export" stage
# uses 'mg_sim_to_gff_and_gbk' with start_locally set.
#
# Each entry is a two-element array ref [ stage_name, handler ], where the
# handler is a code ref ("uploaded") or a freshly constructed ClusterStage.
#
# Returns an array ref holding the entries in the order listed below.
sub other_process {
  my @stages;

  push @stages, [ uploaded        => \&process_upload ];
  push @stages, [ preprocess      => ClusterStage->new('mg_preprocess', sge_flag => "-l mg_preprocess") ];
  push @stages, [ sims            => ClusterStage->new('mg_sims', start_locally => 1) ];
  push @stages, [ check_sims      => ClusterStage->new('mg_check_sims', start_locally => 1) ];
  push @stages, [ create_seed_org => ClusterStage->new('mg_create_seed_org', sge_flag => "-l mg_postproc_taxa_sims") ];
  push @stages, [ export          => ClusterStage->new('mg_sim_to_gff_and_gbk', start_locally => 1) ];

  return \@stages;
}

########################################
#
#  Pipeline Stages
#
#########################################

# process_upload()
#
# Handler registered for the "uploaded" stage of the pipelines built in this
# module.  It performs no work and ignores its arguments.
#
# Returns nothing (empty list in list context, undef in scalar context).
sub process_upload {
  return;
}

# mark_job_done($job_id, $job_dir, $meta, $req)
#
# Final pipeline step: records completion of a job and notifies its owner.
#
#   $job_id  - identifier handed to Job48->new and quoted in the email
#   $job_dir - job directory; a DONE file holding the current epoch time is
#              written into it
#   $meta    - object whose set_metadata method is used to set
#              "status.final" to "complete"
#   $req     - accepted for interface compatibility; not used here
#
# Steps, in order:
#   1. Write "$job_dir/DONE" (a failure is only warned about).
#   2. Load the job and its user object.
#   3. Set the "status.final" metadata key.
#   4. If there is a user object with an email address, send a completion
#      notice through Mail::Mailer.
sub mark_job_done
{
    # The fourth argument ($req) is intentionally left unpacked: it is unused.
    my($job_id, $job_dir, $meta) = @_;

    print "Attempting to Email user\n";

    # Three-argument open with a lexical handle: the path can never be
    # misread as a mode, and no global bareword handle is left behind.
    if (open(my $done_fh, '>', "$job_dir/DONE"))
    {
        print {$done_fh} time() . "\n";
        # Buffered write errors only surface at close, so check it.
        close($done_fh) or warn "Error closing $job_dir/DONE: $!\n";
    }
    else
    {
        warn "Error opening $job_dir/DONE: $!\n";
    }

    # Direct method call instead of the ambiguous indirect-object form.
    my $job = Job48->new($job_id);

    my $userobj = $job->getUserObject();

    print "setting meta $meta\n";
    $meta->set_metadata("status.final","complete");
    print "setting meta $meta .. done\n";

    if ($userobj)
    {
        my $email = $userobj->email();

        # Join only the name parts that are actually present.  Joining
        # unconditionally always yields at least " ", which is true, so the
        # bare-address fallback below could never be reached.
        my $name = join(" ",
                        grep { defined($_) && length($_) }
                        $userobj->firstname(), $userobj->lastname());

        if (!defined($email) || $email eq '')
        {
            # Without an address there is nothing to send to.
            warn "No email address on record for job $job_id; notification not sent\n";
        }
        else
        {
            my $full = length($name) ? "$name <$email>" : $email;
            print "send email to $full\n";

            my $mail = Mail::Mailer->new();
            $mail->open({
                To => $full,
                From => 'Metagenome RAST server <mg-rast@mcs.anl.gov>',
                Subject => "MG-RAST job completed"
                });

            my $gname = $job->genome_name;

            # Fall back to the public URL when the configured home is unset
            # or empty; the defined() test avoids comparing undef with eq.
            my $entry = $FIG_Config::fortyeight_home;
            $entry = "http://metagenomics.nmpdr.org/" if !defined($entry) || $entry eq '';

            print $mail "The annotation job that you submitted for $gname has completed.\n";
            print $mail "It is available for browsing at $entry as job number $job_id.\n";
            $mail->close();
        }
    }
}

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3