[Bio] / FigKernelPackages / FIG.pm Repository:
ViewVC logotype

Diff of /FigKernelPackages/FIG.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.278, Tue May 10 12:03:39 2005 UTC revision 1.279, Tue May 17 16:04:08 2005 UTC
# Line 303  Line 303 
303    
304  The askfor pool needs to keep track of which sequences need to be  The askfor pool needs to keep track of which sequences need to be
305  calculated, which have been handed out, etc. To simplify this task we  calculated, which have been handed out, etc. To simplify this task we
306  chunk the sequences into fairly small numbers (10-20 sequences) and  chunk the sequences into fairly small numbers (20k characters) and
307  allocate work on a per-chunk basis. We make use of the relational  allocate work on a per-chunk basis. We make use of the relational
308  database to keep track of chunk status as well as the seek locations  database to keep track of chunk status as well as the seek locations
309  into the file of sequence data. The initial creation of the pool  into the file of sequence data. The initial creation of the pool
# Line 317  Line 317 
317  {  {
318      my($self, $chunk_size) = @_;      my($self, $chunk_size) = @_;
319    
320      $chunk_size = 15 unless $chunk_size =~ /^\d+$/;      $chunk_size = 20000 unless $chunk_size =~ /^\d+$/;
321    
322      my $pool_dir = "$FIG_Config::global/sim_pools";      my $pool_dir = "$FIG_Config::fig/var/sim_pools";
323      &verify_dir($pool_dir);      &verify_dir($pool_dir);
324    
325      #      #
# Line 392  Line 392 
392          print &FIG::file_read("$cpool_dir/formatdb.log");          print &FIG::file_read("$cpool_dir/formatdb.log");
393          unlink("$cpool_dir/formatdb.pid");          unlink("$cpool_dir/formatdb.pid");
394       });       });
395      print "Running formatdb in background job $child_pid\n";      warn "Running formatdb in background job $child_pid\n";
396      open(FPID, ">$cpool_dir/formatdb.pid");      open(FPID, ">$cpool_dir/formatdb.pid");
397      print FPID "$child_pid\n";      print FPID "$child_pid\n";
398      close(FPID);      close(FPID);
# Line 416  Line 416 
416    
417      my($chunk_idx, $chunk_begin, $seq_idx);      my($chunk_idx, $chunk_begin, $seq_idx);
418    
419        my $cur_size = 0;
420    
421      $chunk_idx = 0;      $chunk_idx = 0;
422      $chunk_begin = 0;      $chunk_begin = 0;
423      $seq_idx = 0;      $seq_idx = 0;
424    
425      my(@seeks);      my(@seeks);
426    
427        my $tmpfile = "$FIG_Config::temp/simseek.$$";
428        open(my $tmpfh, ">$tmpfile") or confess "Cannot open tmpfile $tmpfile: $!";
429    
430      open(my $q_fh, "<$cpool_dir/q");      open(my $q_fh, "<$cpool_dir/q");
431      while (my $id = <$q_fh>)      while (my $id = <$q_fh>)
432      {      {
# Line 439  Line 444 
444          # Check if we're at the end of a chunk          # Check if we're at the end of a chunk
445          #          #
446    
447          if ((($seq_idx + 1) % $chunk_size) == 0)          $cur_size += length($seq);
448            if ($cur_size >= $chunk_size)
449          {          {
450              my $chunk_end = tell($seq_fh);              my $chunk_end = tell($seq_fh);
451              my $chunk_len = $chunk_end - $chunk_begin;              my $chunk_len = $chunk_end - $chunk_begin;
452    
453              push(@seeks, [$cpool_id, $chunk_idx, $chunk_begin, $chunk_len]);              push(@seeks, [$cpool_id, $chunk_idx, $chunk_begin, $chunk_len]);
454                print $tmpfh join("\t", $cpool_id, $chunk_idx, $chunk_begin, $chunk_len, 'FALSE', 'FALSE'), "\n";
455              $chunk_idx++;              $chunk_idx++;
456              $chunk_begin = $chunk_end;              $chunk_begin = $chunk_end;
457                $cur_size = 0;
458          }          }
459          $seq_idx++;          $seq_idx++;
460      }      }
461    
462      if ((($seq_idx) % $chunk_size) != 0)      if ($cur_size > 0)
463      {      {
464          my $chunk_end = tell($seq_fh);          my $chunk_end = tell($seq_fh);
465          my $chunk_len = $chunk_end - $chunk_begin;          my $chunk_len = $chunk_end - $chunk_begin;
466    
467            print $tmpfh join("\t", $cpool_id, $chunk_idx, $chunk_begin, $chunk_len, 'FALSE', 'FALSE'), "\n";
468          push(@seeks, [$cpool_id, $chunk_idx, $chunk_begin, $chunk_len]);          push(@seeks, [$cpool_id, $chunk_idx, $chunk_begin, $chunk_len]);
   
         $chunk_idx++;  
         $chunk_begin = $chunk_end;  
469      }      }
470    
471      close($q_fh);      close($q_fh);
472      close($seq_fh);      close($seq_fh);
473        close($tmpfh);
474    
475      print "Write seqs\n";      warn "Write seqs from $tmpfile\n";
476    
477      for my $seek (@seeks)      $self->db_handle->load_table(tbl => 'sim_queue',
478      {                                   file => $tmpfile);
         my($cpool_id, $chunk_idx, $chunk_begin, $chunk_len) = @$seek;  
479    
480          $db->SQL("insert into sim_queue (qid, chunk_id, seek, len, assigned, finished) " .      unlink($tmpfile);
481                   "values('$cpool_id', $chunk_idx, $chunk_begin, $chunk_len, FALSE, FALSE)");  
482      }  #     for my $seek (@seeks)
483    #     {
484    #       my($cpool_id, $chunk_idx, $chunk_begin, $chunk_len) = @$seek;
485    
486    #       $db->SQL("insert into sim_queue (qid, chunk_id, seek, len, assigned, finished) " .
487    #                "values('$cpool_id', $chunk_idx, $chunk_begin, $chunk_len, FALSE, FALSE)");
488    #     }
489    
490      return $cpool_id;      return $cpool_id;
491  }  }
# Line 490  Line 502 
502  sub get_sim_queue  sub get_sim_queue
503  {  {
504      my($self, $pool_id, $all_sims) = @_;      my($self, $pool_id, $all_sims) = @_;
          # Stub: the arguments are unpacked but nothing is done with them yet.
505    
506    
507    }
508    
509    
510    
=head3 get_sim_work

Get the next piece of sim computation work to be performed.

On success the return value is a five-element list:

    ($pool_id, $chunk_id, $nr_path, $fasta, $out_path)

where C<$nr_path> is the path to the pool's NR, C<$fasta> is a string
containing the fasta data for the chunk, and C<$out_path> is the file in
the pool directory that sim_work_done expects the output to be written to.

If the queue holds no unfinished chunk, an empty list is returned (undef
in scalar context).

=cut

sub get_sim_work
{
    my($self) = @_;

    #
    # For now, just don't care about order of data that we get back.
    #

    my $db = $self->db_handle();

    # The lock is released when $lock goes out of scope at the end of this sub.
    my $lock = FIG::SimLock->new;

    #
    # NOTE(review): the selected chunk is not marked as assigned, so until it
    # is finished the same chunk can be handed out again. Confirm whether
    # that is intended before relying on this with several workers.
    #
    my $work = $db->SQL(qq(SELECT qid, chunk_id, seek, len
                           FROM sim_queue
                           WHERE not finished
                           LIMIT 1));

    # Diagnostics go to STDERR so they cannot mix with a caller's output.
    warn "Got work ", Dumper($work), "\n";

    if (not $work or @$work == 0)
    {
        # Bare return: empty list in list context, undef in scalar context.
        return;
    }

    my($cpool_id, $chunk_id, $seek, $len) = @{$work->[0]};

    my $pool_dir = "$FIG_Config::fig/var/sim_pools";
    my $cpool_dir = "$pool_dir/$cpool_id";

    my $nr = "$cpool_dir/nr";
    my $fasta_file = "$cpool_dir/fasta.in";

    #
    # Pull exactly $len bytes starting at offset $seek. Any failure here
    # would otherwise hand a worker empty or truncated input.
    #
    open(my $fh, "<", $fasta_file) or confess "Cannot open $fasta_file: $!";
    seek($fh, $seek, 0) or confess "Cannot seek to $seek in $fasta_file: $!";

    my $fasta;
    my $got = read($fh, $fasta, $len);
    defined($got) or confess "Error reading $fasta_file: $!";
    $got == $len or confess "Short read from $fasta_file: wanted $len bytes at offset $seek, got $got";
    close($fh);

    return($cpool_id, $chunk_id, $nr, $fasta, "$cpool_dir/out.$chunk_id");
}
553    
=head3 sim_work_done

Declare that the work in pool_id/chunk_id has been completed, and output written
to the pool directory (get_sim_work gave it the path).

The chunk's row in sim_queue is marked finished and the output file name is
recorded. If no unfinished chunk remains for the pool afterwards, the
postprocessing job for the pool is scheduled.

Dies (via confess) if the output file does not exist, if the update does not
affect exactly one row, or if the remaining-work query fails.

=cut

sub sim_work_done
{
    my($self, $pool_id, $chunk_id, $out_file) = @_;

    if (! -f $out_file)
    {
        confess "sim_work_done: output file $out_file does not exist";
    }

    my $db = $self->db_handle();

    # Held until the end of this sub, covering both the update and the check.
    my $lock = FIG::SimLock->new;

    my $dbh = $db->{_dbh};

    my $rows = $dbh->do(qq(UPDATE sim_queue
                           SET finished = TRUE, output_file = ?
                           WHERE qid = ? and chunk_id = ?), undef, $out_file, $pool_id, $chunk_id);

    # do() yields undef on error, so test definedness before comparing.
    if (!defined($rows) or $rows != 1)
    {
        if ($dbh->errstr)
        {
            confess "Update not able to set finished=TRUE: ", $dbh->errstr;
        }
        else
        {
            confess "Update not able to set finished=TRUE";
        }
    }

    #
    # Determine if this was the last piece of work for this pool. If so, we can
    # schedule the postprocessing work.
    #
    # Note we're still holding the lock.
    #
    # The placeholder query goes straight to DBI, like the update above.
    #

    my $unfinished = $dbh->selectcol_arrayref(qq(SELECT chunk_id
                                                 FROM sim_queue
                                                 WHERE qid = ? AND not finished), undef, $pool_id);
    if (!defined($unfinished))
    {
        confess "Cannot query unfinished chunks for pool $pool_id: ", ($dbh->errstr || "unknown error");
    }

    if (@$unfinished == 0)
    {
        #
        # Pool is done.
        #
        $self->schedule_sim_pool_postprocessing($pool_id);
    }
}
608    
=head3 schedule_sim_pool_postprocessing

Schedule a job to do the similarity postprocessing for pool $pool_id.

A job is created through JobScheduler, its script is written to run
postprocess_computed_sims on the pool, the job id is recorded in the file
postprocess_jobid in the pool directory, and the job is enqueued.

Dies (via confess) if the job script or the job id file cannot be written.

=cut

sub schedule_sim_pool_postprocessing
{
    my($self, $pool_id) = @_;

    my $pool_dir = "$FIG_Config::fig/var/sim_pools";
    my $cpool_dir = "$pool_dir/$pool_id";

    my $js = JobScheduler->new();
    my $job = $js->job_create();

    #
    # Write the job script. The text must start at column 0: "#!" is only
    # recognized as an interpreter line when it is the very first thing in
    # the file.
    #

    my $spath = $job->get_script_path();
    open(my $sfh, ">", $spath) or confess "Cannot write job script $spath: $!";
    print $sfh <<END;
#!/bin/sh
. $FIG_Config::fig_disk/config/fig-user-env.sh
$FIG_Config::bin/postprocess_computed_sims $pool_id
END

    # Buffered write errors only show up at close time.
    close($sfh) or confess "Error closing job script $spath: $!";
    chmod(0775, $spath) or confess "Cannot make job script $spath executable: $!";

    #
    # Write the job ID to the pool directory.
    #

    my $jobid_file = "$cpool_dir/postprocess_jobid";
    open(my $jfh, ">", $jobid_file) or confess "Cannot write $jobid_file: $!";
    print $jfh $job->get_id(), "\n";
    close($jfh) or confess "Error closing $jobid_file: $!";

    $job->enqueue();
}
647    
=head3 postprocess_computed_sims

We build a pipe to this pipeline:

    reduce_sims peg.synonyms 300 | reformat_sims nr | split_sims dest prefix

Every output file recorded in sim_queue for the pool is fed into the pipe in
chunk_id order. We put the new sims in the pool directory, and then copy to
NewSims, indexing each copied file.

Dies (via confess) if the pipeline cannot be started or exits unsuccessfully,
if an output file cannot be read, or if a target file in NewSims already
exists with non-zero size.

=cut

sub postprocess_computed_sims
{
    my($self, $pool_id) = @_;

    #
    # We don't lock here because the job is already done, and we
    # shouldn't (ha, ha) ever postprocess twice.
    #

    my $pool_dir = "$FIG_Config::fig/var/sim_pools";
    my $cpool_dir = "$pool_dir/$pool_id";

    my $sim_dir = "$cpool_dir/NewSims";
    &verify_dir($sim_dir);

    #
    # Open the processing pipeline.
    #

    my $reduce = "$FIG_Config::bin/reduce_sims $FIG_Config::global/peg.synonyms 300";
    my $reformat = "$FIG_Config::bin/reformat_sims $cpool_dir/nr";
    my $split = "$FIG_Config::bin/split_sims $sim_dir sims.$pool_id";
    my $pipeline = "$reduce | $reformat | $split";
    open(my $process, "|-", $pipeline) or confess "Cannot start process pipeline '$pipeline': $!";

    #
    # Iterate over all the sims files, taken from the database.
    #

    my $dbh = $self->db_handle()->{_dbh};
    my $files = $dbh->selectcol_arrayref(qq(SELECT output_file
                                            FROM sim_queue
                                            WHERE qid = ? and output_file IS NOT NULL
                                            ORDER BY chunk_id), undef, $pool_id);
    if (!defined($files))
    {
        confess "Cannot query output files for pool $pool_id: ", ($dbh->errstr || "unknown error");
    }

    for my $file (@$files)
    {
        my $buf;
        open(my $fh, "<", $file) or confess "Cannot open sim input file $file: $!";

        # read() gives 0 at end of file and undef on error; only the former
        # may end the copy quietly.
        while (1)
        {
            my $n = read($fh, $buf, 4096);
            defined($n) or confess "Error reading sim input file $file: $!";
            last if $n == 0;
            print {$process} $buf or confess "Error writing to process pipeline: $!";
        }
        close($fh);
    }

    #
    # Closing a piped handle waits for the pipeline. When the only problem is
    # a non-zero exit status, $! is 0 and the status is in $?.
    #
    my $res = close($process);
    if (!$res)
    {
        if ($!)
        {
            confess "Error closing process pipeline: $!";
        }
        else
        {
            confess "Process pipeline exited with status $?";
        }
    }

    #
    # If we got here, it worked.  Copy the new sims files over to NewSims.
    #

    opendir(my $simdh, $sim_dir) or confess "Cannot open $sim_dir: $!";
    my @new_sims = grep { $_ !~ /^\./ } readdir($simdh);
    closedir($simdh);

    &verify_dir("$FIG_Config::data/NewSims");

    for my $sim_file (@new_sims)
    {
        my $target = "$FIG_Config::data/NewSims/$sim_file";

        # Refuse to overwrite a target that already has content.
        if (-s $target)
        {
            confess "$target already exists";
        }
        print "copying sim file $sim_file\n";
        &FIG::run("cp $sim_dir/$sim_file $target");
        &FIG::run("$FIG_Config::bin/index_sims $target");
    }
}
736    
737    
738  =head3 get_active_sim_pools  =head3 get_active_sim_pools
739    
# Line 5722  Line 5965 
5965  =cut  =cut
5966    
5967  sub search_index {  sub search_index {
5968      my($self,$pattern) = @_;      my($self,$pattern, $non_word_search) = @_;
5969      my($patternQ,@raw,@pegs,@roles);      my($patternQ,@raw,@pegs,@roles);
5970    
5971      &clean_tmp;      &clean_tmp;
# Line 5730  Line 5973 
5973      $patternQ =~ s/\s+/;/g;      $patternQ =~ s/\s+/;/g;
5974      $patternQ =~ s/\./\\./g;      $patternQ =~ s/\./\\./g;
5975    
5976        my $glimpse_args = "-y  -H $FIG_Config::data/Indexes -i";
5977        $glimpse_args .= " -w" unless $non_word_search;
5978        $glimpse_args .= " \'$patternQ\'";
5979    
5980  #   print STDERR "pattern=$pattern patternQ=$patternQ\n";  #   print STDERR "pattern=$pattern patternQ=$patternQ\n";
5981      @raw = `$FIG_Config::ext_bin/glimpse -y -H $FIG_Config::data/Indexes -i -w \'$patternQ\'`;  #    warn "args: $glimpse_args\n";
5982        @raw = `$FIG_Config::ext_bin/glimpse $glimpse_args`;
5983      @pegs  = grep { ! $self->is_deleted_fid($_->[0]) }      @pegs  = grep { ! $self->is_deleted_fid($_->[0]) }
5984               sort { &FIG::by_fig_id($a->[0],$b->[0]) }               sort { &FIG::by_fig_id($a->[0],$b->[0]) }
5985               map { $_ =~ /^\S+:\s+(\S.*\S)/; [split(/\t/,$1)] }               map { $_ =~ /^\S+:\s+(\S.*\S)/; [split(/\t/,$1)] }
# Line 9378  Line 9626 
9626      }      }
9627  }  }
9628    
package FIG::SimLock;

#
# Little package to implement a lock for sims work.
#
# Creating an object takes an exclusive flock on the file "lockfile" in the
# sim pool directory, blocking until it is available. The lock is released
# when the object is destroyed, so the usual pattern is
#
#     my $lock = FIG::SimLock->new;
#
# at the top of the scope that needs protecting.
#

use strict;
use Carp;
use Fcntl qw/:flock/;  # import LOCK_* constants

#
# Acquire the sims lock. Returns the lock object; dies (via confess) if the
# lock file cannot be opened or locked, since carrying on would mean running
# without any mutual exclusion.
#
sub new
{
    my($class) = @_;

    # Same directory as the sim pools themselves.
    my $pool_dir = "$FIG_Config::fig/var/sim_pools";
    &FIG::verify_dir($pool_dir);

    #
    # Lock the pool directory.
    #
    my $lock_file = "$pool_dir/lockfile";
    open(my $lock, ">", $lock_file) or confess "Cannot open sims lock file $lock_file: $!";

    flock($lock, LOCK_EX) or confess "Cannot lock sims lock file $lock_file: $!";

    my $self = {
        lock_fh => $lock,
    };

    return bless($self, $class);
}

#
# Release the lock by closing the lock file handle.
#
sub DESTROY
{
    my($self) = @_;

    warn "$$ unlocking sims lock\n";

    # Builtin close works whether or not IO::Handle has been loaded.
    close($self->{lock_fh}) if $self->{lock_fh};
}
9666    
9667    package FIG;
9668    
9669  1;  1;

Legend:
Removed from v.1.278  
changed lines
  Added in v.1.279

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3