[Bio] / FigKernelPackages / P2P.pm Repository:
ViewVC logotype

Annotation of /FigKernelPackages/P2P.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.2 - (view) (download) (as text)

1 : olson 1.1 #
2 :     # This module contains the code for the P2P update protocol.
3 :     #
4 :     # Package P2P contains the namespace declarations, and possibly toplevel utility
5 :     # routines. (get_relay ?)
6 :     #
7 :     # Package P2P::Relay contains methods for contacting the P2P relay service. The actual
8 :     # implementation of the relay service is not contained here - it is a standalone module
9 :     # that can be installed on a web server that does not have a full SEED installed.
10 :     #
11 :     # Package P2P::Requestor contains the requestor-side code for the update protocol.
12 :     #
13 :     # Package P2P::Service contains the web service implementation routines for the
14 :     # protocol.
15 :     #
16 :    
17 :     package P2P;
18 :    
19 :     use FIG_Config;
20 :    
21 :     use strict;
22 :     use Exporter;
23 :     use base qw(Exporter);
24 :    
25 :     use Data::Dumper;
26 :    
27 :     use vars qw(@EXPORT @EXPORT_OK);
28 :     @EXPORT = ();
29 :     @EXPORT_OK = qw($ns_p2p $ns_relay);
30 :    
31 :     our $ns_p2p = "http://thefig.info/schemas/p2p_update";
32 :     our $ns_relay = "http://thefig.info/schemas/p2p_relay";
33 :    
34 :     =pod
35 :    
36 :     =head1 perform_update($peer)
37 :    
38 :     Perform a peer-to-peer update with the given peer. $peer is an instance of
39 :     P2P::Requestor which can connect to the peer. It is expected that the
40 :     SEED infrastructure will create this requestor appropriately for the
41 :     particular circumstance (direct connection, thru relay, etc).
42 :    
43 :     This code executes the high-level protocol, maintaining state between
44 :     calls to the peer to exchange the actual information.
45 :    
46 :     =cut
47 :    
48 :     sub perform_update
49 :     {
50 :     my($fig, $peer, $last_update) = @_;
51 :    
52 :     my $ret = $peer->request_update($last_update);
53 :    
54 :     if (!$ret or ref($ret) ne "ARRAY")
55 :     {
56 :     die "perform_update: request_updated failed\n";
57 :     }
58 :    
59 :     my($session, $target_release, $num_annos, $num_pegs, $num_genomes,
60 :     $target_time, $compatible) = @$ret;
61 :    
62 :     print "perform_update: session=$session target=$target_release num_annos=$num_annos\n";
63 :     print " num_pegs=$num_pegs num_genomes=$num_genomes target_time=$target_time compat=$compatible\n";
64 :    
65 :     #
66 :     # We have the information now to begin the update process. Retrieve the pegs.
67 :     #
68 :    
69 :     $ret = $peer->get_pegs($session, 0, $num_pegs);
70 :    
71 :     if (!$ret or ref($ret) ne "ARRAY")
72 :     {
73 :     die "perform_update: get_pegs failed\n";
74 :     }
75 :    
76 :     my($peg_list, $genome_list) = @$ret;
77 :    
78 :     #
79 :     # Walk the peg-list to and generate @pegs_to_finalize.
80 :     #
81 :    
82 :     my(%peg_mapping, %genome_map );
83 :    
84 :     for my $peg_info (@$peg_list)
85 :     {
86 :     my($key, $peg, @rest) = @$peg_info;
87 :    
88 :     if ($key eq 'peg')
89 :     {
90 :     #
91 :     # Peg id is directly usable.
92 :     #
93 :     }
94 :     elsif ($key eq 'peg_info')
95 :     {
96 :     #
97 :     # Peg id not directly usable.
98 :     #
99 :    
100 :     my($alias_list, $genome_id) = @rest;
101 :    
102 :     for my $alias (@$alias_list)
103 :     {
104 :     my $mapped = $fig->by_alias($alias);
105 :     if ($mapped && $peg !~ /5$/)
106 :     {
107 :     print "$peg maps to $mapped via $alias\n";
108 :     $peg_mapping{$peg}= $mapped;
109 :     last;
110 :     }
111 :     }
112 :    
113 :     #
114 :     # If we didn't succeed in mapping by alias,
115 :     # stash this in the list of pegs to be mapped by
116 :     # genome.
117 :     #
118 :    
119 :     if (!defined($peg_mapping{$peg}))
120 :     {
121 :     push(@{$genome_map{$genome_id}}, $peg);
122 :     }
123 :     }
124 :     }
125 :    
126 :     #
127 :     # finished first pass. Now go over the per-genome mappings that need to be made.
128 :     #
129 :    
130 :     for my $genome_info (@$genome_list)
131 :     {
132 :     my($genome, $n_contigs, $n_nucs, $cksum) = @$genome_info;
133 :    
134 :     next unless $genome_map{$genome};
135 :    
136 :     my $my_genome = $fig->find_genome_by_content($genome, $n_contigs, $n_nucs, $cksum);
137 :    
138 :     if ($my_genome)
139 :     {
140 :     #
141 :     # Found a match.
142 :     #
143 :     print "Genome $genome maps to $my_genome locally\n";
144 :    
145 :     }
146 : olson 1.2 else
147 :     {
148 :     print "No mapping for $genome\n";
149 :     }
150 : olson 1.1 }
151 :     }
152 :    
153 :    
154 :     #############
155 :     #
156 :     # P2P Relay
157 :     #
158 :     #############
159 :    
160 :    
161 :     package P2P::Relay;
162 :     use strict;
163 :    
164 :     use Data::Dumper;
165 :     use SOAP::Lite;
166 :    
167 :     use P2P;
168 :    
169 :     sub new
170 :     {
171 :     my($class, $url) = @_;
172 :    
173 :     my $proxy = SOAP::Lite->uri($P2P::ns_relay)->proxy($url);
174 :    
175 :     my $self = {
176 :     url => $url,
177 :     proxy => $proxy,
178 :     };
179 :     return bless($self, $class);
180 :     }
181 :    
182 :     sub enumerate_annotation_systems
183 :     {
184 :     my($self) = @_;
185 :    
186 :     return $self->{proxy}->enumerate_annotation_systems()->result;
187 :     }
188 :    
189 :     sub fetch_queries
190 :     {
191 :     my($self, $id) = @_;
192 :    
193 :     my $reply = $self->{proxy}->fetch_queries($id);
194 :    
195 :     if ($reply->fault)
196 :     {
197 :     print "Failed to fetch queries: ", $reply->faultcode, " ", $reply->faultstring, "\n";
198 :     return undef;
199 :     }
200 :    
201 :     return $reply->result;
202 :     }
203 :    
204 :     sub deposit_answer
205 :     {
206 :     my($self, $id, $key, $answer) = @_;
207 :    
208 :     my $reply = $self->{proxy}->deposit_answer($id, $key,
209 :     SOAP::Data->type('base64')->value($answer));
210 :    
211 :     if ($reply->fault)
212 :     {
213 :     print "deposit_answer got fault: ", $reply->faultcode, " ", $reply->faultstring, "\n";
214 :     return undef;
215 :     }
216 :    
217 :     return $reply;
218 :     }
219 :    
220 :     =pod
221 :    
222 :     =head1 await_result
223 :    
224 :     Await the result from a possibly-asynchronous soap request.
225 :    
226 :     Look at the reply that we have. If it's a deferred reply, loop polling
227 :     the relay for the actual result.
228 :    
229 :     We determine if the reply is a deferred reply by examining the namespace
230 :     URI of the response. A response will be generated from the relay's namespace,
231 :     rather than that of the application itself.
232 :    
233 :     =cut
234 :    
235 :     sub await_result
236 :     {
237 :     my($self, $reply) = @_;
238 :    
239 :     while (1)
240 :     {
241 :     #
242 :     # Retrieve the namespace of the response, which is the first
243 :     # element in the body of the message.
244 :     #
245 :     my $ns = $reply->namespaceuriof('/Envelope/Body/[1]');
246 :     print "Reply ns=$ns want $P2P::ns_relay\n";
247 :    
248 :     if ($ns eq $P2P::ns_relay)
249 :     {
250 :     my $val = $reply->result;
251 :     print "got val=", Dumper($val);
252 :     if ($val->[0] eq 'deferred')
253 :     {
254 :     #
255 :     # Sleep a little, then try to retrieve the response.
256 :     #
257 :    
258 :     sleep(1);
259 :     my $id = $val->[1];
260 :    
261 :     print "Retrieving reply\n";
262 :     $reply = $self->{proxy}->call_completed($id);
263 :     }
264 :     else
265 :     {
266 :     #
267 :     # We're not sure what to do here..
268 :     #
269 :     return undef;
270 :     }
271 :     }
272 :     else
273 :     {
274 :     #
275 :     # We got an actual response. Return it.
276 :     #
277 :    
278 :     return $reply;
279 :     }
280 :     }
281 :     }
282 :    
283 :     #############
284 :     #
285 :     # P2P Requestor
286 :     #
287 :     #############
288 :    
289 :     package P2P::Requestor;
290 :     use strict;
291 :    
292 :     use Data::Dumper;
293 :    
294 :     use SOAP::Lite;
295 :     use P2P;
296 :    
297 :     #
298 :     # Create a new Requestor. It contains a reference to the FIG instance
299 :     # so that we can run the protocol completely from in here.
300 :     #
301 :    
302 :     sub new
303 :     {
304 :     my($class, $fig, $url, $peer_id, $relay) = @_;
305 :    
306 :     my $proxy = SOAP::Lite->uri($ns_p2p)->proxy($url);
307 :    
308 :     my $self = {
309 :     fig => $fig,
310 :     url => $url,
311 :     peer_id => $peer_id,
312 :     proxy => $proxy,
313 :     relay => $relay,
314 :     };
315 :     return bless($self, $class);
316 :     }
317 :    
318 :     #
319 :     # First step: Request an update.
320 :     #
321 :     # We need to determine some notion of what our release is, since we are not
322 :     # currently tagging them explicitly. Until we delve into this more,
323 :     # I am going to return a null release, which means the same-release
324 :     # optimization won't be able to kick in.
325 :     #
326 :     # We also need to determine the last time we got an update from this
327 :     # system.
328 :     #
329 :    
330 :     sub request_update
331 :     {
332 :     my($self, $last_update) = @_;
333 :    
334 :     my $rel = $self->{fig}->get_release_info();
335 :    
336 :     if (!defined($last_update))
337 :     {
338 :     $last_update = $self->{fig}->get_peer_last_update($self->{peer_id});
339 :     }
340 :    
341 :     my $reply = $self->{proxy}->request_update($rel, $last_update);
342 :    
343 :     if ($self->{relay})
344 :     {
345 :     $reply = $self->{relay}->await_result($reply);
346 :     }
347 :    
348 :     if ($reply->fault)
349 :     {
350 :     print "request_update triggered fault: ", $reply->faultcode, " ", $reply->faultstring, "\n";
351 :     return undef;
352 :     }
353 :    
354 :     return $reply->result;
355 :     }
356 :    
357 :     =pod
358 :    
359 :     =head1 get_pegs($session_id, $start, $length)
360 :    
361 :    
362 :     =cut
363 :    
364 :     sub get_pegs
365 :     {
366 :     my($self, $session_id, $start, $length) = @_;
367 :    
368 :     return $self->call("get_pegs", $session_id, $start, $length);
369 :     }
370 :    
371 :     sub call
372 :     {
373 :     my($self, $func, @args) = @_;
374 :    
375 :     my $reply = $self->{proxy}->$func(@args);
376 :    
377 :     if ($self->{relay})
378 :     {
379 :     $reply = $self->{relay}->await_result($reply);
380 :     }
381 :    
382 :     if ($reply->fault)
383 :     {
384 :     print "$func triggered fault: ", $reply->faultcode, " ", $reply->faultstring, "\n";
385 :     return undef;
386 :     }
387 :    
388 :     return $reply->result;
389 :     }
390 :    
391 :    
392 :     #############
393 :     #
394 :     # P2P Service
395 :     #
396 :     # Code in this module is invoked on the target on behalf of a requestor.
397 :     #
398 :     #############
399 :    
400 :     package P2P::Service;
401 :    
402 :     use Data::Dumper;
403 :    
404 :     use FIG;
405 :     use FIG_Config;
406 :     use strict;
407 :    
408 :     use File::Temp qw(tempdir);
409 :     use File::Basename;
410 :    
411 :     sub request_update
412 :     {
413 :     my($class, $his_release, $last_update)= @_;
414 :    
415 :     #
416 :     # Verify input.
417 :     #
418 :    
419 :     if ($last_update !~ /^\d+$/)
420 :     {
421 :     die "request_update: last_update must be a number (not '$last_update')\n";
422 :     }
423 :    
424 :     #
425 :     # Create a new session id and a spool directory to use for storage
426 :     # of information about it. This can go in the tempdir since it is
427 :     # not persistent.
428 :     #
429 :    
430 :     &FIG::verify_dir("$FIG_Config::temp/p2p_spool");
431 :     #my $spool_dir = tempdir(DIR => "$FIG_Config::temp/p2p_spool");
432 :    
433 :     my $spool_dir = "$FIG_Config::temp/p2p_spool/test";
434 :     &FIG::verify_dir($spool_dir);
435 :    
436 :     my $session_id = basename($spool_dir);
437 :     my $now = time;
438 :    
439 :     #
440 :     # Gather the list of pegs and annotations for the update.
441 :     #
442 :    
443 :     my $fig = new FIG;
444 :    
445 :     my $all_genomes = [$fig->genomes];
446 :    
447 :     my %all_genomes = map { $_ => 1 } @$all_genomes;
448 :    
449 :     my %pegs;
450 :    
451 :     my $num_annos = 0;
452 :     my $num_genomes = 0;
453 :     my $num_pegs = 0;
454 :    
455 :     my $anno_fh;
456 :     open($anno_fh, ">$spool_dir/annos");
457 :    
458 :     my $peg_fh;
459 :     open($peg_fh, ">$spool_dir/pegs");
460 :    
461 :     my $genome_fh;
462 :     open($genome_fh, ">$spool_dir/genomes");
463 :    
464 :     for my $genome (@$all_genomes)
465 :     {
466 :     my $num_annos_for_genome = 0;
467 :    
468 :     my $genome_dir = "$FIG_Config::organisms/$genome";
469 :     next unless -d $genome_dir;
470 :    
471 :     my $afh;
472 :     if (open($afh, "$genome_dir/annotations"))
473 :     {
474 :     my($fid, $anno_time, $who, $anno_text);
475 :     local($/);
476 :     $/ = "//\n";
477 :     while (my $ann = <$afh>)
478 :     {
479 :     chomp $ann;
480 :    
481 :     if ((($fid, $anno_time, $who, $anno_text) =
482 :     ($ann =~ /^(fig\|\d+\.\d+\.peg\.\d+)\n(\d+)\n(\S+)\n(.*\S)/s)) and
483 :     $anno_time > $last_update)
484 :    
485 :     {
486 :     #
487 :     # Look up aliases if we haven't seen this fid before.
488 :     #
489 :    
490 :     if (!defined($pegs{$fid}))
491 :     {
492 :     my @aliases = $fig->feature_aliases($fid);
493 :    
494 :     print $peg_fh join("\t", $fid, $genome, @aliases), "\n";
495 :     $num_pegs++;
496 :     }
497 :    
498 :     print $anno_fh "$ann//\n";
499 :    
500 :     $pegs{$fid}++;
501 :    
502 :     $num_annos_for_genome++;
503 :     $num_annos++;
504 :     }
505 :     }
506 :     close($afh);
507 :     }
508 :    
509 :     #
510 :     # Determine genome information if we have annotations for this one.
511 :     #
512 :    
513 :     if ($num_annos_for_genome > 0)
514 :     {
515 :     $num_genomes++;
516 :     if (open(my $cfh, "<$genome_dir/COUNTS"))
517 :     {
518 :     if ($_ = <$cfh>)
519 :     {
520 :     chomp;
521 :     my($cgenome, $n_contigs, $total_nucs, $cksum) = split(/\t/, $_);
522 :     if ($cgenome ne $genome)
523 :     {
524 :     warn "Hm, $genome has a COUNTS file with genome=$cgenome that does not match\n";
525 :     }
526 :     else
527 :     {
528 :     print $genome_fh join("\t",
529 :     $genome, $num_annos_for_genome, $n_contigs,
530 :     $total_nucs, $cksum), "\n";
531 :     }
532 :     }
533 :     }
534 :     }
535 :    
536 :     }
537 :     close($anno_fh);
538 :     close($peg_fh);
539 :     close($genome_fh);
540 :    
541 :     print "Pegs: $num_pegs\n";
542 :     print "Genomes: $num_genomes\n";
543 :     print "Annos: $num_annos\n";
544 :    
545 :     #
546 :     # Check compatibility.
547 :     #
548 :    
549 :     my $my_release = $fig->get_release_info();
550 :     my $compatible = (defined($my_release) && ($my_release == $his_release)) ? 1 : 0;
551 :    
552 :     open(my $fh, ">$spool_dir/INFO");
553 :     print $fh "requestor_release\t$his_release\n";
554 :     print $fh "last_update\t$last_update\n";
555 :     print $fh "cur_update\t$now\n";
556 :     print $fh "target_release\t$my_release\n";
557 :     print $fh "compatible\t$compatible\n";
558 :     print $fh "num_pegs\t$num_pegs\n";
559 :     print $fh "num_genomes\t$num_genomes\n";
560 :     print $fh "num_annos\t$num_annos\n";
561 :     close($fh);
562 :    
563 :     return [$session_id, $my_release, $num_annos, $num_pegs, $num_genomes, $now, $compatible];
564 :     }
565 :    
566 :    
567 :     sub get_pegs
568 :     {
569 :     my($self, $session_id, $start, $len) = @_;
570 :     my(%session_info);
571 :    
572 :     my $spool_dir = "$FIG_Config::temp/p2p_spool/$session_id";
573 :    
574 :     -d $spool_dir or die "Invalid session id $session_id";
575 :    
576 :     #
577 :     # Read in the cached information for this session.
578 :     #
579 :    
580 :     open(my $info_fh, "<$spool_dir/INFO") or die "Cannot open INFO file: $!";
581 :     while (<$info_fh>)
582 :     {
583 :     chomp;
584 :     my($var, $val) = split(/\t/, $_, 2);
585 :     $session_info{$var} = $val;
586 :     }
587 :     close($info_fh);
588 :    
589 :     #
590 :     # Sanity check start and length.
591 :     #
592 :    
593 :     if ($start < 0 or $start >= $session_info{num_pegs})
594 :     {
595 :     die "Invalid start position $start";
596 :     }
597 :    
598 :     if ($len < 0 or ($start + $len - 1) >= $session_info{num_pegs})
599 :     {
600 :     die "Invalid length $len";
601 :     }
602 :    
603 :     #
604 :     # Open file, spin to the starting line, then start reading.
605 :     #
606 :    
607 :     open(my $peg_fh, "<$spool_dir/pegs") or die "Cannot open pegs file: $!";
608 :    
609 :     my $peg_output = [];
610 :     my $genome_output = [];
611 :    
612 :     my $peg_num = 0;
613 :     my $genomes_to_show = [];
614 :     my %genomes_to_show;
615 :    
616 :     my($fid, $genome, @aliases);
617 :    
618 :     while (<$peg_fh>)
619 :     {
620 :     next if ($peg_num < $start);
621 :    
622 :     last if ($peg_num > ($start + $len));
623 :    
624 :     chomp;
625 :    
626 :     #
627 :     # OK, this is a peg to process.
628 :     # It's easy if we're compatible.
629 :     #
630 :    
631 :     ($fid, $genome, @aliases) = split(/\t/, $_);
632 :    
633 :     if ($session_info{compatible})
634 :     {
635 :     push(@$peg_output, ['peg', $fid]);
636 :     }
637 :     else
638 :     {
639 :     if (!$genomes_to_show{$genome})
640 :     {
641 :     push(@$genomes_to_show, $genome);
642 :     $genomes_to_show{$genome}++;
643 :     }
644 :     push(@$peg_output, ['peg_info', $fid, [@aliases], $genome]);
645 :     }
646 :     }
647 :     continue
648 :     {
649 :     $peg_num++;
650 :     }
651 :    
652 :     #
653 :     # Read the genomes file, returning information about genomes referenced
654 :     # in the pegs returned.
655 :     #
656 :    
657 :     my $n_left = @$genomes_to_show;
658 :    
659 :     open(my $gfh, "<$spool_dir/genomes") or die "Cannot open genomes file: $!";
660 :     while ($n_left > 0 and $_ = <$gfh>)
661 :     {
662 :     chomp;
663 :    
664 :     my($genome, $n_annos, $n_contigs, $n_nucs, $cksum) = split(/\t/);
665 :    
666 :     if ($genomes_to_show{$genome})
667 :     {
668 :     push(@$genome_output, [$genome, $n_contigs, $n_nucs, $cksum]);
669 :     $n_left--;
670 :     }
671 :     }
672 :     close($gfh);
673 :    
674 :     return [$peg_output, $genome_output];
675 :     }

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3