[Bio] / FigKernelPackages / P2P.pm Repository:
ViewVC logotype

Annotation of /FigKernelPackages/P2P.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.20 - (view) (download) (as text)

1 : olson 1.1 #
2 :     # This module contains the code for the P2P update protocol.
3 :     #
4 :     # Package P2P contains the namespace declarations, and possibly toplevel utility
5 :     # routines. (get_relay ?)
6 :     #
7 :     # Package P2P::Relay contains methods for contacting the P2P relay service. The actual
8 :     # implementation of the relay service is not contained here - it is a standalone module
9 :     # that can be installed on a web server that does not have a full SEED installed.
10 :     #
11 :     # Package P2P::Requestor contains the requestor-side code for the update protocol.
12 :     #
13 :     # Package P2P::Service contains the web service implementation routines for the
14 :     # protocol.
15 :     #
16 :    
17 :     package P2P;
18 :    
19 :     use FIG_Config;
20 :    
21 : olson 1.19 use DB_File;
22 :     use Fcntl;
23 :    
24 : olson 1.1 use strict;
25 :     use Exporter;
26 :     use base qw(Exporter);
27 :    
28 : olson 1.15 use Time::HiRes qw( usleep ualarm gettimeofday tv_interval );
29 :    
30 : olson 1.1 use Data::Dumper;
31 :    
32 :     use vars qw(@EXPORT @EXPORT_OK);
33 :     @EXPORT = ();
34 :     @EXPORT_OK = qw($ns_p2p $ns_relay);
35 :    
36 :     our $ns_p2p = "http://thefig.info/schemas/p2p_update";
37 :     our $ns_relay = "http://thefig.info/schemas/p2p_relay";
38 :    
39 :     =pod
40 :    
41 :     =head1 perform_update($fig, $peer, $last_update, $skip_tough_search, $update_thru)
42 :    
43 :     Perform a peer-to-peer update with the given peer. $peer is an instance of
44 :     P2P::Requestor which can connect to the peer. It is expected that the
45 :     SEED infrastructure will create this requestor appropriately for the
46 :     particular circumstance (direct connection, thru relay, etc).
47 :    
48 :     This code executes the high-level protocol, maintaining state between
49 :     calls to the peer to exchange the actual information.
50 :    
51 :     =cut
52 :    
#
# perform_update($fig, $peer, $last_update, $skip_tough_search, $update_thru)
#
# Run the requestor side of the P2P update protocol against $peer.
#
#   $fig               - local FIG object; used for alias/genome/contig lookups,
#                        the database handle, tough_search and function assignment.
#   $peer              - object providing request_update, get_pegs, finalize_pegs,
#                        get_assignments and get_annotations.
#   $last_update       - passed through to $peer->request_update.
#   $skip_tough_search - if true, pegs that could not be mapped by alias or by
#                        contig location are left unmapped.
#   $update_thru       - passed through to $peer->request_update.
#
# Steps: open the session, map the peer's peg ids to local peg ids (directly,
# via the cache, via aliases, via contig location, and finally via tough
# search), then merge the mapped annotations into each genome's annotations
# file and install master assignments that are not superseded locally.
#
# Dies if request_update, get_pegs or finalize_pegs does not return an array
# reference, or if an annotations file cannot be renamed or rewritten.
# Returns a true value on completion.
#
sub perform_update
{
    my($fig, $peer, $last_update, $skip_tough_search, $update_thru) = @_;

    my $ret = $peer->request_update($last_update, $update_thru);

    if (!$ret or ref($ret) ne "ARRAY")
    {
        die "perform_update: request_update failed\n";
    }

    my($session, $target_release, $num_assignments, $num_annos, $num_pegs, $num_genomes,
       $target_time, $compatible) = @$ret;

    print "perform_update: session=$session target=@$target_release num_annos=$num_annos\n";
    print " num_pegs=$num_pegs num_genomes=$num_genomes target_time=$target_time compat=$compatible\n";

    #
    # We now know the data release for our peer.
    #
    # Open up the peg translation cache database (a DB_File) tied
    # to %peg_cache. The cache file is named after the peer's release id.
    #
    # If the peer reports no release id, or the tie fails, %peg_cache stays
    # an ordinary in-memory hash and $cache_handle stays undefined; every
    # later use of $cache_handle must therefore be guarded.
    #
    # NOTE(review): the release id comes from the peer and is used verbatim
    # in a file name - confirm it cannot contain path separators.
    #

    my $cache_handle;
    my %peg_cache;
    if ($target_release->[1] ne "")
    {
        my $cache_file = "pegcache.$target_release->[1].db";
        my $cache_dir = "$FIG_Config::data/P2PQueue";
        $fig->verify_dir($cache_dir);

        $cache_handle = tie(%peg_cache, "DB_File", "$cache_dir/$cache_file",
                            O_CREAT | O_RDWR, 0666, $DB_File::DB_HASH);
        $cache_handle or warn "Could not tie peg_cache to $cache_dir/$cache_file: $!\n";
    }

    #
    # We have the information now to begin the update process. Retrieve the pegs.
    #

    $ret = $peer->get_pegs($session, 0, $num_pegs);

    if (!$ret or ref($ret) ne "ARRAY")
    {
        die "perform_update: get_pegs failed\n";
    }

    my($peg_list, $genome_list) = @$ret;

    #
    # Walk the peg list. Pegs that map directly, from the cache, or via an
    # alias go into %peg_mapping; the rest are grouped by (peer-side) genome
    # id in %genome_map for the second pass.
    #

    my(%peg_mapping, %genome_map);

    for my $peg_info (@$peg_list)
    {
        my($key, $peg, @rest) = @$peg_info;

        if ($key eq 'peg')
        {
            #
            # Peg id is directly usable.
            #
            $peg_mapping{$peg} = $peg;
        }
        elsif ($key eq 'peg_info')
        {
            #
            # Peg id not directly usable. See if we have it in the cache.
            #

            my $cached = $peg_cache{$peg};
            if (defined($cached) and $cached ne "")
            {
                $peg_mapping{$peg} = $cached;
                warn "Found cached mapping $peg => $cached\n";
                next;
            }

            my($alias_list, $genome_id) = @rest;

            for my $alias (@$alias_list)
            {
                my $mapped = $fig->by_alias($alias);
                if ($mapped)
                {
                    print "$peg maps to $mapped via $alias\n";
                    $peg_mapping{$peg} = $mapped;
                    $peg_cache{$peg} = $mapped;
                    last;
                }
            }

            #
            # If we didn't succeed in mapping by alias,
            # stash this in the list of pegs to be mapped by
            # genome.
            #

            if (!defined($peg_mapping{$peg}))
            {
                push(@{$genome_map{$genome_id}}, $peg);
                print "$peg did not map\n";
            }
        }
    }

    # The handle is undefined when no cache database could be opened.
    $cache_handle->sync() if $cache_handle;

    #
    # Finished first pass. Now go over the per-genome mappings that need to be made.
    #
    # $genome_map{$genome_id} is a list of pegs that reside on that genome.
    # The pegs and genome id are both target-based identifiers.
    #

    my @finalize_req = ();
    my %local_genome;

    for my $genome_info (@$genome_list)
    {
        my($genome, $n_contigs, $n_nucs, $cksum) = @$genome_info;

        next unless defined($genome_map{$genome});

        #
        # Determine if we have a local genome installed that matches precisely the
        # genome on the target side.
        #
        my $my_genome = $fig->find_genome_by_content($genome, $n_contigs, $n_nucs, $cksum);

        my $pegs = $genome_map{$genome};

        if ($my_genome)
        {
            #
            # We do have such a local genome. Generate a peg_genome request to
            # get the location information from the target side, and remember
            # the local genome for each peg.
            #

            print "$genome mapped to $my_genome\n";
            for my $peg (@$pegs)
            {
                push(@finalize_req, ['peg_genome', $peg]);
                $local_genome{$peg} = $my_genome;
            }
        }
        else
        {
            #
            # We don't have such a genome. We need to retrieve the
            # sequence data in order to finish mapping.
            #
            push(@finalize_req, map { ['peg_unknown', $_] } @$pegs);
        }
    }

    #
    # If we need to finalize, make the call.
    #
    if (@finalize_req)
    {
        $ret = $peer->finalize_pegs($session, \@finalize_req);

        if (!$ret or ref($ret) ne "ARRAY")
        {
            die "perform_update: finalize_pegs failed\n";
        }

        #
        # The return is a list of either location entries or
        # sequence data. Attempt to finish up the mapping.
        #
        # %sought / %sought_seq collect the pegs (and their sequences) that
        # still need a tough search.
        #

        my(%sought, %sought_seq);

        my $dbh = $fig->db_handle();
        for my $entry (@$ret)
        {
            my($what, $peg, @rest) = @$entry;

            if ($what eq "peg_loc")
            {
                my($strand, $start, $end, $cksum, $seq) = @rest;

                #
                # We have a contig location. Try to find a matching contig
                # here, and see if it maps to something.
                #

                my $my_genome = $local_genome{$peg};
                my $local_contig = $fig->find_contig_with_checksum($my_genome, $cksum);
                if ($local_contig)
                {
                    #
                    # Now look up the local peg. We match on the end location; depending on the strand
                    # the feature is on, we want to look at either minloc or maxloc.
                    #

                    my $whichloc = $strand eq '-' ? "minloc" : "maxloc";

                    #
                    # $end was supplied by the peer and is interpolated into the
                    # query, so only accept a plain non-negative integer.
                    #
                    my $res;
                    if (defined($end) and $end =~ /^\d+$/)
                    {
                        $res = $dbh->SQL(qq!SELECT id from features
                                            WHERE $whichloc = $end and genome = '$my_genome' and
                                            contig = '$local_contig'
                                            !);
                    }
                    else
                    {
                        warn "perform_update: ignoring non-numeric end location '$end' for $peg\n";
                    }

                    if ($res and @$res > 0)
                    {
                        my(@ids) = map { $_->[0] } @$res;
                        my $id = $ids[0];
                        $peg_mapping{$peg} = $id;
                        $peg_cache{$peg} = $id;
                        print "Mapped $peg to $id via contigs\n";
                        if (@$res > 1)
                        {
                            warn "Multiple mappings found for $peg: @ids\n";
                        }
                    }
                    else
                    {
                        print "failed: $peg $my_genome and contig $local_contig start=$start end=$end strand=$strand\n";
                        $sought{$peg}++;
                        $sought_seq{$peg} = $seq;
                    }
                }
                else
                {
                    print "Mapping failed for $my_genome checksum $cksum\n";
                    $sought{$peg}++;
                    $sought_seq{$peg} = $seq;
                }
            }
            elsif ($what eq "peg_seq")
            {
                my($seq) = @rest;

                $sought{$peg}++;
                $sought_seq{$peg} = $seq;
            }
        }

        #
        # Now see if we need to do a tough search.
        #

        if (keys(%sought) > 0 and !$skip_tough_search)
        {
            my %trans;

            print "Starting tough search\n";

            $fig->tough_search(undef, \%sought_seq, \%trans, \%sought);
            print "Tough search translated: \n";
            while (my($tpeg, $ttrans) = each(%trans))
            {
                print " $tpeg -> $ttrans\n";
                $peg_mapping{$tpeg} = $ttrans;
                $peg_cache{$tpeg} = $ttrans;
            }
        }
    }

    #
    # Flush and close the cache, if one was opened. The handle is released
    # before the untie so that no extra reference to the tied object remains.
    #
    if ($cache_handle)
    {
        $cache_handle->sync();
        undef $cache_handle;
        untie %peg_cache;
    }

    #
    # Retrieve the assignments.
    #

    my $assignments = $peer->get_assignments($session, 0, $num_assignments);

    #
    # Retrieve the annotations, and generate a list of mapped annotations.
    #

    my $annos = $peer->get_annotations($session, 0, $num_annos);

    #
    # %genome_annos maps a local genome id to the list of locally-mapped
    # annotations [$my_id, $ts, $author, $text] for that genome.
    #

    my %genome_annos;

    #
    # %genome_assignments is a hash mapping from genome to a hashref
    # that maps peg to [$my_id, $ts, $author, $func].
    #
    # (If two remote pegs map to the same local peg, the later entry in the
    # list simply replaces the earlier one.)
    #
    my %genome_assignments;

    for my $anno (@$annos)
    {
        my($his_id, $ts, $author, $anno_text) = @$anno;

        my $my_id = $peg_mapping{$his_id};
        next unless $my_id;

        my $genome = $fig->genome_of($my_id);

        push(@{$genome_annos{$genome}}, [$my_id, $ts, $author, $anno_text]);
    }

    #
    # Do the same for the assignments.
    #

    for my $assign (@$assignments)
    {
        my($his_id, $ts, $author, $func) = @$assign;

        my $my_id = $peg_mapping{$his_id};
        next unless $my_id;

        my $genome = $fig->genome_of($my_id);

        $genome_assignments{$genome}->{$my_id} = [$my_id, $ts, $author, $func];
    }

    #
    # Now install annotations.
    #
    # old_assignments (in the current directory) records the function each peg
    # had before we replaced it. Failing to open it is reported but does not
    # stop the update.
    #

    my $old_assignments;
    if (!open($old_assignments, ">", "old_assignments"))
    {
        warn "perform_update: cannot open old_assignments for writing: $!\n";
        undef $old_assignments;
    }

    for my $genome (keys(%genome_annos))
    {
        #
        # Plan: Apply the merge_annotations.pl logic. Read the annotations
        # from the per-org annotations file, add the new ones here, sort, and remove duplicates.
        # Write the results to the annotations file.
        #
        # When we are all done, rerun the index_annotations script.
        #
        # Why not do that incrementally? Partly because the annotation_seeks table doesn't
        # have a column for the genome id, so a removal of old data would require a
        # string-match query; since a complete reindex of the annotations is pretty
        # fast (60 sec on a G4 laptop on a firewire disk), it's not clear whether the incremental
        # update would actually be a win.
        #

        my @annos = @{$genome_annos{$genome}};

        # Incoming assignments for this genome: peg => [$peg, $ts, $author, $func].
        my $incoming = $genome_assignments{$genome} || {};

        #
        # %assignment_annos is a hash from peg to the list
        # of existing local annotations for that peg.
        #
        my %assignment_annos;

        my $dir = "$FIG_Config::organisms/$genome";
        my $anno_file = "$dir/annotations";
        my $anno_bak = "$dir/annotations." . time;

        my $new_count = @annos;

        #
        # Rename the annotations file to a new name based on the current time.
        #

        if (-f $anno_file)
        {
            rename($anno_file, $anno_bak) or die "Cannot rename $anno_file to $anno_bak: $!";
        }

        if (open(my $fh, "<", $anno_bak))
        {
            #
            # While we are scanning here, we collect the existing local
            # annotations for any peg for which we are installing an assignment.
            #
            local($/) = "\n//\n";

            my($chunk, $peg, $ts, $author, $anno);

            while (defined($chunk = <$fh>))
            {
                chomp $chunk;
                ($peg, $ts, $author, $anno) = split(/\n/, $chunk, 4);

                if ($peg =~ /^fig\|/ and $ts =~ /^\d+$/)
                {
                    my $ent = [$peg, $ts, $author, $anno];
                    push(@annos, $ent);

                    if (defined($incoming->{$peg}))
                    {
                        #
                        # We have an incoming assignment for this peg.
                        # Don't parse anything yet, but push the annotation
                        # on a list so we can sort by date.
                        #
                        push(@{$assignment_annos{$peg}}, $ent);
                    }
                }
            }
            close($fh);
        }

        #
        # Determine if we are going to install an assignment.
        #

        for my $peg (keys %$incoming)
        {
            my(undef, $ts, $author, $func) = @{$incoming->{$peg}};

            #
            # Sort the existing annotations for this peg by date, newest first.
            #
            # Recall that this list has entries [$peg, $timestamp, $author, $anno]
            #

            my @eannos;
            if (ref($assignment_annos{$peg}))
            {
                @eannos = sort { $b->[1] <=> $a->[1] } @{$assignment_annos{$peg}};
            }

            #
            # Filter out just the master assignments that are newer than
            # the one we are contemplating putting in place.
            #

            my @cand = grep {
                ($_->[1] > $ts) and ($_->[3] =~ /Set master function to/)
            } @eannos;

            if (@cand > 0)
            {
                #
                # Here is where some policy needs to be put in place --
                # we have a more recent annotation on the current system.
                #
                # For now, we will not install an assignment if there is any
                # newer assignment in place.
                #

                warn "Skipping assignment for $peg $func due to more recent assignment $cand[0]->[3]\n";
            }
            else
            {
                #
                # Nothing is blocking us. While we are testing, just slam this assignment in.
                #

                my $old = $fig->function_of($peg, 'master');
                print $old_assignments "$peg\t$old\n" if $old_assignments;

                print "Assign $peg $func\n";
                $fig->assign_function($peg, 'master', $func);
            }
        }

        open(my $outfh, ">", $anno_file) or die "Cannot open new annotation file $anno_file: $!\n";

        #
        # Sort by peg id, then timestamp, so identical entries are adjacent
        # and can be dropped by comparing with the previously written one.
        #
        my $last;
        my @sorted = sort { ($a->[0] cmp $b->[0]) or ($a->[1] <=> $b->[1]) } @annos;
        my $inst = 0;
        my $dup = 0;
        foreach my $ann (@sorted)
        {
            my $txt = join("\n", @$ann);
            #
            # Drop the trailing \n if there is one; we will add it back when we print and
            # want to ensure the file format remains sane.
            #
            chomp $txt;
            if (!defined($last) or $txt ne $last)
            {
                print $outfh "$txt\n//\n";
                $last = $txt;
                print "Inst $ann->[0] $ann->[1] $ann->[2]\n";
                $inst++;
            }
            else
            {
                print "Dup $ann->[0] $ann->[1] $ann->[2]\n";
                $dup++;
            }
        }
        close($outfh) or warn "Error closing $anno_file: $!\n";
        chmod(0666, $anno_file) or warn "Cannot chmod 0666 $anno_file: $!\n";
        print "Wrote $anno_file. $new_count new annos, $inst installed, $dup duplicates\n";
    }

    close($old_assignments) if $old_assignments;

    return 1;
}
569 :    
570 :    
571 : olson 1.15
572 : olson 1.1 #############
573 :     #
574 :     # P2P Relay
575 :     #
576 :     #############
577 :    
578 :    
579 :     package P2P::Relay;
580 :     use strict;
581 :    
582 :     use Data::Dumper;
583 :     use SOAP::Lite;
584 :    
585 :     use P2P;
586 :    
#
# Create a relay client. A SOAP::Lite object is set up with the relay
# namespace as its uri and $url as its proxy endpoint; both the url and
# the SOAP object are kept in the new instance.
#
sub new
{
    my($class, $url) = @_;

    my $soap = SOAP::Lite->uri($P2P::ns_relay);
    $soap = $soap->proxy($url);

    my %fields = (url => $url, proxy => $soap);

    return bless(\%fields, $class);
}
599 :    
#
# Call enumerate_annotation_systems on the relay and hand back the
# result portion of the reply. No fault checking is done here.
#
sub enumerate_annotation_systems
{
    my($self) = @_;

    my $relay_proxy = $self->{proxy};
    my $reply = $relay_proxy->enumerate_annotation_systems();

    return $reply->result;
}
606 :    
#
# Ask the relay for the queries stored under $id.
#
# Returns the result of the reply, or undef (after printing the fault
# code and string) if the relay answered with a fault.
#
sub fetch_queries
{
    my($self, $id) = @_;

    my $response = $self->{proxy}->fetch_queries($id);

    return $response->result unless $response->fault;

    print "Failed to fetch queries: ", $response->faultcode, " ", $response->faultstring, "\n";
    return undef;
}
621 :    
#
# Hand an answer back to the relay. $answer is wrapped as a base64
# SOAP value before being sent together with $id and $key.
#
# Returns the reply object itself (not its result), or undef after
# printing the fault details when the relay reports a fault.
#
sub deposit_answer
{
    my($self, $id, $key, $answer) = @_;

    my $encoded = SOAP::Data->type('base64')->value($answer);
    my $response = $self->{proxy}->deposit_answer($id, $key, $encoded);

    return $response unless $response->fault;

    print "deposit_answer got fault: ", $response->faultcode, " ", $response->faultstring, "\n";
    return undef;
}
637 :    
638 :     =pod
639 :    
640 :     =head1 await_result
641 :    
642 :     Await the result from a possibly-asynchronous soap request.
643 :    
644 :     Look at the reply that we have. If it's a deferred reply, loop polling
645 :     the relay for the actual result.
646 :    
647 :     We determine if the reply is a deferred reply by examining the namespace
648 :     URI of the response. A response will be generated from the relay's namespace,
649 :     rather than that of the application itself.
650 :    
651 :     =cut
652 :    
#
# Wait for the real reply to a possibly-deferred SOAP request.
#
# $reply is the reply object received so far. The namespace of the first
# element in its body decides what it is:
#
#   - not the relay namespace: it is the application's own reply and is
#     returned as is;
#   - the relay namespace with a result of ['deferred', $id]: sleep one
#     second, ask the relay for the completed call $id, and look again;
#   - the relay namespace with anything else: return undef.
#
# Callers must be prepared for an undef return.
#
sub await_result
{
    my($self, $reply) = @_;

    while (1)
    {
        #
        # Retrieve the namespace of the response, which is the first
        # element in the body of the message.
        #
        my $ns = $reply->namespaceuriof('/Envelope/Body/[1]');

        if ($ns eq $P2P::ns_relay)
        {
            my $val = $reply->result;

            #
            # Only an array result can be a deferral notice. Checking the
            # type first keeps a plain-scalar result from being
            # dereferenced as an array (fatal under strict refs).
            #
            if (ref($val) eq 'ARRAY' and $val->[0] eq 'deferred')
            {
                #
                # Sleep a little, then try to retrieve the response.
                #

                sleep(1);
                my $id = $val->[1];

                print "Retrieving reply\n";
                $reply = $self->{proxy}->call_completed($id);
            }
            else
            {
                #
                # A relay message we do not understand; give up.
                #
                return undef;
            }
        }
        else
        {
            #
            # We got an actual response. Return it.
            #

            return $reply;
        }
    }
}
700 :    
701 :     #############
702 :     #
703 :     # P2P Requestor
704 :     #
705 :     #############
706 :    
707 :     package P2P::Requestor;
708 :     use strict;
709 :    
710 :     use Data::Dumper;
711 : olson 1.15 use Time::HiRes qw( usleep ualarm gettimeofday tv_interval );
712 : olson 1.1
713 :     use SOAP::Lite;
714 : olson 1.15
715 :     #use SOAP::Lite +trace => [qw(transport dispatch result debug)];
716 : olson 1.1 use P2P;
717 :    
718 :     #
719 :     # Create a new Requestor. It contains a reference to the FIG instance
720 :     # so that we can run the protocol completely from in here.
721 :     #
722 :    
#
# Create a new Requestor.
#
#   $fig     - FIG instance, kept so the protocol can be run from in here.
#   $url     - endpoint of the peer's update service.
#   $peer_id - identifier of the peer, used to look up the last update time.
#   $relay   - optional relay object; when set, replies are passed through
#              its await_result method.
#
# The SOAP proxy is created with the P2P update namespace and a timeout
# value of 3600. The namespace variable is referenced by its
# fully-qualified name, as P2P::Relay does for the relay namespace.
#
sub new
{
    my($class, $fig, $url, $peer_id, $relay) = @_;

    my $proxy = SOAP::Lite->uri($P2P::ns_p2p)->proxy($url, timeout => 3600);

    my $self = {
        fig => $fig,
        url => $url,
        peer_id => $peer_id,
        proxy => $proxy,
        relay => $relay,
    };
    return bless($self, $class);
}
738 :    
739 :     #
740 :     # First step: Request an update.
741 :     #
742 :     # We need to determine some notion of what our release is, since we are not
743 :     # currently tagging them explicitly. Until we delve into this more,
744 :     # I am going to return a null release, which means the same-release
745 :     # optimization won't be able to kick in.
746 :     #
747 :     # We also need to determine the last time we got an update from this
748 :     # system.
749 :     #
750 :    
#
# First step of the protocol: ask the peer to start an update session.
#
#   $last_update - time of the last update received from this peer; when
#                  undefined it is looked up with get_peer_last_update.
#   $update_thru - forwarded unchanged to the peer.
#
# The local release information (from get_release_info) is sent along.
# If a relay is configured, the reply is passed through its await_result.
#
# Returns the result of the peer's reply, or undef if no usable reply was
# obtained or the peer reported a fault.
#
sub request_update
{
    my($self, $last_update, $update_thru) = @_;

    my $rel = [$self->{fig}->get_release_info()];

    if (!defined($last_update))
    {
        $last_update = $self->{fig}->get_peer_last_update($self->{peer_id});
    }

    print "Requesting update via $self->{proxy}\n";
    my $reply = $self->{proxy}->request_update($rel, $last_update, $update_thru);

    if ($self->{relay})
    {
        $reply = $self->{relay}->await_result($reply);
    }

    #
    # await_result returns undef when the relay sends something it does
    # not understand; treat that as a failed request instead of calling
    # methods on an undefined value.
    #
    if (!defined($reply))
    {
        print "request_update failed: no reply received\n";
        return undef;
    }

    if ($reply->fault)
    {
        print "request_update triggered fault: ", $reply->faultcode, " ", $reply->faultstring, "\n";
        return undef;
    }

    return $reply->result;
}
779 :    
780 :     =pod
781 :    
782 :     =head1 get_pegs($session_id, $start, $length)
783 :    
784 :    
785 :     =cut
786 :    
#
# Forward a get_pegs request for the given session and range to the
# peer via call(), returning whatever call() returns.
#
sub get_pegs
{
    my $self = shift;
    my($session_id, $start, $length) = @_;

    my @request = ("get_pegs", $session_id, $start, $length);

    return $self->call(@request);
}
793 :    
#
# Forward a finalize_pegs request (session id plus the request list)
# to the peer via call(), returning whatever call() returns.
#
sub finalize_pegs
{
    my $self = shift;
    my($session_id, $request) = @_;

    my @message = ("finalize_pegs", $session_id, $request);

    return $self->call(@message);
}
800 :    
#
# Forward a get_annotations request for the given session and range to
# the peer via call(), returning whatever call() returns.
#
sub get_annotations
{
    my $self = shift;
    my($session_id, $start, $length) = @_;

    my @request = ("get_annotations", $session_id, $start, $length);

    return $self->call(@request);
}
807 :    
#
# Forward a get_assignments request for the given session and range to
# the peer via call(), returning whatever call() returns.
#
sub get_assignments
{
    my $self = shift;
    my($session_id, $start, $length) = @_;

    my @request = ("get_assignments", $session_id, $start, $length);

    return $self->call(@request);
}
814 :    
#
# Invoke the remote method $func with @args on the peer and return the
# result of its reply.
#
# The elapsed time of the SOAP call is printed. If a relay is configured,
# the reply is passed through its await_result.
#
# Returns undef if no usable reply was obtained or the peer reported a
# fault (the fault code and string are printed).
#
sub call
{
    my($self, $func, @args) = @_;

    my $t0 = [gettimeofday()];
    print "Calling $func\n";
    my $reply = $self->{proxy}->$func(@args);
    my $t1 = [gettimeofday()];

    my $elap = tv_interval($t0, $t1);
    print "Call to $func took $elap\n";

    if ($self->{relay})
    {
        $reply = $self->{relay}->await_result($reply);
    }

    #
    # await_result returns undef when the relay sends something it does
    # not understand; report that as a failed call instead of calling
    # methods on an undefined value.
    #
    if (!defined($reply))
    {
        print "$func failed: no reply received\n";
        return undef;
    }

    if ($reply->fault)
    {
        print "$func triggered fault: ", $reply->faultcode, " ", $reply->faultstring, "\n";
        return undef;
    }

    return $reply->result;
}
840 :    
841 :    
842 :     #############
843 :     #
844 :     # P2P Service
845 :     #
846 :     # Code in this module is invoked on the target on behalf of a requestor.
847 :     #
848 :     #############
849 :    
850 :     package P2P::Service;
851 :    
852 :     use Data::Dumper;
853 :    
854 :     use FIG;
855 :     use FIG_Config;
856 :     use strict;
857 :    
858 :     use File::Temp qw(tempdir);
859 :     use File::Basename;
860 :    
861 :     sub request_update
862 :     {
863 : olson 1.20 my($class, $his_release, $last_update, $update_thru)= @_;
864 : olson 1.1
865 :     #
866 :     # Verify input.
867 :     #
868 :    
869 :     if ($last_update !~ /^\d+$/)
870 :     {
871 :     die "request_update: last_update must be a number (not '$last_update')\n";
872 :     }
873 :    
874 : olson 1.20 if ($update_thru eq "")
875 :     {
876 :     $update_thru = time + 10000;
877 :     }
878 :    
879 : olson 1.1 #
880 :     # Create a new session id and a spool directory to use for storage
881 :     # of information about it. This can go in the tempdir since it is
882 :     # not persistent.
883 :     #
884 :    
885 :     &FIG::verify_dir("$FIG_Config::temp/p2p_spool");
886 :     #my $spool_dir = tempdir(DIR => "$FIG_Config::temp/p2p_spool");
887 :    
888 :     my $spool_dir = "$FIG_Config::temp/p2p_spool/test";
889 :     &FIG::verify_dir($spool_dir);
890 :    
891 :     my $session_id = basename($spool_dir);
892 :     my $now = time;
893 :    
894 :     #
895 :     # Gather the list of pegs and annotations for the update.
896 :     #
897 :    
898 :     my $fig = new FIG;
899 :    
900 :     my $all_genomes = [$fig->genomes];
901 :    
902 :     my %all_genomes = map { $_ => 1 } @$all_genomes;
903 :    
904 :     my %pegs;
905 : olson 1.15
906 :     #
907 :     # We keep track of usernames that have been seen, so that
908 :     # we can both update our local user database and
909 :     # we can report them to our peer.
910 :     #
911 :    
912 :     my %users;
913 : olson 1.1
914 :     my $num_annos = 0;
915 :     my $num_genomes = 0;
916 :     my $num_pegs = 0;
917 : olson 1.15 my $num_assignments = 0;
918 : olson 1.1
919 :     my $anno_fh;
920 :     open($anno_fh, ">$spool_dir/annos");
921 :    
922 :     my $peg_fh;
923 :     open($peg_fh, ">$spool_dir/pegs");
924 :    
925 :     my $genome_fh;
926 :     open($genome_fh, ">$spool_dir/genomes");
927 :    
928 : olson 1.15 my $assign_fh;
929 :     open($assign_fh, ">$spool_dir/assignments");
930 :    
931 : olson 1.1 for my $genome (@$all_genomes)
932 :     {
933 :     my $num_annos_for_genome = 0;
934 : olson 1.15 my %assignment;
935 : olson 1.1
936 :     my $genome_dir = "$FIG_Config::organisms/$genome";
937 :     next unless -d $genome_dir;
938 :    
939 :     my $afh;
940 :     if (open($afh, "$genome_dir/annotations"))
941 :     {
942 :     my($fid, $anno_time, $who, $anno_text);
943 :     local($/);
944 :     $/ = "//\n";
945 :     while (my $ann = <$afh>)
946 :     {
947 :     chomp $ann;
948 :    
949 :     if ((($fid, $anno_time, $who, $anno_text) =
950 :     ($ann =~ /^(fig\|\d+\.\d+\.peg\.\d+)\n(\d+)\n(\S+)\n(.*\S)/s)) and
951 : olson 1.20 $anno_time > $last_update and
952 :     $anno_time < $update_thru)
953 : olson 1.1
954 :     {
955 :     #
956 : olson 1.15 # Update users list.
957 :     #
958 :    
959 :     $users{$who}++;
960 :    
961 :     #
962 : olson 1.1 # Look up aliases if we haven't seen this fid before.
963 :     #
964 :    
965 :     if (!defined($pegs{$fid}))
966 :     {
967 :     my @aliases = $fig->feature_aliases($fid);
968 :    
969 :     print $peg_fh join("\t", $fid, $genome, @aliases), "\n";
970 :     $num_pegs++;
971 :     }
972 :    
973 :     print $anno_fh "$ann//\n";
974 :    
975 :     $pegs{$fid}++;
976 :    
977 :     $num_annos_for_genome++;
978 :     $num_annos++;
979 : olson 1.15
980 :     #
981 :     # While we're here, see if this is an assignment. We check in the
982 :     # %assignment hash, which is keyed on fid, to see if we already
983 :     # saw an assignment for this fid. If we have, we keep this one only if
984 :     # the assignment time on it is later than the one we saw already.
985 :     #
986 :     # We are only looking at master assignments for now. We will need
987 :     # to return to this issue and reexamine it, but in order to move
988 :     # forward I am only matching master assignments.
989 :     #
990 :    
991 :     if ($anno_text =~ /Set master function to\n(\S[^\n]+\S)/)
992 :     {
993 :     my $func = $1;
994 :    
995 :     my $other = $assignment{$fid};
996 :    
997 :     #
998 :     # If we haven't seen an assignment for this fid,
999 :     # or if the other assignment has a timestamp that
1000 :     # is earlier than this one, set the assignment.
1001 :     #
1002 :    
1003 :     if (!defined($other) or
1004 :     ($other->[1] < $anno_time))
1005 :     {
1006 :     $assignment{$fid} = [$fid, $anno_time, $who, $func];
1007 :     }
1008 :     }
1009 : olson 1.1 }
1010 :     }
1011 :     close($afh);
1012 : olson 1.15
1013 :     #
1014 :     # Write out the assignments that remain.
1015 :     #
1016 :    
1017 :     for my $fid (sort keys(%assignment))
1018 :     {
1019 :     print $assign_fh join("\t", @{$assignment{$fid}}), "\n";
1020 :     $num_assignments++;
1021 :     }
1022 : olson 1.1 }
1023 : olson 1.15
1024 : olson 1.1
1025 :     #
1026 :     # Determine genome information if we have annotations for this one.
1027 :     #
1028 :    
1029 :     if ($num_annos_for_genome > 0)
1030 :     {
1031 :     $num_genomes++;
1032 :     if (open(my $cfh, "<$genome_dir/COUNTS"))
1033 :     {
1034 :     if ($_ = <$cfh>)
1035 :     {
1036 :     chomp;
1037 :     my($cgenome, $n_contigs, $total_nucs, $cksum) = split(/\t/, $_);
1038 :     if ($cgenome ne $genome)
1039 :     {
1040 :     warn "Hm, $genome has a COUNTS file with genome=$cgenome that does not match\n";
1041 :     }
1042 :     else
1043 :     {
1044 :     print $genome_fh join("\t",
1045 :     $genome, $num_annos_for_genome, $n_contigs,
1046 :     $total_nucs, $cksum), "\n";
1047 :     }
1048 :     }
1049 :     }
1050 :     }
1051 :    
1052 :     }
1053 :     close($anno_fh);
1054 :     close($peg_fh);
1055 :     close($genome_fh);
1056 : olson 1.15 close($assign_fh);
1057 : olson 1.1
1058 :     print "Pegs: $num_pegs\n";
1059 :     print "Genomes: $num_genomes\n";
1060 :     print "Annos: $num_annos\n";
1061 :    
1062 :     #
1063 :     # Check compatibility.
1064 :     #
1065 :    
1066 : olson 1.18 my $my_release = [$fig->get_release_info()];
1067 :    
1068 :     #
1069 :     # Release id is $my_release->[1].
1070 :     #
1071 :    
1072 :     my $compatible;
1073 :     if ($my_release->[1] ne "" and $his_release->[1] ne "")
1074 :     {
1075 :     #
1076 :     # Both releases must be defined for them to be compatible.
1077 :     #
1078 :     # At some point we need to consider the derived-release issue.
1079 :     #
1080 :    
1081 :     $compatible = $my_release->[1] eq $his_release->[1];
1082 :     }
1083 :     else
1084 :     {
1085 :     $compatible = 0;
1086 :     }
1087 : olson 1.1
1088 :     open(my $fh, ">$spool_dir/INFO");
1089 :     print $fh "requestor_release\t$his_release\n";
1090 :     print $fh "last_update\t$last_update\n";
1091 : olson 1.20 print $fh "update_thru\t$update_thru\n";
1092 : olson 1.1 print $fh "cur_update\t$now\n";
1093 :     print $fh "target_release\t$my_release\n";
1094 :     print $fh "compatible\t$compatible\n";
1095 :     print $fh "num_pegs\t$num_pegs\n";
1096 :     print $fh "num_genomes\t$num_genomes\n";
1097 :     print $fh "num_annos\t$num_annos\n";
1098 : olson 1.15 print $fh "num_assignments\t$num_assignments\n";
1099 : olson 1.1 close($fh);
1100 :    
1101 : olson 1.15 #
1102 :     # Construct list of users, and update local user database.
1103 :     #
1104 :    
1105 :     my @users = keys(%users);
1106 : olson 1.17 # $fig->ensure_users(\@users);
1107 : olson 1.15
1108 :     return [$session_id, $my_release, $num_assignments, $num_annos, $num_pegs, $num_genomes,
1109 : olson 1.16 $now, $compatible, \@users];
1110 : olson 1.1 }
1111 :    
1112 :    
#
# get_pegs($session_id, $start, $len)
#
# Return one batch of the pegs that were spooled for an update session.
#
#   $session_id  Name of the session's directory under
#                $FIG_Config::temp/p2p_spool.
#   $start       Zero-based index of the first peg wanted.
#   $len         Number of pegs wanted; the batch is $start .. $start+$len-1.
#
# Returns [$peg_output, $genome_output].
#
# Each entry of $peg_output is ['peg', $fid] when the session's INFO file
# marks it as compatible, and ['peg_info', $fid, [@aliases], $genome]
# otherwise. $genome_output is only filled in for incompatible sessions; it
# holds [$genome, $n_contigs, $n_nucs, $cksum] for each genome referenced by
# the returned pegs that is also listed in the spooled genomes file.
#
# Dies if the session id is unusable, a spool file cannot be opened, or the
# requested range falls outside 0 .. num_pegs-1.
#
sub get_pegs
{
    my($self, $session_id, $start, $len) = @_;
    my(%session_info);

    #
    # The session id comes from the remote peer and is used as a directory
    # name, so refuse anything that could step outside the spool directory.
    #

    if (!defined($session_id) or
        $session_id !~ m{\A[^/\0]+\z} or
        $session_id =~ /\A\.\.?\z/)
    {
        die "Invalid session id " . (defined($session_id) ? $session_id : "(undef)");
    }

    my $spool_dir = "$FIG_Config::temp/p2p_spool/$session_id";

    -d $spool_dir or die "Invalid session id $session_id";

    #
    # Read in the cached information for this session.
    #

    open(my $info_fh, "<", "$spool_dir/INFO") or die "Cannot open INFO file: $!";
    while (defined(my $line = <$info_fh>))
    {
        chomp($line);
        my($var, $val) = split(/\t/, $line, 2);
        $session_info{$var} = $val;
    }
    close($info_fh);

    #
    # Sanity check start and length.
    #

    if ($start < 0 or $start >= $session_info{num_pegs})
    {
        die "Invalid start position $start";
    }

    if ($len < 0 or ($start + $len - 1) >= $session_info{num_pegs})
    {
        die "Invalid length $len";
    }

    #
    # Open file, spin to the starting line, then start reading.
    #

    open(my $peg_fh, "<", "$spool_dir/pegs") or die "Cannot open pegs file: $!";

    my $peg_output = [];
    my $genome_output = [];

    my %genomes_to_show;

    #
    # $end is the first index past the batch, so exactly $len pegs are
    # returned.
    #

    my $end = $start + $len;
    my $peg_num = 0;

    while (defined(my $line = <$peg_fh>))
    {
        next if ($peg_num < $start);

        last if ($peg_num >= $end);

        chomp($line);

        #
        # OK, this is a peg to process.
        # It's easy if we're compatible.
        #

        my($fid, $genome, @aliases) = split(/\t/, $line);

        if ($session_info{compatible})
        {
            push(@$peg_output, ['peg', $fid]);
        }
        else
        {
            $genomes_to_show{$genome}++;
            push(@$peg_output, ['peg_info', $fid, [@aliases], $genome]);
        }
    }
    continue
    {
        $peg_num++;
    }
    close($peg_fh);

    #
    # Read the genomes file, returning information about genomes referenced
    # in the pegs returned. Stop as soon as every referenced genome is found.
    #

    my $n_left = keys(%genomes_to_show);

    open(my $gfh, "<", "$spool_dir/genomes") or die "Cannot open genomes file: $!";
    while ($n_left > 0 and defined(my $line = <$gfh>))
    {
        chomp($line);

        my($genome, $n_annos, $n_contigs, $n_nucs, $cksum) = split(/\t/, $line);

        if ($genomes_to_show{$genome})
        {
            push(@$genome_output, [$genome, $n_contigs, $n_nucs, $cksum]);
            $n_left--;
        }
    }
    close($gfh);

    return [$peg_output, $genome_output];
}
1222 : olson 1.6
#
# finalize_pegs($session, $request)
#
# Answer the requestor's follow-up questions about individual pegs.
#
#   $session  Session id; accepted for interface symmetry but not used here.
#   $request  Reference to a list of [$what, $peg] pairs.
#
# For each request item:
#
#   'peg_genome'   yields ['peg_loc', $peg, $strand, $beg, $end, $cksum, $seq],
#                  built from the FIG feature location, the checksum of the
#                  contig it lies on, and the translation (included in case
#                  the contig mapping does not work on the other side).
#   'peg_unknown'  yields ['peg_seq', $peg, $seq].
#
# Items with any other $what are ignored.
#
# Returns a reference to the list of replies. The list is empty, never undef,
# when the request holds nothing that produces a reply.
#
sub finalize_pegs
{
    my($self, $session, $request) = @_;

    #
    # Start from an empty list so that a request with no answerable items
    # still returns an array reference.
    #

    my $out = [];

    my $fig = FIG->new;

    #
    # Walk the request handling appropriately. This is fairly easy, as it
    # is just a matter of pulling either sequence or location/contig data.
    #

    for my $item (@$request)
    {
        my($what, $peg) = @$item;

        if ($what eq "peg_genome")
        {
            #
            # Return the location and contig checksum for this peg.
            #
            # We also include the sequence in case the contig mapping doesn't work.
            #

            my $loc = $fig->feature_location($peg);
            my $contig = $fig->contig_of($loc);
            my $cksum = $fig->contig_checksum($fig->genome_of($peg), $contig);
            my $seq = $fig->get_translation($peg);

            push(@$out, ['peg_loc', $peg,
                         $fig->strand_of($peg),
                         $fig->beg_of($loc), $fig->end_of($loc),
                         $cksum, $seq]);
        }
        elsif ($what eq "peg_unknown")
        {
            my $seq = $fig->get_translation($peg);
            push(@$out, ['peg_seq', $peg, $seq]);
        }
    }

    return $out;
}
1266 :    
1267 : olson 1.15
#
# get_annotations($session_id, $start, $len)
#
# Return one batch of the annotations that were spooled for an update session.
#
#   $session_id  Name of the session's directory under
#                $FIG_Config::temp/p2p_spool.
#   $start       Zero-based index of the first annotation wanted.
#   $len         Number of annotations wanted; the batch is
#                $start .. $start+$len-1.
#
# The annos spool file holds records terminated by "//\n". Within a record the
# first three lines are the id, date and author, and everything after them is
# the annotation text.
#
# Returns a reference to a list of [$id, $date, $author, $anno] entries.
#
# Dies if the session id is unusable, a spool file cannot be opened, or the
# requested range falls outside 0 .. num_annos-1.
#
sub get_annotations
{
    my($self, $session_id, $start, $len) = @_;

    my(%session_info);

    #
    # The session id comes from the remote peer and is used as a directory
    # name, so refuse anything that could step outside the spool directory.
    #

    if (!defined($session_id) or
        $session_id !~ m{\A[^/\0]+\z} or
        $session_id =~ /\A\.\.?\z/)
    {
        die "Invalid session id " . (defined($session_id) ? $session_id : "(undef)");
    }

    my $spool_dir = "$FIG_Config::temp/p2p_spool/$session_id";

    -d $spool_dir or die "Invalid session id $session_id";

    #
    # Read in the cached information for this session.
    #

    open(my $info_fh, "<", "$spool_dir/INFO") or die "Cannot open INFO file: $!";
    while (defined(my $line = <$info_fh>))
    {
        chomp($line);
        my($var, $val) = split(/\t/, $line, 2);
        $session_info{$var} = $val;
    }
    close($info_fh);

    #
    # Sanity check start and length.
    #

    if ($start < 0 or $start >= $session_info{num_annos})
    {
        die "Invalid start position $start";
    }

    if ($len < 0 or ($start + $len - 1) >= $session_info{num_annos})
    {
        die "Invalid length $len";
    }

    #
    # Open file, spin to the starting record, then start reading.
    #

    open(my $anno_fh, "<", "$spool_dir/annos") or die "Cannot open annos file: $!";

    my $anno_output = [];

    #
    # $end is the first index past the batch, so exactly $len annotations
    # are returned.
    #

    my $end = $start + $len;
    my $anno_num = 0;

    #
    # Records are separated by "//\n"; chomp strips that separator.
    #

    local $/ = "//\n";
    while (defined(my $record = <$anno_fh>))
    {
        next if ($anno_num < $start);

        last if ($anno_num >= $end);

        chomp($record);

        my($id, $date, $author, $anno) = split(/\n/, $record, 4);

        push(@$anno_output, [$id, $date, $author, $anno]);
    }
    continue
    {
        $anno_num++;
    }
    close($anno_fh);

    return $anno_output;
}
1339 : olson 1.19
#
# get_assignments($session_id, $start, $len)
#
# Return one batch of the assignments that were spooled for an update session.
#
#   $session_id  Name of the session's directory under
#                $FIG_Config::temp/p2p_spool.
#   $start       Zero-based index of the first assignment wanted.
#   $len         Number of assignments wanted; the batch is
#                $start .. $start+$len-1.
#
# The assignments spool file holds one tab-separated record per line:
# id, date, author, function.
#
# Returns a reference to a list of [$id, $date, $author, $func] entries.
#
# Dies if the session id is unusable, a spool file cannot be opened, or the
# requested range falls outside 0 .. num_assignments-1.
#
sub get_assignments
{
    my($self, $session_id, $start, $len) = @_;

    my(%session_info);

    #
    # The session id comes from the remote peer and is used as a directory
    # name, so refuse anything that could step outside the spool directory.
    #

    if (!defined($session_id) or
        $session_id !~ m{\A[^/\0]+\z} or
        $session_id =~ /\A\.\.?\z/)
    {
        die "Invalid session id " . (defined($session_id) ? $session_id : "(undef)");
    }

    my $spool_dir = "$FIG_Config::temp/p2p_spool/$session_id";

    -d $spool_dir or die "Invalid session id $session_id";

    #
    # Read in the cached information for this session.
    #

    open(my $info_fh, "<", "$spool_dir/INFO") or die "Cannot open INFO file: $!";
    while (defined(my $line = <$info_fh>))
    {
        chomp($line);
        my($var, $val) = split(/\t/, $line, 2);
        $session_info{$var} = $val;
    }
    close($info_fh);

    #
    # Sanity check start and length.
    #

    if ($start < 0 or $start >= $session_info{num_assignments})
    {
        die "Invalid start position $start";
    }

    if ($len < 0 or ($start + $len - 1) >= $session_info{num_assignments})
    {
        die "Invalid length $len";
    }

    #
    # Open file, spin to the starting line, then start reading.
    #

    open(my $assign_fh, "<", "$spool_dir/assignments") or die "Cannot open assignments file: $!";

    my $assign_output = [];

    #
    # $end is the first index past the batch, so exactly $len assignments
    # are returned.
    #

    my $end = $start + $len;
    my $assign_num = 0;

    while (defined(my $line = <$assign_fh>))
    {
        next if ($assign_num < $start);

        last if ($assign_num >= $end);

        chomp($line);

        my($id, $date, $author, $func) = split(/\t/, $line, 4);

        push(@$assign_output, [$id, $date, $author, $func]);
    }
    continue
    {
        $assign_num++;
    }
    close($assign_fh);

    return $assign_output;
}
1410 :    

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3