[Bio] / FigKernelPackages / P2P.pm Repository:
ViewVC logotype

Diff of /FigKernelPackages/P2P.pm

Parent Directory | Revision Log | View Patch

revision 1.3, Wed Sep 22 20:39:01 2004 UTC revision 1.22, Fri Jan 7 19:24:39 2005 UTC
# Line 18  Line 18 
18    
19  use FIG_Config;  use FIG_Config;
20    
21    use DB_File;
22    use Fcntl;
23    
24  use strict;  use strict;
25  use Exporter;  use Exporter;
26  use base qw(Exporter);  use base qw(Exporter);
27    
28    use Time::HiRes qw( usleep ualarm gettimeofday tv_interval );
29    
30  use Data::Dumper;  use Data::Dumper;
31    
32  use vars qw(@EXPORT @EXPORT_OK);  use vars qw(@EXPORT @EXPORT_OK);
# Line 31  Line 36 
36  our $ns_p2p = "http://thefig.info/schemas/p2p_update";  our $ns_p2p = "http://thefig.info/schemas/p2p_update";
37  our $ns_relay = "http://thefig.info/schemas/p2p_relay";  our $ns_relay = "http://thefig.info/schemas/p2p_relay";
38    
39    my $peg_batch_size = 1000;
40    my $anno_batch_size = 1000;
41    my $assign_batch_size = 1000;
42    
43    my $log_fh;
44    
45  =pod  =pod
46    
47  =head1 perform_update($peer)  =head1 perform_update($peer)
# Line 47  Line 58 
58    
59  sub perform_update  sub perform_update
60  {  {
61      my($fig, $peer, $last_update) = @_;      my($fig, $peer, $last_update, $skip_tough_search, $update_thru, $log_file) = @_;
62    
63      my $ret = $peer->request_update($last_update);      $log_file = "/dev/null" unless $log_file ne "";
64        open($log_fh, ">>$log_file") or die "Cannot open logfile $log_file: $!\n";
65        $log_fh->autoflush(1);
66    
67        print $log_fh "Beginning P2P update at " . localtime() . "\n";
68        print $log_fh "  Peer URL: $peer->{url}\n";
69        print $log_fh "  Update from: " . localtime($last_update) . "\n";
70        print $log_fh "\n";
71    
72        my $ret = $peer->request_update($last_update, $update_thru);
73    
74      if (!$ret or ref($ret) ne "ARRAY")      if (!$ret or ref($ret) ne "ARRAY")
75      {      {
76          die "perform_update: request_updated failed\n";          die "perform_update: request_update failed\n";
77      }      }
78    
79      my($session, $target_release, $num_annos, $num_pegs, $num_genomes,      my($session, $target_release, $num_assignments, $num_annos, $num_pegs, $num_genomes,
80         $target_time, $compatible) = @$ret;         $target_time, $compatible) = @$ret;
81    
82      print "perform_update: session=$session target=$target_release num_annos=$num_annos\n";      print "perform_update: session=$session target=@$target_release num_annos=$num_annos\n";
83      print "                num_pegs=$num_pegs num_genomes=$num_genomes target_time=$target_time compat=$compatible\n";      print "                num_pegs=$num_pegs num_genomes=$num_genomes target_time=$target_time compat=$compatible\n";
84    
85        my @my_release = $fig->get_release_info();
86    
87        print $log_fh "Session id = $session\n";
88        print $log_fh "Target release information: \n\t", join("\n\t", @$target_release), "\n";
89        print $log_fh "My release information: \n\t", join("\n\t", @my_release), "\n";
90        print $log_fh "$num_annos annotations\n";
91        print $log_fh "$num_assignments assignments\n";
92        print $log_fh "$num_pegs pegs\n";
93    
94        #
95        # We now know the data release for our peer.
96        #
97        # Open up the peg translation cache database (a DB_File) tied
98        # to %peg_cache. We needn't worry about keeping it in a directory
99        # based on our current release, as the cache directory is kept *in*
100        # the current data release directory.
101        #
102    
103        my $cache_handle;
104        my %peg_cache;
105        if ($target_release->[1] ne "")
106        {
107            my $cache_file = "pegcache.$target_release->[1].db";
108            my $cache_dir = "$FIG_Config::data/P2PQueue";
109            $fig->verify_dir($cache_dir);
110    
111            $cache_handle = tie(%peg_cache, "DB_File", "$cache_dir/$cache_file",
112                                O_CREAT | O_RDWR, 0666, $DB_HASH);
113            $cache_handle or warn "Could not tie peg_cache to $cache_dir/$cache_file: $!\n";
114        }
115    
116        #
117        # peg_mapping is the local mapping from remote->local peg. This might
118        # be replaceable by peg_cache from above.
119        #
120        my %peg_mapping;
121    
122    
123      #      #
124      # We have  the information now to begin the update process. Retrieve the pegs.      # We have  the information now to begin the update process. Retrieve the pegs.
125      #      #
126    
127      $ret = $peer->get_pegs($session, 0, $num_pegs);      _compute_peg_mapping($fig, $peer, $session, $num_pegs, \%peg_mapping, \%peg_cache, $cache_handle,
128                             $skip_tough_search);
129    
130        $cache_handle->sync();
131        untie %peg_cache;
132    
133        #
134        # Create a list of locally-mapped annotations on a per-genome
135        # basis.
136        #
137    
138        my %genome_annos;
139    
140        #
141        # %genome_assignments is a hash mapping from genome to a hashref
142        # that maps  peg to function (since assignments are unique).
143        #
144        # (Hm. Unless two remote pegs map to the same local peg; unclear what to do
145        # then. Punt for now).
146        #
147        my %genome_assignments;
148    
149        #
150        # Retrieve the annotations, and generate a list of mapped annotations.
151        #
152    
153        for (my $anno_start = 0; $anno_start < $num_annos; $anno_start += $anno_batch_size)
154        {
155            my $anno_req_len = $num_annos - $anno_start;
156            $anno_req_len = $anno_batch_size if $anno_req_len > $anno_batch_size;
157    
158            print "Retrieve $anno_req_len annos at $anno_start\n";
159            print $log_fh "Retrieve $anno_req_len annos at $anno_start\n";
160    
161            my $annos = $peer->get_annotations($session, $anno_start, $anno_req_len);
162    
163            for my $anno (@$annos)
164            {
165                my($his_id, $ts, $author, $anno) = @$anno;
166    
167                my $my_id = $peg_mapping{$his_id};
168                next unless $my_id;
169    
170                my $genome = $fig->genome_of($my_id);
171    
172                push(@{$genome_annos{$genome}}, [$my_id, $ts, $author, $anno]);
173            }
174        }
175    
176        #
177        # Do the same for the assignments
178        #
179    
180        # print Dumper($assignments);
181    
182    
183        for (my $assign_start = 0; $assign_start < $num_assignments; $assign_start += $assign_batch_size)
184        {
185            my $assign_req_len = $num_assignments - $assign_start;
186            $assign_req_len = $assign_batch_size if $assign_req_len > $assign_batch_size;
187    
188            print "Retrieve $assign_req_len assigns at $assign_start\n";
189            print $log_fh "Retrieve $assign_req_len assigns at $assign_start\n";
190    
191            my $assignments = $peer->get_assignments($session, $assign_start, $assign_req_len);
192    
193            for my $assign (@$assignments)
194            {
195                my($his_id, $ts, $author, $func) = @$assign;
196    
197                my $my_id = $peg_mapping{$his_id};
198                next unless $my_id;
199    
200                my $genome = $fig->genome_of($my_id);
201    
202                $genome_assignments{$genome}->{$my_id} =  [$my_id, $ts, $author, $func];
203            }
204        }
205    
206        # print Dumper(\%genome_annos);
207    
208        #
209        # Now install annotations.
210        #
211    
212        for my $genome (keys(%genome_annos))
213        {
214            #
215            # Plan:  Apply the merge_annotations.pl logic. Read the annotations
216            # from the per-org annotations file, add the new ones here, sort, and remove duplicates.
217            # Write the results to the annotations file.
218            #
219            # When we are all done, rerun the index_annotations script.
220            #
221            # Why not do that incrementally? Partly because the annotation_seeks table doesn't
222            # have a column for the genome id, so a removal of old data would require a
223            # string-match query; since a complete reindex of the annotations is pretty
224            # fast (60 sec on a G4 laptop on a firewire disk), it's not clear whether the incremental
225            # update would actually be a win.
226            #
227    
228            my @annos = @{$genome_annos{$genome}};
229            my $assignments = $genome_assignments{$genome};
230            #
231            # %assignment_annos is a hash from peg to the list
232            # of annotations for that peg.
233            #
234            my %assignment_annos;
235    
236            my $dir = "$FIG_Config::organisms/$genome";
237            my $anno_file = "$dir/annotations";
238            my $anno_bak = "$dir/annotations." . time;
239    
240            my $new_count = @annos;
241    
242            #
243            # Rename the annotations file to a new name based on the current time.
244            #
245    
246            if (-f $anno_file)
247            {
248                rename($anno_file, $anno_bak) or die "Cannot rename $anno_file to $anno_bak: $!";
249                print $log_fh "Moved annotations file $anno_file to backup $anno_bak\n";
250            }
251    
252            if (open(my $fh, "<$anno_bak"))
253            {
254                #
255                # While we are scanning here, we look for the latest local assignment
256                # for any peg for which we are installing an assignment.
257                #
258                local($/) = "\n//\n";
259    
260                my($chunk, $peg, $ts, $author, $anno);
261    
262                while (defined($chunk = <$fh>))
263                {
264                    chomp $chunk;
265                    ($peg, $ts, $author, $anno) = split(/\n/, $chunk, 4);
266    
267                    if ($peg =~ /^fig\|/ and $ts =~ /^\d+$/)
268                    {
269                        my $ent = [$peg, $ts, $author, $anno];
270                        push(@annos, $ent);
271    
272                        if (defined($assignments->{$peg}))
273                        {
274                            #
275                            # We have an incoming assignment for this peg.
276                            # Don't parse anything yet, but push the annotation
277                            # on a list so we can sort by date.
278                            #
279                            push(@{$assignment_annos{$peg}}, $ent);
280                        }
281                    }
282                }
283                close($fh);
284            }
285    
286            #
287            # Determine if we are going to install an assignment.
288            #
289    
290            for my $peg (keys %$assignments)
291            {
292                my(undef, $ts, $author, $func) = @{$assignments->{$peg}};
293    
294                #
295                # Sort the existing annotations for this peg by date.
296                #
297                # Recall that this list has entries [$peg, $timestamp, $author, $anno]
298                #
299    
300                my @eannos;
301                if (ref($assignment_annos{$peg}))
302                {
303                    @eannos = sort { $b->[1] <=> $a->[1] } @{$assignment_annos{$peg}};
304                }
305                else
306                {
307                    #
308                    # No assignment annotations found.
309                    #
310                    @eannos = ();
311                }
312    
313                # print "Assignment annos for $peg: ", Dumper(\@eannos);
314    
315                #
316                # Filter out just the master assignments that are newer than
317                # the one we are contemplating putting in place.
318                #
319    
320                my @cand = grep {
321                    ($_->[1] > $ts) and ($_->[3] =~ /Set master function to/)
322                    } @eannos;
323    
324                if (@cand > 0)
325                {
326                    #
327                    # Here is where some policy needs to be put in place --
328                    # we have a more recent annotation on the current system.
329                    #
330                    # For now, we will not install an assignment if there is any
331                    # newer assignment in place.
332                    #
333    
334                    warn "Skipping assignment for $peg $func due to more recent assignment $cand[0]->[3]\n";
335                    print $log_fh "Skipping assignment for $peg $func due to more recent assignment $cand[0]->[3]\n";
336                }
337                else
338                {
339                    #
340                    # Nothing is blocking us. While we are testing, just slam this assignment in.
341                    #
342    
343                    my $old = $fig->function_of($peg, 'master');
344    
345                    if ($old ne $func)
346                    {
347                        print "Assign $peg $func\n";
348                        print $log_fh "Assign $peg $func\n";
349                        print $log_fh "   was $old\n";
350                        $fig->assign_function($peg, 'master', $func);
351                    }
352                }
353            }
354    
355            open(my $outfh, ">$anno_file") or die "Cannot open new annotation file $anno_file: $!\n";
356    
357            my $last;
358            my @sorted = sort { ($a->[0] cmp $b->[0]) or ($a->[1] <=> $b->[1]) } @annos;
359            my $inst = 0;
360            my $dup = 0;
361            foreach my $ann (@sorted)
362            {
363                my $txt = join("\n", @$ann);
364                #
365                # Drop the trailing \n if there is one; we  will add it back when we print and
366                # want to ensure the file format remains sane.
367                #
368                chomp $txt;
369                if ($txt ne $last)
370                {
371                    print $outfh "$txt\n//\n";
372                    $last = $txt;
373                    # print "Inst $ann->[0] $ann->[1] $ann->[2]\n";
374                    $inst++;
375                }
376                else
377                {
378                    # print "Dup $ann->[0] $ann->[1] $ann->[2]\n";
379                    $dup++;
380                }
381            }
382            close($outfh);
383            chmod(0666, $anno_file) or warn "Cannot chmod 0666 $anno_file: $!\n";
384            print "Wrote $anno_file. $new_count new annos, $inst installed, $dup duplicates\n";
385            print $log_fh "Wrote $anno_file. $new_count new annos, $inst installed, $dup duplicates\n";
386        }
387    }
388    
389    #
390    # Compute the peg mapping for a session.
391    #
392    # $fig          Active FIG instance
393    # $peer         P2P peer for this session.
394    # $session      P2P session ID
395    # $peg_mapping  Hash ref for the remote -> local PEG mapping
396    # $peg_cache    Hash ref for the persistent remote -> local PEG mapping cache db.
397    # $cache_handle DB_File handle corresponding to $peg_cache.
398    #
399    sub _compute_peg_mapping
400    {
401        my($fig, $peer, $session, $num_pegs, $peg_mapping, $peg_cache, $cache_handle, $skip_tough_search) = @_;
402    
403        #
404        # genome_map is a hash mapping from target genome id to a list of
405        # pegs on the target. This is used to construct a finalize_pegs request after
406        # the first phase of peg mapping.
407        #
408    
409        my %genome_map;
410    
411        #
412        # target_genome_info is a hash mapping from target genome
413        # identifier to the target-side information on the genome -
414        # number of contigs, number of nucleotides, checksum.
415        #
416        # We accumulate it here across possibly multiple batches of
417        # peg retrievals in order to create a single  finalization
418        # list.
419        #
420    
421        my %target_genome_info;
422    
423        #
424        # For very large transfers, we need to batch the peg processing.
425        #
426    
427        for (my $peg_start = 0; $peg_start < $num_pegs; $peg_start += $peg_batch_size)
428        {
429            my $peg_req_len = $num_pegs - $peg_start;
430            $peg_req_len = $peg_batch_size if $peg_req_len > $peg_batch_size;
431    
432            print "Getting $peg_req_len pegs at $peg_start\n";
433            print $log_fh "Getting $peg_req_len pegs at $peg_start\n";
434            my $ret = $peer->get_pegs($session, $peg_start, $peg_req_len);
435    
436      if (!$ret or ref($ret) ne "ARRAY")      if (!$ret or ref($ret) ne "ARRAY")
437      {      {
# Line 75  Line 440 
440    
441      my($peg_list, $genome_list) = @$ret;      my($peg_list, $genome_list) = @$ret;
442    
443            for my $gent (@$genome_list)
444            {
445                $target_genome_info{$gent->[0]} = $gent;
446            }
447    
448            _compute_peg_mapping_batch($fig, $peer, $session, $peg_mapping, $peg_cache, $cache_handle,
449                                       $peg_list, \%genome_map);
450        }
451    
452        #
453        # We have finished first pass. Now go over the per-genome mappings that need to be made.
454        #
455        # $genome_map{$genome_id} is a list of pegs that reside on that genome.
456        # The pegs and genome id are both target-based identifiers.
457      #      #
458      # Walk the peg-list to and generate @pegs_to_finalize.      # %target_genome_info defines the list of genome information we have on the remote
459        # side.
460        #
461        # We build a request to be passed to finalize_pegs. Each entry in the request is either
462        # ['peg_genome', $peg] which means that we have a genome that corresponds to the
463        # genome the peg is in. We can attempt to map via contig locations.
464        #
465        # If that is not the case,  we pass a request entry of ['peg_unknown', $peg]
466        # which will result in the sequence data being returned.
467      #      #
468    
469      my(%peg_mapping, %genome_map );      my @finalize_req = ();
470    
471        #
472        # local_genome maps a target peg identifier to the local genome id it translates to.
473        #
474        my %local_genome;
475    
476        for my $genome (keys(%target_genome_info))
477        {
478            my($tg, $n_contigs, $n_nucs, $cksum) = @{$target_genome_info{$genome}};
479    
480            $tg eq $genome or die "Invalid entry in target_genome_info for $genome => $tg, $n_contigs, $n_nucs, $cksum";
481    
482            #
483            # Don't bother unless we have any pegs to look up.
484            #
485            next unless defined($genome_map{$genome});
486    
487            #
488            # Determine if we have a local genome installed that matches precisely the
489            # genome on the target side.
490            #
491            my $my_genome = $fig->find_genome_by_content($genome, $n_contigs, $n_nucs, $cksum);
492    
493            my $pegs = $genome_map{$genome};
494    
495            if ($my_genome)
496            {
497                #
498                # We do have such a local genome. Generate a peg_genome request to
499                # get the location information from the target side.
500                #
501                # Also remember the local genome mapping for this peg.
502                #
503    
504                print "$genome mapped to $my_genome\n";
505                print $log_fh "$genome mapped to $my_genome\n";
506                for my $peg (@$pegs)
507                {
508                    push(@finalize_req, ['peg_genome', $peg]);
509                    $local_genome{$peg} = $my_genome;
510                }
511    
512            }
513            else
514            {
515                #
516                # We don't have such a genome. We need to retrieve the
517                # sequence data in order to finish mapping.
518                #
519                push(@finalize_req, map { ['peg_unknown', $_] } @$pegs);
520            }
521        }
522    
523        #
524        # We've built our finalization request. Handle it (possibly with batching here too).
525        #
526    
527        _process_finalization_request($fig, $peer, $session, $peg_mapping, $peg_cache, $cache_handle,
528                                     \%local_genome, \@finalize_req, $skip_tough_search);
529    
530    }
531    
532    #
533    # Process one batch of PEGs.
534    #
535    # Same args as _compute_peg_mapping, with the addition of:
536    #
537    #       $peg_list       List of pegs to be processed
538    #       $genome_map     Hash maintaining list of genomes with their pegs.
539    #       $target_genome_info     Hash maintaining overall list of target-side genome information.
540    #
541    sub _compute_peg_mapping_batch
542    {
543        my($fig, $peer, $session, $peg_mapping, $peg_cache, $cache_handle,
544           $peg_list, $genome_map, $target_genome_info) = @_;
545    
546        #
547        # Walk the list of pegs as returned from get_pegs() and determine what has to
548        # be done.
549        #
550        # If the entry is ['peg', $peg], we can use the peg ID as is.
551        #
552        # If the entry is ['peg_info', $peg, $alias_list, $genome], the peg
553        # has the given aliases, and is in the given genome.
554        #
555      for my $peg_info (@$peg_list)      for my $peg_info (@$peg_list)
556      {      {
557          my($key, $peg, @rest) = @$peg_info;          my($key, $peg, @rest) = @$peg_info;
# Line 90  Line 561 
561              #              #
562              # Peg id is directly usable.              # Peg id is directly usable.
563              #              #
564                $peg_mapping->{$peg} = $peg;
565          }          }
566          elsif ($key eq 'peg_info')          elsif ($key eq 'peg_info')
567          {          {
568              #              #
569              # Peg id not directly usable.              # Peg id not directly usable. See if we have it in the cache.
570                #
571    
572                if ((my $cached = $peg_cache->{$peg}) ne "")
573                {
574                    #
575                    # Cool, we've cached the result. Use it.
576                    #
577    
578                    $peg_mapping->{$peg} = $cached;
579                    # warn "Found cached mapping $peg => $cached\n";
580                    next;
581                }
582    
583                #
584                # It is not cached. Attempt to resolve by means of alias IDs.
585              #              #
586    
587              my($alias_list, $genome_id) = @rest;              my($alias_list, $genome_id) = @rest;
# Line 105  Line 592 
592                  if ($mapped)                  if ($mapped)
593                  {                  {
594                      print "$peg maps to $mapped via $alias\n";                      print "$peg maps to $mapped via $alias\n";
595                      $peg_mapping{$peg}= $mapped;                      print $log_fh "$peg maps to $mapped via $alias\n";
596                        $peg_mapping->{$peg}= $mapped;
597                        $peg_cache->{$peg} = $mapped;
598                      last;                      last;
599                  }                  }
600              }              }
601    
602              #              #
603              # If we didn't succeed in mapping by alias,              # If we weren't able to resolve by ID,
604              # stash this in the list of pegs to be mapped by              # add to %genome_map as a PEG that will need
605              # genome.              # to be resolved by means of contig location.
606              #              #
607    
608              if (!defined($peg_mapping{$peg}))              if (!defined($peg_mapping->{$peg}))
609              {              {
610                  push(@{$genome_map{$genome_id}}, $peg);                  push(@{$genome_map->{$genome_id}}, $peg);
611                    print "$peg did not map on first pass\n";
612                    print $log_fh "$peg did not map on first pass\n";
613              }              }
614          }          }
615      }      }
616    
617      #      #
618      # finished first pass. Now go over the per-genome mappings that need to be made.      # Flush the cache to write out any computed mappings.
619      #      #
620        $cache_handle->sync();
621    
622    }
623    
624      for my $genome_info (@$genome_list)  sub _process_finalization_request
625      {      {
626          my($genome, $n_contigs, $n_nucs, $cksum) = @$genome_info;      my($fig, $peer, $session, $peg_mapping, $peg_cache, $cache_handle,
627           $local_genome, $finalize_req, $skip_tough_search) = @_;
628    
629          next unless $genome_map{$genome};      #
630        # Immediately return unless there's something to do.
631        #
632        return unless ref($finalize_req) and @$finalize_req > 0;
633    
634          my $my_genome = $fig->find_genome_by_content($genome, $n_contigs, $n_nucs, $cksum);      my $fin_batch_size = 50;
635    
636          if ($my_genome)      while (@$finalize_req > 0)
637        {
638            my @req = splice(@$finalize_req, 0, $fin_batch_size);
639    
640            print "Invoking finalize_pegs on ", int(@req), " pegs\n";
641            print $log_fh "Invoking finalize_pegs on ", int(@req), " pegs\n";
642            my $ret = $peer->finalize_pegs($session, \@req);
643    
644            if (!$ret or ref($ret) ne "ARRAY")
645            {
646                die "perform_update: finalize_pegs failed\n";
647            }
648    
649            #
650            # The return is a list of either location entries or
651            # sequence data. Attempt to finish up the mapping.
652            #
653    
654            my(%sought, %sought_seq);
655    
656    
657            my $dbh = $fig->db_handle();
658            for my $entry (@$ret)
659            {
660                my($what, $peg, @rest) = @$entry;
661    
662                if ($what eq "peg_loc")
663                {
664                    my($strand, $start, $end, $cksum, $seq) = @rest;
665    
666                    #
667                    # We have a contig location. Try to find a matching contig
668                    # here, and see if it maps to something.
669                    #
670    
671                    my $my_genome = $local_genome->{$peg};
672                    my $local_contig = $fig->find_contig_with_checksum($my_genome, $cksum);
673                    if ($local_contig)
674          {          {
675              #              #
676              # Found a match.                      # Now look up the local peg. We match on the end location; depending on the strand
677                        # the feature is on, we want to look at either minloc or maxloc.
678              #              #
             print "Genome $genome maps to $my_genome locally\n";  
679    
680                        my $whichloc = $strand eq '-' ? "minloc" : "maxloc";
681    
682                        my $res = $dbh->SQL(qq!SELECT id from features
683                                               WHERE $whichloc = $end and genome = '$my_genome' and
684                                               contig = '$local_contig'
685                                            !);
686    
687                        if ($res and @$res > 0)
688                        {
689                            my(@ids) = map { $_->[0] } @$res;
690                            my $id = $ids[0];
691                            $peg_mapping->{$peg} = $id;
692                            $peg_cache->{$peg} = $id;
693                            print "Mapped $peg to $id via contigs\n";
694                            if (@$res > 1)
695                            {
696                                warn "Multiple mappings found for $peg: @ids\n";
697                                print $log_fh "Multiple mappings found for $peg: @ids\n";
698                            }
699          }          }
700          else          else
701          {          {
702              print "No mapping for $genome\n";                          print "failed: $peg  $my_genome and contig $local_contig start=$start end=$end strand=$strand\n";
703                            print $log_fh "failed: $peg  $my_genome and contig $local_contig start=$start end=$end strand=$strand\n";
704                            $sought{$peg}++;
705                            $sought_seq{$peg} = $seq;
706                        }
707          }          }
708                    else
709                    {
710                        print "Mapping failed for $my_genome checksum $cksum\n";
711                        print $log_fh "Mapping failed for $my_genome checksum $cksum\n";
712                        $sought{$peg}++;
713                        $sought_seq{$peg} = $seq;
714      }      }
715  }  }
716                elsif ($what eq "peg_seq")
717                {
718                    my($seq) = @rest;
719    
720                    $sought{$peg}++;
721                    $sought_seq{$peg} = $seq;
722                }
723            }
724    
725            #
726            # Now see if we need to do a tough search.
727            #
728    
729            if (keys(%sought) > 0 and !$skip_tough_search)
730            {
731                my %trans;
732    
733                print "Starting tough search\n";
734                print $log_fh "Starting tough search\n";
735    
736                $fig->tough_search(undef, \%sought_seq, \%trans, \%sought);
737                print "Tough search translated: \n";
738                print $log_fh "Tough search translated: \n";
739                while (my($tpeg, $ttrans) = each(%trans))
740                {
741                    print "  $tpeg -> $ttrans\n";
742                    print $log_fh "  $tpeg -> $ttrans\n";
743                    $peg_mapping->{$tpeg} = $ttrans;
744                    $peg_cache->{$tpeg} = $ttrans;
745                }
746            }
747        }
748    }
749    
750  #############  #############
751  #  #
# Line 243  Line 839 
839          # element in the body of the message.          # element in the body of the message.
840          #          #
841          my $ns = $reply->namespaceuriof('/Envelope/Body/[1]');          my $ns = $reply->namespaceuriof('/Envelope/Body/[1]');
842          print "Reply ns=$ns want $P2P::ns_relay\n";          # print "Reply ns=$ns want $P2P::ns_relay\n";
843    
844          if ($ns eq $P2P::ns_relay)          if ($ns eq $P2P::ns_relay)
845          {          {
846              my $val = $reply->result;              my $val = $reply->result;
847              print "got val=", Dumper($val);              # print "got val=", Dumper($val);
848              if ($val->[0] eq 'deferred')              if ($val->[0] eq 'deferred')
849              {              {
850                  #                  #
# Line 290  Line 886 
886  use strict;  use strict;
887    
888  use Data::Dumper;  use Data::Dumper;
889    use Time::HiRes qw( usleep ualarm gettimeofday tv_interval );
890    
891  use SOAP::Lite;  use SOAP::Lite;
892    
893    #use SOAP::Lite +trace => [qw(transport dispatch result debug)];
894  use P2P;  use P2P;
895    
896  #  #
# Line 303  Line 902 
902  {  {
903      my($class, $fig, $url, $peer_id, $relay) = @_;      my($class, $fig, $url, $peer_id, $relay) = @_;
904    
905      my $proxy = SOAP::Lite->uri($ns_p2p)->proxy($url);      my $proxy = SOAP::Lite->uri($ns_p2p)->proxy($url, timeout => 3600);
906    
907      my $self = {      my $self = {
908          fig => $fig,          fig => $fig,
# Line 329  Line 928 
928    
929  sub request_update  sub request_update
930  {  {
931      my($self, $last_update) = @_;      my($self, $last_update, $update_thru) = @_;
932    
933      my $rel = $self->{fig}->get_release_info();      my $rel = [$self->{fig}->get_release_info()];
934    
935      if (!defined($last_update))      if (!defined($last_update))
936      {      {
937          $last_update = $self->{fig}->get_peer_last_update($self->{peer_id});          $last_update = $self->{fig}->get_peer_last_update($self->{peer_id});
938      }      }
939    
940      my $reply = $self->{proxy}->request_update($rel, $last_update);      print "Requesting update via $self->{proxy}\n";
941        my $reply = $self->{proxy}->request_update($rel, $last_update, $update_thru);
942        # print "Got reply ", Dumper($reply);
943    
944      if ($self->{relay})      if ($self->{relay})
945      {      {
# Line 368  Line 969 
969      return $self->call("get_pegs", $session_id, $start, $length);      return $self->call("get_pegs", $session_id, $start, $length);
970  }  }
971    
972    sub finalize_pegs
973    {
974        my($self, $session_id, $request) = @_;
975    
976        return $self->call("finalize_pegs", $session_id, $request);
977    }
978    
979    sub get_annotations
980    {
981        my($self, $session_id, $start, $length) = @_;
982    
983        return $self->call("get_annotations", $session_id, $start, $length);
984    }
985    
986    sub get_assignments
987    {
988        my($self, $session_id, $start, $length) = @_;
989    
990        return $self->call("get_assignments", $session_id, $start, $length);
991    }
992    
993  sub call  sub call
994  {  {
995      my($self, $func, @args) = @_;      my($self, $func, @args) = @_;
996    
997        my $t0 = [gettimeofday()];
998        print "Calling $func\n";
999      my $reply = $self->{proxy}->$func(@args);      my $reply = $self->{proxy}->$func(@args);
1000        my $t1 = [gettimeofday()];
1001    
1002        my $elap = tv_interval($t0, $t1);
1003        print "Call to $func took $elap\n";
1004    
1005      if ($self->{relay})      if ($self->{relay})
1006      {      {
# Line 410  Line 1038 
1038    
1039  sub request_update  sub request_update
1040  {  {
1041      my($class, $his_release, $last_update)= @_;      my($class, $his_release, $last_update, $update_thru)= @_;
1042    
1043      #      #
1044      # Verify input.      # Verify input.
# Line 421  Line 1049 
1049          die "request_update: last_update must be a number (not '$last_update')\n";          die "request_update: last_update must be a number (not '$last_update')\n";
1050      }      }
1051    
1052        if ($update_thru eq "")
1053        {
1054            $update_thru = time + 10000;
1055        }
1056    
1057      #      #
1058      # Create a new session id and a spool directory to use for storage      # Create a new session id and a spool directory to use for storage
1059      # of information about it. This can go in the tempdir since it is      # of information about it. This can go in the tempdir since it is
# Line 428  Line 1061 
1061      #      #
1062    
1063      &FIG::verify_dir("$FIG_Config::temp/p2p_spool");      &FIG::verify_dir("$FIG_Config::temp/p2p_spool");
1064      #my $spool_dir = tempdir(DIR  => "$FIG_Config::temp/p2p_spool");      my $spool_dir = tempdir(DIR  => "$FIG_Config::temp/p2p_spool");
1065    
1066      my $spool_dir = "$FIG_Config::temp/p2p_spool/test";      #my $spool_dir = "$FIG_Config::temp/p2p_spool/test";
1067      &FIG::verify_dir($spool_dir);      &FIG::verify_dir($spool_dir);
1068    
1069      my $session_id = basename($spool_dir);      my $session_id = basename($spool_dir);
# Line 448  Line 1081 
1081    
1082      my %pegs;      my %pegs;
1083    
1084        #
1085        # We keep track of usernames that have been seen, so that
1086        # we can both update our local user database and
1087        # report them to our peer.
1088        #
1089    
1090        my %users;
1091    
1092      my $num_annos = 0;      my $num_annos = 0;
1093      my $num_genomes = 0;      my $num_genomes = 0;
1094      my $num_pegs = 0;      my $num_pegs = 0;
1095        my $num_assignments = 0;
1096    
1097      my $anno_fh;      my $anno_fh;
1098      open($anno_fh, ">$spool_dir/annos");      open($anno_fh, ">$spool_dir/annos");
# Line 461  Line 1103 
1103      my $genome_fh;      my $genome_fh;
1104      open($genome_fh, ">$spool_dir/genomes");      open($genome_fh, ">$spool_dir/genomes");
1105    
1106        my $assign_fh;
1107        open($assign_fh, ">$spool_dir/assignments");
1108    
1109      for my $genome (@$all_genomes)      for my $genome (@$all_genomes)
1110      {      {
1111          my $num_annos_for_genome = 0;          my $num_annos_for_genome = 0;
1112            my %assignment;
1113    
1114          my $genome_dir = "$FIG_Config::organisms/$genome";          my $genome_dir = "$FIG_Config::organisms/$genome";
1115          next unless -d $genome_dir;          next unless -d $genome_dir;
# Line 480  Line 1126 
1126    
1127                  if ((($fid, $anno_time, $who, $anno_text) =                  if ((($fid, $anno_time, $who, $anno_text) =
1128                       ($ann =~ /^(fig\|\d+\.\d+\.peg\.\d+)\n(\d+)\n(\S+)\n(.*\S)/s)) and                       ($ann =~ /^(fig\|\d+\.\d+\.peg\.\d+)\n(\d+)\n(\S+)\n(.*\S)/s)) and
1129                      $anno_time > $last_update)                      $anno_time > $last_update and
1130                        $anno_time < $update_thru)
1131    
1132                  {                  {
1133                      #                      #
1134                        # Update users list.
1135                        #
1136    
1137                        $users{$who}++;
1138    
1139                        #
1140                      # Look up aliases if we haven't seen this fid before.                      # Look up aliases if we haven't seen this fid before.
1141                      #                      #
1142    
# Line 501  Line 1154 
1154    
1155                      $num_annos_for_genome++;                      $num_annos_for_genome++;
1156                      $num_annos++;                      $num_annos++;
1157    
1158                        #
1159                        # While we're here, see if this is an assignment. We check in the
1160                        # %assignment hash, which is keyed on fid, to see if we already
1161                        # saw an assignment for this fid. If we have, we keep this one only if
1162                        # the assignment time on it is later than the one we saw already.
1163                        #
1164                        # We are only looking at master assignments for now. We will need
1165                        # to return to this issue and reexamine it, but in order to move
1166                        # forward I am only matching master assignments.
1167                        #
1168    
1169                        if ($anno_text =~ /Set master function to\n(\S[^\n]+\S)/)
1170                        {
1171                            my $func = $1;
1172    
1173                            my $other = $assignment{$fid};
1174    
1175                            #
1176                            # If we haven't seen an assignment for this fid,
1177                        # or if the other assignment has a timestamp that
1178                            # is earlier than this one, set the assignment.
1179                            #
1180    
1181                            if (!defined($other) or
1182                                ($other->[1] < $anno_time))
1183                            {
1184                                $assignment{$fid} = [$fid, $anno_time, $who, $func];
1185                            }
1186                        }
1187                  }                  }
1188              }              }
1189              close($afh);              close($afh);
1190    
1191                #
1192                # Write out the assignments that remain.
1193                #
1194    
1195                for my $fid (sort keys(%assignment))
1196                {
1197                    print $assign_fh join("\t", @{$assignment{$fid}}), "\n";
1198                    $num_assignments++;
1199                }
1200          }          }
1201    
1202    
1203          #          #
1204          # Determine genome information if we have annotations for this one.          # Determine genome information if we have annotations for this one.
1205          #          #
# Line 537  Line 1231 
1231      close($anno_fh);      close($anno_fh);
1232      close($peg_fh);      close($peg_fh);
1233      close($genome_fh);      close($genome_fh);
1234        close($assign_fh);
1235    
1236      print "Pegs: $num_pegs\n";      print "Pegs: $num_pegs\n";
1237      print "Genomes: $num_genomes\n";      print "Genomes: $num_genomes\n";
# Line 546  Line 1241 
1241      # Check compatibility.      # Check compatibility.
1242      #      #
1243    
1244      my $my_release = $fig->get_release_info();      my $my_release = [$fig->get_release_info()];
1245      my $compatible = (defined($my_release) && ($my_release == $his_release)) ? 1 : 0;  
1246        #
1247        # Release id is $my_release->[1].
1248        #
1249    
1250        my $compatible;
1251        if ($my_release->[1] ne "" and $his_release->[1] ne "")
1252        {
1253            #
1254            # Both releases must be defined for them to be compatible.
1255            #
1256            # At some point we need to consider the derived-release issue.
1257            #
1258    
1259            $compatible = $my_release->[1] eq $his_release->[1];
1260        }
1261        else
1262        {
1263            $compatible = 0;
1264        }
1265    
1266      open(my $fh, ">$spool_dir/INFO");      open(my $fh, ">$spool_dir/INFO");
1267      print $fh "requestor_release\t$his_release\n";      print $fh "requestor_release\t$his_release\n";
1268      print $fh "last_update\t$last_update\n";      print $fh "last_update\t$last_update\n";
1269        print $fh "update_thru\t$update_thru\n";
1270      print $fh "cur_update\t$now\n";      print $fh "cur_update\t$now\n";
1271      print $fh "target_release\t$my_release\n";      print $fh "target_release\t$my_release\n";
1272      print $fh "compatible\t$compatible\n";      print $fh "compatible\t$compatible\n";
1273      print $fh "num_pegs\t$num_pegs\n";      print $fh "num_pegs\t$num_pegs\n";
1274      print $fh "num_genomes\t$num_genomes\n";      print $fh "num_genomes\t$num_genomes\n";
1275      print $fh "num_annos\t$num_annos\n";      print $fh "num_annos\t$num_annos\n";
1276        print $fh "num_assignments\t$num_assignments\n";
1277      close($fh);      close($fh);
1278    
1279      return [$session_id, $my_release, $num_annos, $num_pegs, $num_genomes, $now, $compatible];      #
1280        # Construct list of users, and update local user database.
1281        #
1282    
1283        my @users = keys(%users);
1284        # $fig->ensure_users(\@users);
1285    
1286        return [$session_id, $my_release, $num_assignments, $num_annos, $num_pegs, $num_genomes,
1287                $now, $compatible, \@users];
1288  }  }
1289    
1290    
# Line 673  Line 1397 
1397    
1398      return [$peg_output, $genome_output];      return [$peg_output, $genome_output];
1399  }  }
1400    
1401    sub finalize_pegs
1402    {
1403        my($self, $session, $request) = @_;
1404        my($out);
1405    
1406        my $fig = new FIG;
1407    
1408        #
1409        # Walk the request handling appropriately. This is fairly easy, as it
1410        # is just a matter of pulling either sequence or location/contig data.
1411        #
1412    
1413        for my $item (@$request)
1414        {
1415            my($what, $peg) = @$item;
1416    
1417            if ($what eq "peg_genome")
1418            {
1419                #
1420                # Return the location and contig checksum for this peg.
1421                #
1422                # We also include the sequence in case the contig mapping doesn't work.
1423                #
1424    
1425                my $loc = $fig->feature_location($peg);
1426                my $contig = $fig->contig_of($loc);
1427                my $cksum = $fig->contig_checksum($fig->genome_of($peg), $contig);
1428                my $seq = $fig->get_translation($peg);
1429    
1430                push(@$out, ['peg_loc', $peg,
1431                            $fig->strand_of($peg),
1432                            $fig->beg_of($loc), $fig->end_of($loc),
1433                            $cksum, $seq]);
1434    
1435            }
1436            elsif ($what eq "peg_unknown")
1437            {
1438                my $seq = $fig->get_translation($peg);
1439                push(@$out, ['peg_seq', $peg, $seq]);
1440            }
1441        }
1442        return $out;
1443    }
1444    
1445    
1446    sub get_annotations
1447    {
1448        my($self, $session_id, $start, $len) = @_;
1449    
1450        #
1451        # This is now easy; just run thru the saved annotations and return.
1452        #
1453    
1454        my(%session_info);
1455    
1456        my $spool_dir = "$FIG_Config::temp/p2p_spool/$session_id";
1457    
1458        -d $spool_dir or die "Invalid session id $session_id";
1459    
1460        #
1461        # Read in the cached information for this session.
1462        #
1463    
1464        open(my $info_fh, "<$spool_dir/INFO") or die "Cannot open INFO file: $!";
1465        while (<$info_fh>)
1466        {
1467            chomp;
1468            my($var, $val) = split(/\t/, $_, 2);
1469            $session_info{$var} = $val;
1470        }
1471        close($info_fh);
1472    
1473        #
1474        # Sanity check start and length.
1475        #
1476    
1477        if ($start < 0 or $start >= $session_info{num_annos})
1478        {
1479            die "Invalid start position $start";
1480        }
1481    
1482        if ($len < 0 or ($start + $len - 1) >= $session_info{num_annos})
1483        {
1484            die "Invalid length $len";
1485        }
1486    
1487        #
1488        # Open file, spin to the starting line, then start reading.
1489        #
1490    
1491        open(my $anno_fh, "<$spool_dir/annos") or die "Cannot open annos file: $!";
1492    
1493        my $anno_output = [];
1494    
1495        my $anno_num = 0;
1496    
1497        local $/ = "//\n";
1498        while (<$anno_fh>)
1499        {
1500            next if ($anno_num < $start);
1501    
1502            last if ($anno_num > ($start + $len));
1503    
1504            chomp;
1505    
1506            my($id, $date, $author, $anno) = split(/\n/, $_, 4);
1507    
1508            push(@$anno_output, [$id, $date, $author, $anno]);
1509        }
1510        continue
1511        {
1512            $anno_num++;
1513        }
1514    
1515        return $anno_output;
1516    }
1517    
1518    sub get_assignments
1519    {
1520        my($self, $session_id, $start, $len) = @_;
1521    
1522        #
1523        # This is now easy; just run thru the saved assignments and return.
1524        #
1525    
1526        my(%session_info);
1527    
1528        my $spool_dir = "$FIG_Config::temp/p2p_spool/$session_id";
1529    
1530        -d $spool_dir or die "Invalid session id $session_id";
1531    
1532        #
1533        # Read in the cached information for this session.
1534        #
1535    
1536        open(my $info_fh, "<$spool_dir/INFO") or die "Cannot open INFO file: $!";
1537        while (<$info_fh>)
1538        {
1539            chomp;
1540            my($var, $val) = split(/\t/, $_, 2);
1541            $session_info{$var} = $val;
1542        }
1543        close($info_fh);
1544    
1545        #
1546        # Sanity check start and length.
1547        #
1548    
1549        if ($start < 0 or $start >= $session_info{num_assignments})
1550        {
1551            die "Invalid start position $start";
1552        }
1553    
1554        if ($len < 0 or ($start + $len - 1) >= $session_info{num_assignments})
1555        {
1556            die "Invalid length $len";
1557        }
1558    
1559        #
1560        # Open file, spin to the starting line, then start reading.
1561        #
1562    
1563        open(my $assign_fh, "<$spool_dir/assignments") or die "Cannot open assignments file: $!";
1564    
1565        my $assign_output = [];
1566    
1567        my $assign_num = 0;
1568    
1569        while (<$assign_fh>)
1570        {
1571            next if ($assign_num < $start);
1572    
1573            last if ($assign_num > ($start + $len));
1574    
1575            chomp;
1576    
1577            my($id, $date, $author, $func) = split(/\t/, $_, 4);
1578    
1579            push(@$assign_output, [$id, $date, $author, $func]);
1580        }
1581        continue
1582        {
1583            $assign_num++;
1584        }
1585    
1586        return $assign_output;
1587    }
1588    
1589    1;

Legend:
Removed from v.1.3  
changed lines
  Added in v.1.22

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3