[Bio] / FortyEight / Job48.pm Repository:
ViewVC logotype

View of /FortyEight/Job48.pm

Parent Directory | Revision Log


Revision 1.30 - (download) (as text) (annotate)
Tue May 27 23:42:10 2008 UTC (11 years, 10 months ago) by redwards
Branch: MAIN
CVS Tags: rast_rel_2008_06_18, rast_rel_2008_06_16
Changes since 1.29: +2 -2 lines
correcting the order that the web app is called to stop an annoying error message

#
# Module to wrap the lookups of 48-hour server job information.
#

package Job48;

use FIG;
use FIGV;
use GenomeMeta;
use DBMaster;
use Fcntl qw(:DEFAULT :flock :seek);
use Mail::Mailer;
use FileHandle;
use File::Basename;
use FileLocking qw(lock_file unlock_file lock_file_shared);

# Probe for the optional File::Sync module. $have_fsync becomes 1 only when
# the module loads; otherwise it stays undefined.
my $have_fsync;
$have_fsync = 1 if eval { require File::Sync; 1 };

use DirHandle;
use strict;

use SOAP::Lite;

use FIG_Config;

#
# Return every job found in the configured jobs directory
# ($FIG_Config::fortyeight_jobs) as a list of Job48 objects sorted by
# ascending numeric job id.
#
# Only directory entries whose names consist solely of digits are considered,
# and entries for which Job48->new() returns a false value are skipped.
# Takes no arguments. Dies if the jobs directory cannot be opened.
#
sub all_jobs
{
    my $jobs_dir = $FIG_Config::fortyeight_jobs;

    # DirHandle->new returns undef when the directory cannot be opened; fail
    # with a readable message rather than a method call on undef.
    my $dh = DirHandle->new($jobs_dir)
        or die "could not open jobs directory '$jobs_dir': $!\n";

    my @jobs;

    # A lexical loop variable keeps the caller's $_ untouched.
    while (defined(my $entry = $dh->read()))
    {
        next unless $entry =~ /^\d+$/;

        my $job = Job48->new($entry);
        push(@jobs, $job) if $job;
    }
    $dh->close();

    return sort { $a->id <=> $b->id } @jobs;
}

#
# Create a new job directory on disk and populate it from an upload request.
#
# Called as a class method: Job48->create_new_job($data).
#
# $data is a hash reference. Keys read here:
#   taxonomy_id   - if the key exists, the id (default "666666") is registered
#                   with the clearing house to obtain a genome id extension
#   user, genome, project, taxonomy, genetic_code
#                 - values written to the job's one-line metadata files
#   metagenome    - true for metagenome jobs (the sequence file is not copied)
#   sequence_file - readable filehandle for the uploaded FASTA or Genbank file
#   meta          - optional hash ref of extra key/value pairs for meta.xml
# Keys written back into $data: taxonomy_id_ext and, when an extension was
# obtained, tax_dir.
#
# Returns ($jobnumber, '') on success and (undef, $error_message) on failure.
# Dies if one of the job files cannot be opened for writing.
#
sub create_new_job {
  my ($class, $data) = @_;

  my $jobs_dir = $FIG_Config::fortyeight_jobs;

  if (exists $data->{'taxonomy_id'}) {

    my $tid = $data->{'taxonomy_id'} || "666666";
    # Remove every run of whitespace, not only the first one.
    $tid =~ s/\s+//g;

    # query clearing house about taxonomy id extension
    my $proxy = SOAP::Lite->uri('http://www.soaplite.com/Scripts')->proxy($FIG_Config::clearinghouse_url);
    my $r = $proxy->register_genome($tid);
    if ($r->fault) {
      return (undef, "Failed to deposit: " . $r->faultcode . " " . $r->faultstring);
    }

    $data->{'taxonomy_id_ext'} = $tid . "." . $r->result;
  }
  else {
    $data->{'taxonomy_id_ext'} = '';
  }

  # init job counter if necessary
  umask 0000;
  my $counter_file = "$jobs_dir/JOBCOUNTER";
  unless (-f $counter_file) {
    open(my $init_fh, '>', $counter_file) or die "could not create jobcounter file: $!\n";
    print $init_fh "1";
    close $init_fh;
  }

  #
  # get new job id from job counter
  # Carefully lock and fsync().
  #
  open(my $counter_fh, '+<', $counter_file) or die "could not open jobcounter file: $!\n";
  $counter_fh->autoflush(1);
  lock_file($counter_fh);
  seek($counter_fh, 0, SEEK_SET);
  my $jobnumber = <$counter_fh>;

  # Skip over ids whose directory already exists.
  $jobnumber++;
  while (-d "$jobs_dir/$jobnumber") {
    $jobnumber++;
  }

  seek($counter_fh, 0, SEEK_SET);
  truncate($counter_fh, 0);
  print $counter_fh "$jobnumber\n";

  eval { File::Sync::fsync($counter_fh) if $have_fsync; };

  # Closing the handle ends the use of the counter file for this request.
  close $counter_fh;

  # create job directory
  my $job_dir = "$jobs_dir/$jobnumber";
  mkdir $job_dir;

  unless (-d $job_dir) {
    return (undef, 'The job directory could not be created.');
  }

  mkdir "$job_dir/raw";
  if ($data->{'taxonomy_id_ext'}) {
    $data->{'tax_dir'} = "$job_dir/raw/" . $data->{'taxonomy_id_ext'};
    mkdir $data->{'tax_dir'};
  }

  # create metadata files
  my $meta_id = $data->{'taxonomy_id_ext'} || 'genome_'.$jobnumber;
  $meta_id = 'metagenome_'.$jobnumber if ($data->{'metagenome'});

  my $meta = GenomeMeta->new($meta_id, "$job_dir/meta.xml");
  $meta->add_log_entry("genome", "Created $job_dir for uploaded file by " . $data->{'user'});

  _write_job_file("$job_dir/GENOME", $data->{'genome'}."\n",
                  "could not open GENOME file in $job_dir");
  _write_job_file("$job_dir/PROJECT", $data->{'project'}."\n",
                  "could not open PROJECT file in $job_dir");
  _write_job_file("$job_dir/TAXONOMY", $data->{'taxonomy'}."\n",
                  "could not open TAXONOMY file in $job_dir");

  if ($data->{'tax_dir'}) {
    # List-form system() runs cp directly, so characters in the paths are
    # never interpreted by a shell.
    system('cp', "$job_dir/GENOME", "$job_dir/PROJECT", "$job_dir/TAXONOMY", $data->{'tax_dir'});

    _write_job_file($data->{'tax_dir'} . "/GENETIC_CODE", $data->{'genetic_code'}."\n",
                    "cannot open GENETIC_CODE file in $job_dir");
  }

  _write_job_file("$job_dir/GENOME_ID", $data->{'taxonomy_id_ext'}."\n",
                  "cannot open GENOME_ID file in $job_dir");
  _write_job_file("$job_dir/USER", $data->{'user'}."\n",
                  "cannot open USER file in $job_dir");

  $meta->add_log_entry("genome", "Created metadata files.");

  # save uploaded file to raw directory
  if ($data->{'sequence_file'} and !$data->{'metagenome'}) {
    my $upload_file = $data->{'sequence_file'};
    my $tax_dir = $data->{'tax_dir'};

    # check whether this is a FASTA or a Genbank file
    my $firstline = <$upload_file>;
    if ($firstline =~ /^\>\S+/) {
      # this is a FASTA file, print it to unformatted_contigs
      _save_upload($upload_file, $firstline, "$tax_dir/unformatted_contigs",
                   "could not open unformatted_contigs file in $tax_dir");
    }
    elsif ($firstline =~ /^LOCUS/) {
      # this is a Genbank file, call parse_genbank
      my $source = "$tax_dir/genbank_file";
      _save_upload($upload_file, $firstline, $source,
                   "could not open genbank_file file in $tax_dir");

      FIG::run($FIG_Config::bin."/parse_genbank " . $data->{'taxonomy_id_ext'} . " " . $tax_dir . " < $source");
      system('cp', "$tax_dir/contigs", "$tax_dir/unformatted_contigs");

      #...Fix genome metadata files mangled by `parse_genbank`...
      system('cp', '-pf', "$job_dir/GENOME", "$job_dir/PROJECT", "$job_dir/TAXONOMY", "$tax_dir/");
    }
    else {
      # the file is in incorrect format: record the failure first, then
      # report it to the caller
      $meta->add_log_entry("genome", "Upload failed, invalid format.");
      return (undef, "The uploaded file has an incorrect format. Visit <a href='http://www.theseed.org/wiki/RAST_upload_formats'>our wiki</a> for more information about valid formats.");
    }
    $meta->add_log_entry("genome", "Successfully uploaded sequence file.");
  }

  # Empty marker files: their presence is the flag.
  if ($data->{'metagenome'}) {
    _write_job_file("$job_dir/METAGENOME", '', "cannot open METAGENOME file in $job_dir");
  }

  _write_job_file("$job_dir/ACTIVE", '', "cannot open ACTIVE file in $job_dir");
  $meta->add_log_entry("genome", "Job set to active.");

  if (defined $data->{'meta'} and ref $data->{'meta'} eq 'HASH') {
    foreach my $key (keys(%{$data->{'meta'}})) {
      $meta->set_metadata($key, $data->{'meta'}->{$key});
    }
  }

  $meta->set_metadata("upload.timestamp", time);
  $meta->set_metadata("status.uploaded", "complete");

  return ($jobnumber,'');

}

#
# Write $content to $path, replacing any existing file.
# Dies with "$error_prefix: $!\n" if the file cannot be opened.
#
sub _write_job_file {
  my ($path, $content, $error_prefix) = @_;

  open(my $fh, '>', $path) or die "$error_prefix: $!\n";
  print $fh $content;
  close $fh;
}

#
# Copy an uploaded sequence to $path with CR, CRLF and CRLFLF line endings
# rewritten to "\n". $first_line is the line already consumed from $in for
# format detection; it is written first, followed by the rest of $in.
# Dies with "$error_prefix: $!\n" if $path cannot be opened.
#
sub _save_upload {
  my ($in, $first_line, $path, $error_prefix) = @_;

  open(my $out, '>', $path) or die "$error_prefix: $!\n";

  $first_line =~ s/(\r\n\n|\r\n|\n|\r)/\n/g;
  print $out $first_line;

  while (my $line = <$in>) {
    $line =~ s/(\r\n\n|\r\n|\n|\r)/\n/g;
    print $out $line;
  }
  close $out;
}


#
# Constructor: load an existing job from disk.
#
# $job_id is either an all-digit job id, resolved beneath
# $FIG_Config::fortyeight_jobs, or the path of a job directory, in which case
# the id is that directory's basename. Returns nothing when the directory
# does not exist.
#
# When $user is a reference, the job is handed out only if the user has
# status 2, is the job's owner, or belongs to the same organisation as the
# job's owner; otherwise undef is returned. Dies if the job's owner cannot
# be looked up.
#
sub new
{
    my($class, $job_id, $user) = @_;

    my ($dir, $id) = ($job_id =~ /^\d+$/)
        ? ("$FIG_Config::fortyeight_jobs/$job_id", $job_id)
        : ($job_id, basename($job_id));

    return unless -d $dir;

    my $self = bless { id => $id, dir => $dir }, $class;
    $self->init();

    # Without a user object there is no access check to perform.
    return $self unless ref $user;

    my $jobuser = $self->getUserObject();
    die "Could not get user for job ".$self->id.".\n" unless ($jobuser);

    my $allowed = $user->status == 2
        || $self->user eq $user->login
        || $jobuser->organisation->name eq $user->organisation->name;

    return undef unless $allowed;
    return $self;
}

#
# Populate the object's fields from the files in the job directory
# ($self->{dir}): genome id, genome name, project name and user from the
# GENOME_ID, GENOME, PROJECT and USER files, the organism directory path,
# the GenomeMeta object for meta.xml, and the metagenome / to_be_deleted /
# active flags from the presence of the METAGENOME, DELETE and ACTIVE files.
#
sub init
{
    my($self) = @_;

    my $dir = $self->{dir};

    # Each value is whatever FIG::file_head($file, 1) returns, minus a
    # trailing newline (presumably the file's first line - confirm against
    # FIG::file_head).
    my @value_files = (
        [ genome_id    => 'GENOME_ID' ],
        [ genome_name  => 'GENOME'    ],
        [ project_name => 'PROJECT'   ],
        [ user         => 'USER'      ],
    );
    foreach my $pair (@value_files)
    {
        my ($field, $file) = @$pair;
        my $value = FIG::file_head("$dir/$file", 1);
        chomp $value;
        $self->{$field} = $value;
    }

    my $genome = $self->{genome_id};
    $self->{orgdir} = "$dir/rp/$genome";
    $self->{metagenome} = -f "$dir/METAGENOME" || 0;

    # Metagenome jobs key their metadata by job id, genome jobs by genome id.
    my $metaxml_key = ( $self->{metagenome} ) ? 'metagenome_'.$self->id : $genome;

    # Arrow syntax: this package defines its own new(), and the indirect
    # form "new GenomeMeta(...)" is resolved by parser heuristics.
    $self->{meta} = GenomeMeta->new($metaxml_key, "$dir/meta.xml");

    $self->{to_be_deleted} = -f "$dir/DELETE" || 0;
    $self->{active} = -f "$dir/ACTIVE" || 0;
}

# Read-only accessors. Each returns the object field of the same name.

sub dir
{
    my($self) = @_;
    return $self->{dir};
}

sub id
{
    my($self) = @_;
    return $self->{id};
}

sub genome_id
{
    my($self) = @_;
    return $self->{genome_id};
}

sub genome_name
{
    my($self) = @_;
    return $self->{genome_name};
}

sub project_name
{
    my($self) = @_;
    return $self->{project_name};
}

sub meta
{
    my($self) = @_;
    return $self->{meta};
}

sub user
{
    my($self) = @_;
    return $self->{user};
}

sub active
{
    my($self) = @_;
    return $self->{active};
}

sub orgdir
{
    my($self) = @_;
    return $self->{orgdir};
}

sub metagenome
{
    my($self) = @_;
    return $self->{metagenome};
}

sub to_be_deleted
{
    my($self) = @_;
    return $self->{to_be_deleted};
}

#
# Change the genome name in every file named GENOME beneath the job
# directory.
#
# $new_name replaces each occurrence of the job's current genome name in
# those files. A log entry is added for every file in which the helper
# reports a replacement. Returns the object's genome name after the call:
# $new_name if at least one file was changed, otherwise the previous name.
#
sub set_genome_name {
  my ( $self , $new_name ) = @_;

  my $dir      = $self->dir;
  my $old_name = $self->genome_name;

  # Run find(1) through a list-form pipe open so that no shell ever sees the
  # directory path.
  my @genome_files;
  if (open(my $find, '-|', 'find', $dir, '-name', 'GENOME')) {
    chomp(@genome_files = <$find>);
    close $find;
  }

  # _replace_pattern_in_file interpolates its search argument as a regular
  # expression; quote the old name so that it is matched literally.
  my $pattern = quotemeta($old_name);

  my $name_changed = 0;
  foreach my $gfile ( @genome_files ){
    my $replaced = $self->_replace_pattern_in_file( $gfile , $pattern , $new_name);
    if ( $replaced ){
      $self->meta->add_log_entry("genome", "Changed name from $old_name to $new_name in $gfile.");
      $name_changed = 1;
    }
  }

  $self->{genome_name} = $new_name if ( $name_changed );

  return $self->{genome_name};
}

#
# Replace every match of a pattern in a file.
#
# Parameters:
#   $file - path of the file to rewrite
#   $old  - search pattern; it is interpolated as a regular expression
#   $new  - replacement text
#   $tmp  - optional scratch file path; defaults to change_pattern.tmp in
#           the job directory
#
# The rewritten content is written to $tmp, which is then renamed over
# $file. Returns the total number of substitutions made across all lines.
# An undefined or empty $old leaves the file untouched and returns 0.
# Dies if a file cannot be opened, the scratch file cannot be written, or
# the rename fails.
#
sub _replace_pattern_in_file {
  my ( $self , $file , $old , $new , $tmp) = @_;

  # Perl does not treat an empty pattern as "match nothing", so refuse it
  # rather than rewrite the file in a surprising way.
  return 0 unless defined $old && length $old;

  $tmp = $self->dir."/change_pattern.tmp" unless ( $tmp );

  # Open the input first so a missing source file leaves no scratch file.
  open ( my $in , '<' , $file ) or die "Can't open $file for reading\n";
  open ( my $out , '>' , $tmp ) or die "Can't open tempfile $tmp for writing\n";

  my $nr = 0;
  while ( my $line = <$in> ){
    # s///g yields the substitution count, or an empty string for none;
    # accumulate it so a match on any line is reported.
    $nr += ( $line =~ s/$old/$new/g ) || 0;
    print $out $line;
  }
  close $in ;

  # A failed flush must not be renamed over the original file.
  unless ( close $out ){
    my $error = $!;
    unlink $tmp;
    die "Can't write tempfile $tmp: $error\n";
  }

  unless ( rename($tmp, $file) ){
    my $error = $!;
    unlink $tmp;
    # Raise an error instead of ending the process with a success status.
    die "Can't rename $tmp to $file: $error\n";
  }

  return $nr;
}

#
# Return the DBMaster user object for the owner of the job, looked up by the
# login name stored in the job.
#
# Which database is used depends on $FIG_Config::rast_jobs: when it is the
# empty string (old rast server) the FortyEight_WebApplication database is
# used, otherwise the database described by $FIG_Config::webapplication_*.
# Returns undef when no database handle is obtained or, on the old server,
# when no user has that login.
#
sub getUserObject
{
    my($self) = @_;
    my $login = $self->user();

    if ($FIG_Config::rast_jobs eq '') #  Old rast server
    {
        # NOTE: this changes the environment of the whole process.
        $ENV{DBHOST} = 'bioseed.mcs.anl.gov';

        my $dbm;
        eval {
            $dbm = DBMaster->new(-database => 'FortyEight_WebApplication');
        };
        if ($@)
        {
            # Retry with a positional database name only for this one error.
            die $@ unless $@ =~ /No database name given/;
            $dbm = DBMaster->new('FortyEight_WebApplication');
        }

        return undef unless $dbm;

        my $matches = $dbm->User->get_objects({ login => $login });
        return $matches->[0] if ($matches && @$matches);

        # No user with this login: say so explicitly.
        return undef;
    }

    my $dbm = DBMaster->new(-database => $FIG_Config::webapplication_db,
                            -backend => $FIG_Config::webapplication_backend,
                            -host => $FIG_Config::webapplication_host,
                            -user => $FIG_Config::webapplication_user,
                           );
    return undef unless $dbm;

    my $owner = $dbm->User->init({ login => $login });
    return $owner;
}
	     
#
# Return a FIGV object constructed on this job's organism directory.
#
sub get_figv
{
    my ($self) = @_;

    # Arrow syntax: this package defines its own new(), and the indirect
    # form "new FIGV(...)" is resolved by parser heuristics.
    return FIGV->new($self->orgdir());
}

#
# Report whether the job counts as finished, which this method defines as
# the 'status.bbhs' metadata value being 'complete'.
#
# NOTE(review): the earlier comment here said a job is finished once it has
# completed the auto_assign phase, but the code tests status.bbhs - confirm
# which stage is intended.
#
sub finished
{
    my($self) = @_;

    my $bbhs_status = $self->meta->get_metadata('status.bbhs');
    return $bbhs_status eq 'complete';
}

#
# Return the sorted list of distinct contig names of the job.
#
# The names are read from the "contigs" file in the job's organism
# directory: every line starting with ">" contributes the run of
# non-whitespace characters that follows it. If the file cannot be opened a
# warning is emitted and the empty list is returned.
#
sub contigs
{
    my($self) = @_;
    my %contigs;

    my $contigfile = $self->orgdir . "/contigs";

    # A lexical handle and loop variable: nothing is left open afterwards
    # and the caller's $_ is not modified.
    if (open(my $fh, '<', $contigfile))
    {
        while (my $line = <$fh>)
        {
            $contigs{$1}++ if $line =~ /^>([^\s]+)/;
        }
        close($fh);
    }
    else
    {
        warn "No $contigfile found\n";
    }
    return sort keys %contigs;
}


#
# Send an email message to the owner of the job.
#
# Parameters: $key is the metadata key that records the notification,
# $subject and $body are the message subject and text.
#
# The message is sent only if the metadata value for $key is not "yes" and
# the job's owner can be looked up. After sending, $key is set to "yes" and
# "${key}_address" / "${key}_timestamp" record the address and time.
# Returns 1 if the message was sent, 0 otherwise.
#
sub send_email_to_owner
{
    my($self, $key, $subject, $body) = @_;

    my $meta = $self->meta;

    # Already notified for this key.
    return 0 if $meta->get_metadata($key) eq "yes";

    my $owner = $self->getUserObject();
    return 0 unless $owner;

    # The two user classes spell their accessors differently.
    my $old_server = ($FIG_Config::rast_jobs eq '');
    my $email = $old_server ? $owner->eMail() : $owner->email();
    my @name_parts = $old_server
        ? ($owner->firstName(), $owner->lastName())
        : ($owner->firstname(), $owner->lastname());
    my $name = join(" " , @name_parts);

    my $full = $name ? "$name <$email>" : $email;
    warn "send notification email to $full\n";

    my %headers = (
        To => $full,
        Cc => 'Annotation Server <rast@mcs.anl.gov>',
        From => 'Annotation Server <rast@mcs.anl.gov>',
        Subject => $subject,
    );

    eval {
        my $mail = Mail::Mailer->new();
        $mail->open(\%headers);

        print $mail $body;
        $mail->close();

        # Record the notification only after the message was handed over.
        $meta->set_metadata($key, "yes");
        $meta->set_metadata("${key}_address", $email);
        $meta->set_metadata("${key}_timestamp", time);
    };

    if ($@)
    {
        warn "Error sending mail to $full: $@\n";
        return 0;
    }
    return 1;
}


#
# get_status_of_job - return the first job stage that is not yet complete,
# together with its status.
#
# Class method taking the job number and a User object reference. Returns a
# two-element list ($stage, $status). A stage without a stored status is
# reported as 'not_started'; when every stage is complete, the last stage is
# returned with status 'complete'. For a missing job id, a non-reference
# user or a job that cannot be loaded, the first element is a short error
# text and the second is the empty string.
#
sub get_status_of_job {
  my ($class, $job_id, $user) = @_;

  return ('no job id given', '') unless ($job_id);
  return ('no user given', '') unless (ref $user);

  my $job = $class->new($job_id, $user);
  return ('unknown job', '') unless (ref $job);

  # Metagenome jobs run through a shorter pipeline than genome jobs.
  my @stages = $job->metagenome
    ? ( 'status.uploaded', 'status.preprocess',
        'status.sims', 'status.sims_postprocess',
        'status.final' )
    : ( 'status.uploaded', 'status.rp', 'status.qc', 'status.correction',
        'status.sims', 'status.bbhs', 'status.auto_assign',
        'status.pchs', 'status.scenario', 'status.final' );

  foreach my $stage (@stages) {
    my $status = $job->meta->get_metadata($stage) || 'not_started';
    return ($stage, $status) unless ($status eq 'complete');
  }

  # if we get here the last stage was complete
  return ($stages[-1], 'complete');

}

=head3 compute_job_metrics

 $metrics = $job->compute_job_metrics()

Returns a hash containing information about the job; used for computing aggregate
summary statistics.

=over 4

=item upload_time

Time at which the job was originally uploaded.

=item start_time

Time at which the job began processing.

=item end_normal_time

Time at which the job finished normal processing - exclusive of attribute computation.

=item end_complete_time

Time at which the job finished all processing.

=item successful

True if the job completed successfully.

=item local_user

True if the job was submitted by a "local" user - ANL, UC, FIG. Batch jobs 
are considered ANL.

=back

=cut

sub compute_job_metrics
{
    # TODO: not implemented. The body does no work, so the call yields the
    # empty list (undef in scalar context).
    return;
}

1;

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3