[Bio] / FigKernelPackages / P2P.pm Repository:
ViewVC logotype

Diff of /FigKernelPackages/P2P.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.12, Fri Sep 24 19:52:25 2004 UTC revision 1.21, Fri Jan 7 18:54:45 2005 UTC
# Line 18  Line 18 
18    
19  use FIG_Config;  use FIG_Config;
20    
21    use DB_File;
22    use Fcntl;
23    
24  use strict;  use strict;
25  use Exporter;  use Exporter;
26  use base qw(Exporter);  use base qw(Exporter);
27    
28    use Time::HiRes qw( usleep ualarm gettimeofday tv_interval );
29    
30  use Data::Dumper;  use Data::Dumper;
31    
32  use vars qw(@EXPORT @EXPORT_OK);  use vars qw(@EXPORT @EXPORT_OK);
# Line 31  Line 36 
36  our $ns_p2p = "http://thefig.info/schemas/p2p_update";  our $ns_p2p = "http://thefig.info/schemas/p2p_update";
37  our $ns_relay = "http://thefig.info/schemas/p2p_relay";  our $ns_relay = "http://thefig.info/schemas/p2p_relay";
38    
39    my $peg_batch_size = 1000;
40    my $anno_batch_size = 1000;
41    my $assign_batch_size = 1000;
42    
43  =pod  =pod
44    
45  =head1 perform_update($peer)  =head1 perform_update($peer)
# Line 47  Line 56 
56    
57  sub perform_update  sub perform_update
58  {  {
59      my($fig, $peer, $last_update) = @_;      my($fig, $peer, $last_update, $skip_tough_search, $update_thru) = @_;
60    
61      my $ret = $peer->request_update($last_update);      my $ret = $peer->request_update($last_update, $update_thru);
62    
63      if (!$ret or ref($ret) ne "ARRAY")      if (!$ret or ref($ret) ne "ARRAY")
64      {      {
65          die "perform_update: request_updated failed\n";          die "perform_update: request_update failed\n";
66      }      }
67    
68      my($session, $target_release, $num_annos, $num_pegs, $num_genomes,      my($session, $target_release, $num_assignments, $num_annos, $num_pegs, $num_genomes,
69         $target_time, $compatible) = @$ret;         $target_time, $compatible) = @$ret;
70    
71      print "perform_update: session=$session target=$target_release num_annos=$num_annos\n";      print "perform_update: session=$session target=@$target_release num_annos=$num_annos\n";
72      print "                num_pegs=$num_pegs num_genomes=$num_genomes target_time=$target_time compat=$compatible\n";      print "                num_pegs=$num_pegs num_genomes=$num_genomes target_time=$target_time compat=$compatible\n";
73    
74      #      #
75        # We now know the data release for our peer.
76        #
77        # Open up the peg translation cache database (a DB_File) tied
78        # to %peg_cache. We needn't worry about keeping it in a directory
79        # based on our current release, as it the cache directory is kept *in*
80        # the current data release directory.
81        #
82    
83        my $cache_handle;
84        my %peg_cache;
85        if ($target_release->[1] ne "")
86        {
87            my $cache_file = "pegcache.$target_release->[1].db";
88            my $cache_dir = "$FIG_Config::data/P2PQueue";
89            $fig->verify_dir($cache_dir);
90    
91            $cache_handle = tie(%peg_cache, "DB_File", "$cache_dir/$cache_file",
92                                O_CREAT | O_RDWR, 0666, $DB_HASH);
93            $cache_handle or warn "Could not tie peg_cache to $cache_dir/$cache_file: $!\n";
94        }
95    
96        #
97        # peg_mapping is the local mapping from remote->local peg. This might
98        # be replacable by peg_cache from above.
99        #
100        my %peg_mapping;
101    
102    
103        #
104      # We have  the information now to begin the update process. Retrieve the pegs.      # We have  the information now to begin the update process. Retrieve the pegs.
105      #      #
106    
107      $ret = $peer->get_pegs($session, 0, $num_pegs);      _compute_peg_mapping($fig, $peer, $session, $num_pegs, \%peg_mapping, \%peg_cache, $cache_handle,
108                             $skip_tough_search);
109    
110      if (!$ret or ref($ret) ne "ARRAY")      $cache_handle->sync();
111        untie %peg_cache;
112    
113        #
114        # Create a list of locally-mapped annotations on a per-genome
115        # basis.
116        #
117    
118        my %genome_annos;
119    
120        #
121        # %genome_assignments is a hash mapping from genome to a hashref
122        # that maps  peg to function (since assignments are unique).
123        #
124        # (Hm. Unless two remote pegs map to the same local peg; unclear what to do
125        # then. Punt for now).
126        #
127        my %genome_assignments;
128    
129    
130    
131        #
132        # Retrieve the annotations, and generate a list of mapped annotations.
133        #
134    
135        for (my $anno_start = 0; $anno_start < $num_annos; $anno_start += $anno_batch_size)
136      {      {
137          die "perform_update: get_pegs failed\n";          my $anno_req_len = $num_annos - $anno_start;
138            $anno_req_len = $anno_batch_size if $anno_req_len > $anno_batch_size;
139    
140            print "Retrieve $anno_req_len annos at $anno_start\n";
141    
142            my $annos = $peer->get_annotations($session, $anno_start, $anno_req_len);
143    
144            for my $anno (@$annos)
145            {
146                my($his_id, $ts, $author, $anno) = @$anno;
147    
148                my $my_id = $peg_mapping{$his_id};
149                next unless $my_id;
150    
151                my $genome = $fig->genome_of($my_id);
152    
153                push(@{$genome_annos{$genome}}, [$my_id, $ts, $author, $anno]);
154            }
155      }      }
156    
157      my($peg_list, $genome_list) = @$ret;      #
158        # Do the same for the assignments
159        #
160    
161        # print Dumper($assignments);
162    
163    
164        for (my $assign_start = 0; $assign_start < $num_assignments; $assign_start += $assign_batch_size)
165        {
166            my $assign_req_len = $num_assignments - $assign_start;
167            $assign_req_len = $assign_batch_size if $assign_req_len > $assign_batch_size;
168    
169            print "Retrieve $assign_req_len assigns at $assign_start\n";
170    
171            my $assignments = $peer->get_assignments($session, $assign_start, $assign_req_len);
172    
173            for my $assign (@$assignments)
174            {
175                my($his_id, $ts, $author, $func) = @$assign;
176    
177                my $my_id = $peg_mapping{$his_id};
178                next unless $my_id;
179    
180                my $genome = $fig->genome_of($my_id);
181    
182                $genome_assignments{$genome}->{$my_id} =  [$my_id, $ts, $author, $func];
183            }
184        }
185    
186        # print Dumper(\%genome_annos);
187    
188      #      #
189      # Walk the peg-list to and generate @pegs_to_finalize.      # Now install annotations.
190      #      #
191    
192      my(%peg_mapping, %genome_map );      open(my $old_assignments, ">old_assignments");
193    
194      for my $peg_info (@$peg_list)      for my $genome (keys(%genome_annos))
195      {      {
196          my($key, $peg, @rest) = @$peg_info;          #
197            # Plan:  Apply the merge_annotations.pl logic. Read the annotations
198            # from the per-org annotations file, add the new ones here, sort, and remove duplicates.
199            # Write the results to the annotations file.
200            #
201            # When we are all done, rerun the index_annotations script.
202            #
203            # Why not do that incrementally? Partly because the annotation_seeks table doesn't
204            # have a column for the genome id, so a removal of old data would require a
205            # string-match query; since a complete reindex of the annotations is pretty
206            # fast (60 sec on a G4 laptop on a firewire disk), it's not clear whether the incremental
207            # update would actually be a win.
208            #
209    
210          if ($key eq 'peg')          my @annos = @{$genome_annos{$genome}};
211            my $assignments = $genome_assignments{$genome};
212            #
213            # %assignment_annos is a hash from peg to the list
214            # of annotations for that peg.
215            #
216            my %assignment_annos;
217    
218            my $dir = "$FIG_Config::organisms/$genome";
219            my $anno_file = "$dir/annotations";
220            my $anno_bak = "$dir/annotations." . time;
221    
222            my $new_count = @annos;
223    
224            #
225            # Rename the annotations file to a new name based on the current time.
226            #
227    
228            if (-f $anno_file)
229            {
230                rename($anno_file, $anno_bak) or die "Cannot rename $anno_file to $anno_bak: $!";
231            }
232    
233            if (open(my $fh, "<$anno_bak"))
234          {          {
235              #              #
236              # Peg id is directly usable.              # While we are scanning here, we look for the latest local assignment
237                # for any peg for which we are installing an assignment.
238                #
239                local($/) = "\n//\n";
240    
241                my($chunk, $peg, $ts, $author, $anno);
242    
243                while (defined($chunk = <$fh>))
244                {
245                    chomp $chunk;
246                    ($peg, $ts, $author, $anno) = split(/\n/, $chunk, 4);
247    
248                    if ($peg =~ /^fig\|/ and $ts =~ /^\d+$/)
249                    {
250                        my $ent = [$peg, $ts, $author, $anno];
251                        push(@annos, $ent);
252    
253                        if (defined($assignments->{$peg}))
254                        {
255              #              #
256              $peg_mapping{$peg} = $peg;                          # We have an incoming assignment for this peg.
257                            # Don't parse anything yet, but push the annotation
258                            # on a list so we can sort by date.
259                            #
260                            push(@{$assignment_annos{$peg}}, $ent);
261          }          }
262          elsif ($key eq 'peg_info')                  }
263                }
264                close($fh);
265            }
266    
267            #
268            # Determine if we are going to install an assignment.
269            #
270    
271            for my $peg (keys %$assignments)
272          {          {
273                my(undef, $ts, $author, $func) = @{$assignments->{$peg}};
274    
275              #              #
276              # Peg id not directly usable.              # Sort the existing annotations for this peg by date.
277                #
278                # Recall that this list has entries [$peg, $timestamp, $author, $anno]
279              #              #
280    
281              my($alias_list, $genome_id) = @rest;              my @eannos;
282                if (ref($assignment_annos{$peg}))
283                {
284                    @eannos = sort { $b->[1] <=> $a->[1] } @{$assignment_annos{$peg}};
285                }
286                else
287                {
288                    #
289                    # No assignment annotations found.
290                    #
291                    @eannos = ();
292                }
293    
294              for my $alias (@$alias_list)              # print "Assignment annos for $peg: ", Dumper(\@eannos);
295    
296                #
297                # Filter out just the master assignments that are newer than
298                # the one we are contemplating putting in place.
299                #
300    
301                my @cand = grep {
302                    ($_->[1] > $ts) and ($_->[3] =~ /Set master function to/)
303                    } @eannos;
304    
305                if (@cand > 0)
306              {              {
307                  my $mapped = $fig->by_alias($alias);                  #
308                  if ($mapped)                  # Here is were some policy needs to be put in place --
309                    # we have a more recent annotation on the current system.
310                    #
311                    # For now, we will not install an assignment if there is any
312                    # newer assignment in place.
313                    #
314    
315                    warn "Skipping assignment for $peg $func due to more recent assignment $cand[0]->[3]\n";
316                }
317                else
318                  {                  {
319                      print "$peg maps to $mapped via $alias\n";                  #
320                      $peg_mapping{$peg}= $mapped;                  # Nothing is blocking us. While we are testing, just slam this assignment in.
321                      last;                  #
322    
323                    my $old = $fig->function_of($peg, 'master');
324                    print $old_assignments "$peg\t$old\n";
325    
326                    if ($old ne $func)
327                    {
328                        print "Assign $peg $func\n";
329                        $fig->assign_function($peg, 'master', $func);
330                  }                  }
331              }              }
332            }
333    
334            open(my $outfh, ">$anno_file") or die "Cannot open new annotation file $anno_file: $!\n";
335    
336            my $last;
337            my @sorted = sort { ($a->[0] cmp $b->[0]) or ($a->[1] <=> $b->[1]) } @annos;
338            my $inst = 0;
339            my $dup = 0;
340            foreach my $ann (@sorted)
341            {
342                my $txt = join("\n", @$ann);
343                #
344                # Drop the trailing \n if there is one; we  will add it back when we print and
345                # want to ensure the file format remains sane.
346              #              #
347              # If we didn't succeed in mapping by alias,              chomp $txt;
348              # stash this in the list of pegs to be mapped by              if ($txt ne $last)
349              # genome.              {
350                    print $outfh "$txt\n//\n";
351                    $last = $txt;
352                    # print "Inst $ann->[0] $ann->[1] $ann->[2]\n";
353                    $inst++;
354                }
355                else
356                {
357                    # print "Dup $ann->[0] $ann->[1] $ann->[2]\n";
358                    $dup++;
359                }
360            }
361            close($outfh);
362            chmod(0666, $anno_file) or warn "Cannot chmod 0666 $anno_file: $!\n";
363            print "Wrote $anno_file. $new_count new annos, $inst installed, $dup duplicates\n";
364    
365              #              #
366            # _install_genome_annos($fig, $genome, $genome_annos{$genome});
367        }
368        close($old_assignments);
369    }
370    
371              if (!defined($peg_mapping{$peg}))  #
372    # Compute the peg mapping for a session.
373    #
374    # $fig          Active FIG instance
375    # $peer         P2P peer for this session.
376    # $session      P2P session ID
377    # $peg_mapping  Hash ref for the remote -> local PEG mapping
378    # $peg_cache    Hash ref for the persistent remote -> local PEG mapping cache db.
379    # $cache_handle DB_File handle corresponding to $peg_cache.
380    #
381    sub _compute_peg_mapping
382              {              {
383                  push(@{$genome_map{$genome_id}}, $peg);      my($fig, $peer, $session, $num_pegs, $peg_mapping, $peg_cache, $cache_handle, $skip_tough_search) = @_;
384                  print "$peg did not map\n";  
385        #
386        # genome_map is a hash mapping from target genome id to a list of
387        # pegs on the target. This is used to construct a finalize_pegs request after
388        # the first phase of peg mapping.
389        #
390    
391        my %genome_map;
392    
393        #
394        # target_genome_info is a hash mapping from target genome
395        # identifier to the target-side information on the genome -
396        # number of contigs, number of nucleotides, checksum.
397        #
398        # We accumulate it here across possibly multiple batches of
399        # peg retrievals in order to create a single  finalization
400        # list.
401        #
402    
403        my %target_genome_info;
404    
405        #
406        # For very large transfers, we need to batch the peg processing.
407        #
408    
409        for (my $peg_start = 0; $peg_start < $num_pegs; $peg_start += $peg_batch_size)
410        {
411            my $peg_req_len = $num_pegs - $peg_start;
412            $peg_req_len = $peg_batch_size if $peg_req_len > $peg_batch_size;
413    
414            print "Getting $peg_req_len pegs at $peg_start\n";
415            my $ret = $peer->get_pegs($session, $peg_start, $peg_req_len);
416    
417            if (!$ret or ref($ret) ne "ARRAY")
418            {
419                die "perform_update: get_pegs failed\n";
420              }              }
421    
422            my($peg_list, $genome_list) = @$ret;
423    
424            for my $gent (@$genome_list)
425            {
426                $target_genome_info{$gent->[0]} = $gent;
427          }          }
428    
429            _compute_peg_mapping_batch($fig, $peer, $session, $peg_mapping, $peg_cache, $cache_handle,
430                                       $peg_list, \%genome_map);
431      }      }
432    
433      #      #
434      # finished first pass. Now go over the per-genome mappings that need to be made.      # We have finished first pass. Now go over the per-genome mappings that need to be made.
435      #      #
436      # $genome_map{$genome_id} is a list of pegs that reside on that genome.      # $genome_map{$genome_id} is a list of pegs that reside on that genome.
437      # the pegs and genome id are both target-based identifiers.      # The pegs and genome id are both target-based identifiers.
438        #
439        # %target_genome_info defines the list of genome information we have on the remote
440        # side.
441        #
442        # We build a request to be passed to finalize_pegs. Each entry in the request is either
443        # ['peg_genome', $peg] which means that we have a genome that corresponds to the
444        # genome the peg is in. We can attempt to map via contig locations.
445        #
446        # If that is not the case,  we pass a request entry of ['peg_unknown', $peg]
447        # which will result in the sequence data being returned.
448      #      #
449    
450      my @finalize_req = ();      my @finalize_req = ();
451    
452        #
453        # local_genome maps a target peg identifier to the local genome id it translates to.
454        #
455      my %local_genome;      my %local_genome;
456    
457      for my $genome_info (@$genome_list)      for my $genome (keys(%target_genome_info))
458      {      {
459          my($genome, $n_contigs, $n_nucs, $cksum) = @$genome_info;          my($tg, $n_contigs, $n_nucs, $cksum) = @{$target_genome_info{$genome}};
460    
461            $tg eq $genome or die "Invalid entry in target_genome_info for $genome => $tg, $n_contigs, $n_nucs, $cksum";
462    
463            #
464            # Don't bother unless we have any pegs to look up.
465            #
466          next unless defined($genome_map{$genome});          next unless defined($genome_map{$genome});
467    
468          #          #
# Line 177  Line 501 
501      }      }
502    
503      #      #
504      # If we need to finalize, make the call.      # We've built our finalization request. Handle it (possibly with batching here too).
505      if (@finalize_req)      #
506    
507        _process_finalization_request($fig, $peer, $session, $peg_mapping, $peg_cache, $cache_handle,
508                                     \%local_genome, \@finalize_req, $skip_tough_search);
509    
510    }
511    
512    #
513    # Process one batch of PEGs.
514    #
515    # Same args as _compute_peg_mapping, with the addition of:
516    #
517    #       $peg_list       List of pegs to be processed
518    #       $genome_map     Hash maintaining list of genomes with their pegs.
519    #       $target_genome_info     Hash maintaining overall list of target-side genome information.
520    #
521    sub _compute_peg_mapping_batch
522    {
523        my($fig, $peer, $session, $peg_mapping, $peg_cache, $cache_handle,
524           $peg_list, $genome_map, $target_genome_info) = @_;
525    
526        #
527        # Walk the list of pegs as returned from get_pegs() and determine what has to
528        # be done.
529        #
530        # If the entry is ['peg', $peg], we can use the peg ID as is.
531        #
532        # If the entry is ['peg_info', $peg, $alias_list, $genome], the peg
533        # has the given aliases, and is in the given genome.
534        #
535        for my $peg_info (@$peg_list)
536        {
537            my($key, $peg, @rest) = @$peg_info;
538    
539            if ($key eq 'peg')
540            {
541                #
542                # Peg id is directly usable.
543                #
544                $peg_mapping->{$peg} = $peg;
545            }
546            elsif ($key eq 'peg_info')
547            {
548                #
549                # Peg id not directly usable. See if we have it in the cache.
550                #
551    
552                if ((my $cached = $peg_cache->{$peg}) ne "")
553                {
554                    #
555                    # Cool, we've cached the result. Use it.
556                    #
557    
558                    $peg_mapping->{$peg} = $cached;
559                    # warn "Found cached mapping $peg => $cached\n";
560                    next;
561                }
562    
563                #
564                # It is not cached. Attempt to resolve by means of alias IDs.
565                #
566    
567                my($alias_list, $genome_id) = @rest;
568    
569                for my $alias (@$alias_list)
570                {
571                    my $mapped = $fig->by_alias($alias);
572                    if ($mapped)
573                    {
574                        print "$peg maps to $mapped via $alias\n";
575                        $peg_mapping->{$peg}= $mapped;
576                        $peg_cache->{$peg} = $mapped;
577                        last;
578                    }
579                }
580    
581                #
582                # If we weren't able to resolve by ID,
583                # add to %genome_map as a PEG that will need
584                # to be resolved by means of contig location.
585                #
586    
587                if (!defined($peg_mapping->{$peg}))
588                {
589                    push(@{$genome_map->{$genome_id}}, $peg);
590                    print "$peg did not map on first pass\n";
591                }
592            }
593        }
594    
595        #
596        # Flush the cache to write out any computed mappings.
597        #
598        $cache_handle->sync();
599    
600    }
601    
602    sub _process_finalization_request
603    {
604        my($fig, $peer, $session, $peg_mapping, $peg_cache, $cache_handle,
605           $local_genome, $finalize_req, $skip_tough_search) = @_;
606    
607        #
608        # Immediately return unless there's something to do.
609        #
610        return unless ref($finalize_req) and @$finalize_req > 0;
611    
612        my $fin_batch_size = 50;
613    
614        while (@$finalize_req > 0)
615      {      {
616          print Dumper(\@finalize_req);          my @req = splice(@$finalize_req, 0, $fin_batch_size);
617          $ret = $peer->finalize_pegs($session, \@finalize_req);  
618            print "Invoking finalize_pegs on ", int(@req), " pegs\n";
619            my $ret = $peer->finalize_pegs($session, \@req);
620    
621          if (!$ret or ref($ret) ne "ARRAY")          if (!$ret or ref($ret) ne "ARRAY")
622          {          {
# Line 193  Line 628 
628          # sequence data. Attempt to finish up the mapping.          # sequence data. Attempt to finish up the mapping.
629          #          #
630    
631            my(%sought, %sought_seq);
632    
633    
634          my $dbh = $fig->db_handle();          my $dbh = $fig->db_handle();
635          for my $entry (@$ret)          for my $entry (@$ret)
# Line 201  Line 638 
638    
639              if ($what eq "peg_loc")              if ($what eq "peg_loc")
640              {              {
641                  my($strand, $start, $end, $cksum) = @rest;                  my($strand, $start, $end, $cksum, $seq) = @rest;
642    
643                  #                  #
644                  # We have a contig location. Try to find a matching contig                  # We have a contig location. Try to find a matching contig
645                  # here, and see if it maps to something.                  # here, and see if it maps to something.
646                  #                  #
647    
648                  my $my_genome = $local_genome{$peg};                  my $my_genome = $local_genome->{$peg};
649                  my $local_contig = $fig->find_contig_with_checksum($my_genome, $cksum);                  my $local_contig = $fig->find_contig_with_checksum($my_genome, $cksum);
650                  if ($local_contig)                  if ($local_contig)
651                  {                  {
# Line 228  Line 665 
665                      {                      {
666                          my(@ids) = map { $_->[0] } @$res;                          my(@ids) = map { $_->[0] } @$res;
667                          my $id = $ids[0];                          my $id = $ids[0];
668                          $peg_mapping{$peg} = $id;                          $peg_mapping->{$peg} = $id;
669                            $peg_cache->{$peg} = $id;
670                          print "Mapped $peg to $id via contigs\n";                          print "Mapped $peg to $id via contigs\n";
671                          if (@$res > 1)                          if (@$res > 1)
672                          {                          {
# Line 238  Line 676 
676                      else                      else
677                      {                      {
678                          print "failed: $peg  $my_genome and contig $local_contig start=$start end=$end strand=$strand\n";                          print "failed: $peg  $my_genome and contig $local_contig start=$start end=$end strand=$strand\n";
679                            $sought{$peg}++;
680                            $sought_seq{$peg} = $seq;
681                      }                      }
682                  }                  }
683                  else                  else
684                  {                  {
685                      print "Mapping failed for $my_genome checksum $cksum\n";                      print "Mapping failed for $my_genome checksum $cksum\n";
686                        $sought{$peg}++;
687                        $sought_seq{$peg} = $seq;
688                  }                  }
689              }              }
690                elsif ($what eq "peg_seq")
691                {
692                    my($seq) = @rest;
693    
694                    $sought{$peg}++;
695                    $sought_seq{$peg} = $seq;
696          }          }
697      }      }
 }  
698    
699            #
700            # Now see if we need to do a tough search.
701            #
702    
703            if (keys(%sought) > 0 and !$skip_tough_search)
704            {
705                my %trans;
706    
707                print "Starting tough search\n";
708    
709                $fig->tough_search(undef, \%sought_seq, \%trans, \%sought);
710                print "Tough search translated: \n";
711                while (my($tpeg, $ttrans) = each(%trans))
712                {
713                    print "  $tpeg -> $ttrans\n";
714                    $peg_mapping->{$tpeg} = $ttrans;
715                    $peg_cache->{$tpeg} = $ttrans;
716                }
717            }
718        }
719    }
720    
721  #############  #############
722  #  #
# Line 342  Line 810 
810          # element in the body of the message.          # element in the body of the message.
811          #          #
812          my $ns = $reply->namespaceuriof('/Envelope/Body/[1]');          my $ns = $reply->namespaceuriof('/Envelope/Body/[1]');
813          print "Reply ns=$ns want $P2P::ns_relay\n";          # print "Reply ns=$ns want $P2P::ns_relay\n";
814    
815          if ($ns eq $P2P::ns_relay)          if ($ns eq $P2P::ns_relay)
816          {          {
817              my $val = $reply->result;              my $val = $reply->result;
818              print "got val=", Dumper($val);              # print "got val=", Dumper($val);
819              if ($val->[0] eq 'deferred')              if ($val->[0] eq 'deferred')
820              {              {
821                  #                  #
# Line 389  Line 857 
857  use strict;  use strict;
858    
859  use Data::Dumper;  use Data::Dumper;
860    use Time::HiRes qw( usleep ualarm gettimeofday tv_interval );
861    
862  use SOAP::Lite;  use SOAP::Lite;
863    
864    #use SOAP::Lite +trace => [qw(transport dispatch result debug)];
865  use P2P;  use P2P;
866    
867  #  #
# Line 402  Line 873 
873  {  {
874      my($class, $fig, $url, $peer_id, $relay) = @_;      my($class, $fig, $url, $peer_id, $relay) = @_;
875    
876      my $proxy = SOAP::Lite->uri($ns_p2p)->proxy($url);      my $proxy = SOAP::Lite->uri($ns_p2p)->proxy($url, timeout => 3600);
877    
878      my $self = {      my $self = {
879          fig => $fig,          fig => $fig,
# Line 428  Line 899 
899    
900  sub request_update  sub request_update
901  {  {
902      my($self, $last_update) = @_;      my($self, $last_update, $update_thru) = @_;
903    
904      my $rel = $self->{fig}->get_release_info();      my $rel = [$self->{fig}->get_release_info()];
905    
906      if (!defined($last_update))      if (!defined($last_update))
907      {      {
908          $last_update = $self->{fig}->get_peer_last_update($self->{peer_id});          $last_update = $self->{fig}->get_peer_last_update($self->{peer_id});
909      }      }
910    
911      my $reply = $self->{proxy}->request_update($rel, $last_update);      print "Requesting update via $self->{proxy}\n";
912        my $reply = $self->{proxy}->request_update($rel, $last_update, $update_thru);
913        # print "Got reply ", Dumper($reply);
914    
915      if ($self->{relay})      if ($self->{relay})
916      {      {
# Line 474  Line 947 
947      return $self->call("finalize_pegs", $session_id, $request);      return $self->call("finalize_pegs", $session_id, $request);
948  }  }
949    
950    sub get_annotations
951    {
952        my($self, $session_id, $start, $length) = @_;
953    
954        return $self->call("get_annotations", $session_id, $start, $length);
955    }
956    
957    sub get_assignments
958    {
959        my($self, $session_id, $start, $length) = @_;
960    
961        return $self->call("get_assignments", $session_id, $start, $length);
962    }
963    
964  sub call  sub call
965  {  {
966      my($self, $func, @args) = @_;      my($self, $func, @args) = @_;
967    
968        my $t0 = [gettimeofday()];
969        print "Calling $func\n";
970      my $reply = $self->{proxy}->$func(@args);      my $reply = $self->{proxy}->$func(@args);
971        my $t1 = [gettimeofday()];
972    
973        my $elap = tv_interval($t0, $t1);
974        print "Call to $func took $elap\n";
975    
976      if ($self->{relay})      if ($self->{relay})
977      {      {
# Line 516  Line 1009 
1009    
1010  sub request_update  sub request_update
1011  {  {
1012      my($class, $his_release, $last_update)= @_;      my($class, $his_release, $last_update, $update_thru)= @_;
1013    
1014      #      #
1015      # Verify input.      # Verify input.
# Line 527  Line 1020 
1020          die "request_update: last_update must be a number (not '$last_update')\n";          die "request_update: last_update must be a number (not '$last_update')\n";
1021      }      }
1022    
1023        if ($update_thru eq "")
1024        {
1025            $update_thru = time + 10000;
1026        }
1027    
1028      #      #
1029      # Create a new session id and a spool directory to use for storage      # Create a new session id and a spool directory to use for storage
1030      # of information about it. This can go in the tempdir since it is      # of information about it. This can go in the tempdir since it is
# Line 554  Line 1052 
1052    
1053      my %pegs;      my %pegs;
1054    
1055        #
1056        # We keep track of usernames that have been seen, so that
1057        # we can both update our local user database and
1058        # we can report them to our peer.
1059        #
1060    
1061        my %users;
1062    
1063      my $num_annos = 0;      my $num_annos = 0;
1064      my $num_genomes = 0;      my $num_genomes = 0;
1065      my $num_pegs = 0;      my $num_pegs = 0;
1066        my $num_assignments = 0;
1067    
1068      my $anno_fh;      my $anno_fh;
1069      open($anno_fh, ">$spool_dir/annos");      open($anno_fh, ">$spool_dir/annos");
# Line 567  Line 1074 
1074      my $genome_fh;      my $genome_fh;
1075      open($genome_fh, ">$spool_dir/genomes");      open($genome_fh, ">$spool_dir/genomes");
1076    
1077        my $assign_fh;
1078        open($assign_fh, ">$spool_dir/assignments");
1079    
1080      for my $genome (@$all_genomes)      for my $genome (@$all_genomes)
1081      {      {
1082          my $num_annos_for_genome = 0;          my $num_annos_for_genome = 0;
1083            my %assignment;
1084    
1085          my $genome_dir = "$FIG_Config::organisms/$genome";          my $genome_dir = "$FIG_Config::organisms/$genome";
1086          next unless -d $genome_dir;          next unless -d $genome_dir;
# Line 586  Line 1097 
1097    
1098                  if ((($fid, $anno_time, $who, $anno_text) =                  if ((($fid, $anno_time, $who, $anno_text) =
1099                       ($ann =~ /^(fig\|\d+\.\d+\.peg\.\d+)\n(\d+)\n(\S+)\n(.*\S)/s)) and                       ($ann =~ /^(fig\|\d+\.\d+\.peg\.\d+)\n(\d+)\n(\S+)\n(.*\S)/s)) and
1100                      $anno_time > $last_update)                      $anno_time > $last_update and
1101                        $anno_time < $update_thru)
1102    
1103                  {                  {
1104                      #                      #
1105                        # Update users list.
1106                        #
1107    
1108                        $users{$who}++;
1109    
1110                        #
1111                      # Look up aliases if we haven't seen this fid before.                      # Look up aliases if we haven't seen this fid before.
1112                      #                      #
1113    
# Line 607  Line 1125 
1125    
1126                      $num_annos_for_genome++;                      $num_annos_for_genome++;
1127                      $num_annos++;                      $num_annos++;
1128    
1129                        #
1130                        # While we're here, see if this is an assignment. We check in the
1131                        # %assignment hash, which is keyed on fid, to see if we already
1132                        # saw an assignment for this fid. If we have, we keep this one only if
1133                        # the assignment time on it is later than the one we saw already.
1134                        #
1135                        # We are only looking at master assignments for now. We will need
1136                        # to return to this issue and reexamine it, but in order to move
1137                        # forward I am only matching master assignments.
1138                        #
1139    
1140                        if ($anno_text =~ /Set master function to\n(\S[^\n]+\S)/)
1141                        {
1142                            my $func = $1;
1143    
1144                            my $other = $assignment{$fid};
1145    
1146                            #
1147                            # If we haven't seen an assignment for this fid,
1148                            # or if it the other assignment has a timestamp that
1149                            # is earlier than this one, set the assignment.
1150                            #
1151    
1152                            if (!defined($other) or
1153                                ($other->[1] < $anno_time))
1154                            {
1155                                $assignment{$fid} = [$fid, $anno_time, $who, $func];
1156                            }
1157                        }
1158                  }                  }
1159              }              }
1160              close($afh);              close($afh);
1161    
1162                #
1163                # Write out the assignments that remain.
1164                #
1165    
1166                for my $fid (sort keys(%assignment))
1167                {
1168                    print $assign_fh join("\t", @{$assignment{$fid}}), "\n";
1169                    $num_assignments++;
1170          }          }
1171            }
1172    
1173    
1174          #          #
1175          # Determine genome information if we have annotations for this one.          # Determine genome information if we have annotations for this one.
# Line 643  Line 1202 
1202      close($anno_fh);      close($anno_fh);
1203      close($peg_fh);      close($peg_fh);
1204      close($genome_fh);      close($genome_fh);
1205        close($assign_fh);
1206    
1207      print "Pegs: $num_pegs\n";      print "Pegs: $num_pegs\n";
1208      print "Genomes: $num_genomes\n";      print "Genomes: $num_genomes\n";
# Line 652  Line 1212 
1212      # Check compatibility.      # Check compatibility.
1213      #      #
1214    
1215      my $my_release = $fig->get_release_info();      my $my_release = [$fig->get_release_info()];
1216      my $compatible = (defined($my_release) && ($my_release == $his_release)) ? 1 : 0;  
1217        #
1218        # Release id is $my_release->[1].
1219        #
1220    
1221        my $compatible;
1222        if ($my_release->[1] ne "" and $his_release->[1] ne "")
1223        {
1224            #
1225            # Both releases must be defined for them to be compatible.
1226            #
1227            # At some point we need to consider the derived-release issue.
1228            #
1229    
1230            $compatible = $my_release->[1] eq $his_release->[1];
1231        }
1232        else
1233        {
1234            $compatible = 0;
1235        }
1236    
1237      open(my $fh, ">$spool_dir/INFO");      open(my $fh, ">$spool_dir/INFO");
1238      print $fh "requestor_release\t$his_release\n";      print $fh "requestor_release\t$his_release\n";
1239      print $fh "last_update\t$last_update\n";      print $fh "last_update\t$last_update\n";
1240        print $fh "update_thru\t$update_thru\n";
1241      print $fh "cur_update\t$now\n";      print $fh "cur_update\t$now\n";
1242      print $fh "target_release\t$my_release\n";      print $fh "target_release\t$my_release\n";
1243      print $fh "compatible\t$compatible\n";      print $fh "compatible\t$compatible\n";
1244      print $fh "num_pegs\t$num_pegs\n";      print $fh "num_pegs\t$num_pegs\n";
1245      print $fh "num_genomes\t$num_genomes\n";      print $fh "num_genomes\t$num_genomes\n";
1246      print $fh "num_annos\t$num_annos\n";      print $fh "num_annos\t$num_annos\n";
1247        print $fh "num_assignments\t$num_assignments\n";
1248      close($fh);      close($fh);
1249    
1250      return [$session_id, $my_release, $num_annos, $num_pegs, $num_genomes, $now, $compatible];      #
1251        # Construct list of users, and pdate local user database.
1252        #
1253    
1254        my @users = keys(%users);
1255        # $fig->ensure_users(\@users);
1256    
1257        return [$session_id, $my_release, $num_assignments, $num_annos, $num_pegs, $num_genomes,
1258                $now, $compatible, \@users];
1259  }  }
1260    
1261    
# Line 801  Line 1390 
1390              #              #
1391              # Return the location and contig checksum for this peg.              # Return the location and contig checksum for this peg.
1392              #              #
1393                # We also include the sequence in case the contig mapping doesn't work.
1394                #
1395    
1396              my $loc = $fig->feature_location($peg);              my $loc = $fig->feature_location($peg);
1397              my $contig = $fig->contig_of($loc);              my $contig = $fig->contig_of($loc);
1398              my $cksum = $fig->contig_checksum($fig->genome_of($peg), $contig);              my $cksum = $fig->contig_checksum($fig->genome_of($peg), $contig);
1399              warn "Checksum for '$loc' '$contig' is $cksum\n";              my $seq = $fig->get_translation($peg);
1400    
1401              push(@$out, ['peg_loc', $peg,              push(@$out, ['peg_loc', $peg,
1402                          $fig->strand_of($loc),                          $fig->strand_of($peg),
1403                          $fig->beg_of($loc), $fig->end_of($loc),                          $fig->beg_of($loc), $fig->end_of($loc),
1404                          $cksum]);                          $cksum, $seq]);
1405    
1406          }          }
1407          elsif ($what eq "peg_unknown")          elsif ($what eq "peg_unknown")
# Line 822  Line 1413 
1413      return $out;      return $out;
1414  }  }
1415    
1416    
1417    sub get_annotations
1418    {
1419        my($self, $session_id, $start, $len) = @_;
1420    
1421        #
1422        # This is now easy; just run thru the saved annotations and return.
1423        #
1424    
1425        my(%session_info);
1426    
1427        my $spool_dir = "$FIG_Config::temp/p2p_spool/$session_id";
1428    
1429        -d $spool_dir or die "Invalid session id $session_id";
1430    
1431        #
1432        # Read in the cached information for this session.
1433        #
1434    
1435        open(my $info_fh, "<$spool_dir/INFO") or die "Cannot open INFO file: $!";
1436        while (<$info_fh>)
1437        {
1438            chomp;
1439            my($var, $val) = split(/\t/, $_, 2);
1440            $session_info{$var} = $val;
1441        }
1442        close($info_fh);
1443    
1444        #
1445        # Sanity check start and length.
1446        #
1447    
1448        if ($start < 0 or $start >= $session_info{num_annos})
1449        {
1450            die "Invalid start position $start";
1451        }
1452    
1453        if ($len < 0 or ($start + $len - 1) >= $session_info{num_annos})
1454        {
1455            die "Invalid length $len";
1456        }
1457    
1458        #
1459        # Open file, spin to the starting line, then start reading.
1460        #
1461    
1462        open(my $anno_fh, "<$spool_dir/annos") or die "Cannot open annos file: $!";
1463    
1464        my $anno_output = [];
1465    
1466        my $anno_num = 0;
1467    
1468        local $/ = "//\n";
1469        while (<$anno_fh>)
1470        {
1471            next if ($anno_num < $start);
1472    
1473            last if ($anno_num > ($start + $len));
1474    
1475            chomp;
1476    
1477            my($id, $date, $author, $anno) = split(/\n/, $_, 4);
1478    
1479            push(@$anno_output, [$id, $date, $author, $anno]);
1480        }
1481        continue
1482        {
1483            $anno_num++;
1484        }
1485    
1486        return $anno_output;
1487    }
1488    
1489    sub get_assignments
1490    {
1491        my($self, $session_id, $start, $len) = @_;
1492    
1493        #
1494        # This is now easy; just run thru the saved assignments and return.
1495        #
1496    
1497        my(%session_info);
1498    
1499        my $spool_dir = "$FIG_Config::temp/p2p_spool/$session_id";
1500    
1501        -d $spool_dir or die "Invalid session id $session_id";
1502    
1503        #
1504        # Read in the cached information for this session.
1505        #
1506    
1507        open(my $info_fh, "<$spool_dir/INFO") or die "Cannot open INFO file: $!";
1508        while (<$info_fh>)
1509        {
1510            chomp;
1511            my($var, $val) = split(/\t/, $_, 2);
1512            $session_info{$var} = $val;
1513        }
1514        close($info_fh);
1515    
1516        #
1517        # Sanity check start and length.
1518        #
1519    
1520        if ($start < 0 or $start >= $session_info{num_assignments})
1521        {
1522            die "Invalid start position $start";
1523        }
1524    
1525        if ($len < 0 or ($start + $len - 1) >= $session_info{num_assignments})
1526        {
1527            die "Invalid length $len";
1528        }
1529    
1530        #
1531        # Open file, spin to the starting line, then start reading.
1532        #
1533    
1534        open(my $assign_fh, "<$spool_dir/assignments") or die "Cannot open assignments file: $!";
1535    
1536        my $assign_output = [];
1537    
1538        my $assign_num = 0;
1539    
1540        while (<$assign_fh>)
1541        {
1542            next if ($assign_num < $start);
1543    
1544            last if ($assign_num > ($start + $len));
1545    
1546            chomp;
1547    
1548            my($id, $date, $author, $func) = split(/\t/, $_, 4);
1549    
1550            push(@$assign_output, [$id, $date, $author, $func]);
1551        }
1552        continue
1553        {
1554            $assign_num++;
1555        }
1556    
1557        return $assign_output;
1558    }
1559    

Legend:
Removed from v.1.12  
changed lines
  Added in v.1.21

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3