[Bio] / FigKernelPackages / P2P.pm Repository:
ViewVC logotype

View of /FigKernelPackages/P2P.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.10 - (download) (as text) (annotate)
Fri Sep 24 19:36:27 2004 UTC (15 years, 4 months ago) by olson
Branch: MAIN
Changes since 1.9: +1 -1 lines
more testing

#
# This module contains the code for the P2P update protocol.
#
# Package P2P contains the namespace declarations and top-level routines such
# as perform_update, which runs the requestor side of an update against a
# peer. (A get_relay helper may also belong here.)
#
# Package P2P::Relay contains methods for contacting the P2P relay service. The actual
# implementation of the relay service is not contained here - it is a standalone module
# that can be installed on a web server that does not have a full SEED installed.
#
# Package P2P::Requestor contains the requestor-side code for the update protocol.
#
# Package P2P::Service contains the web service implementation routines for the
# protocol.
#

package P2P;

use FIG_Config;

use strict;
use Exporter;
use base qw(Exporter);

use Data::Dumper;

# Nothing is exported by default; callers import the namespace URIs
# explicitly, e.g. "use P2P qw($ns_p2p)".
our @EXPORT = ();
our @EXPORT_OK = qw($ns_p2p $ns_relay);

# SOAP namespace URIs: $ns_p2p for the peer-to-peer update service and
# $ns_relay for the relay service (P2P::Relay builds its proxy with it and
# uses it to recognize relay-generated replies).
our $ns_p2p = "http://thefig.info/schemas/p2p_update";
our $ns_relay = "http://thefig.info/schemas/p2p_relay";

=pod

=head1 perform_update($fig, $peer, $last_update)

Perform a peer-to-peer update with the given peer.

$fig is the local FIG instance, used to map the peer's peg ids onto local
ones (by_alias, find_genome_by_content, find_contig_with_checksum and a
features-table query through db_handle). $peer is an instance of
P2P::Requestor which can connect to the peer. It is expected that the
SEED infrastructure will create this requestor appropriately for the
particular circumstance (direct connection, thru relay, etc).
$last_update is passed unchanged to $peer->request_update.

This code executes the high-level protocol, maintaining state between
calls to the peer to exchange the actual information: request_update
opens a session, get_pegs retrieves the changed pegs, and pegs that cannot
be used directly are mapped by alias or, via finalize_pegs, by genome and
contig location.

Returns a reference to a hash mapping each peer peg id to the local peg id
it was mapped to; pegs that could not be mapped are absent. Dies if one of
the calls to the peer fails.

Sequence data returned for pegs on genomes we do not have locally
('peg_seq' entries) is not used for mapping yet.

=cut

sub perform_update
{
    my($fig, $peer, $last_update) = @_;

    my $ret = $peer->request_update($last_update);

    if (!$ret or ref($ret) ne "ARRAY")
    {
        die "perform_update: request_update failed\n";
    }

    my($session, $target_release, $num_annos, $num_pegs, $num_genomes,
       $target_time, $compatible) = @$ret;

    print "perform_update: session=$session target=$target_release num_annos=$num_annos\n";
    print "                num_pegs=$num_pegs num_genomes=$num_genomes target_time=$target_time compat=$compatible\n";

    my %peg_mapping;

    #
    # Nothing has changed on the peer: there are no pegs to fetch or map,
    # so skip the remaining round trips.
    #
    return \%peg_mapping unless $num_pegs;

    #
    # We have the information now to begin the update process. Retrieve the pegs.
    #

    $ret = $peer->get_pegs($session, 0, $num_pegs);

    if (!$ret or ref($ret) ne "ARRAY")
    {
        die "perform_update: get_pegs failed\n";
    }

    my($peg_list, $genome_list) = @$ret;

    #
    # First pass: record pegs whose ids are directly usable and try to map
    # the others by alias. Pegs still unmapped are collected per (peer-side)
    # genome id in %genome_map.
    #

    my %genome_map;

    for my $peg_info (@$peg_list)
    {
        my($key, $peg, @rest) = @$peg_info;

        if ($key eq 'peg')
        {
            #
            # Peg id is directly usable.
            #
            $peg_mapping{$peg} = $peg;
        }
        elsif ($key eq 'peg_info')
        {
            #
            # Peg id not directly usable.
            #
            my($alias_list, $genome_id) = @rest;

            for my $alias (@$alias_list)
            {
                my $mapped = $fig->by_alias($alias);
                if ($mapped)
                {
                    print "$peg maps to $mapped via $alias\n";
                    $peg_mapping{$peg} = $mapped;
                    last;
                }
            }

            #
            # If we didn't succeed in mapping by alias,
            # stash this in the list of pegs to be mapped by
            # genome.
            #
            if (!defined($peg_mapping{$peg}))
            {
                push(@{$genome_map{$genome_id}}, $peg);
                print "$peg did not map\n";
            }
        }
    }

    #
    # Second pass: go over the per-genome mappings that need to be made.
    #
    # $genome_map{$genome_id} is a list of pegs that reside on that genome.
    # the pegs and genome id are both target-based identifiers.
    #

    my @finalize_req = ();
    my %local_genome;

    for my $genome_info (@$genome_list)
    {
        my($genome, $n_contigs, $n_nucs, $cksum) = @$genome_info;

        next unless defined($genome_map{$genome});

        #
        # Determine if we have a local genome installed that matches precisely the
        # genome on the target side.
        #
        my $my_genome = $fig->find_genome_by_content($genome, $n_contigs, $n_nucs, $cksum);

        my $pegs = $genome_map{$genome};

        if ($my_genome)
        {
            #
            # We do have such a local genome. Generate a peg_genome request to
            # get the location information from the target side, and remember
            # the local genome for each peg.
            #
            print "$genome mapped to $my_genome\n";
            for my $peg (@$pegs)
            {
                push(@finalize_req, ['peg_genome', $peg]);
                $local_genome{$peg} = $my_genome;
            }
        }
        else
        {
            #
            # We don't have such a genome. We need to retrieve the
            # sequence data in order to finish mapping.
            #
            push(@finalize_req, map { ['peg_unknown', $_] } @$pegs);
        }
    }

    #
    # If we need to finalize, make the call.
    #
    if (@finalize_req)
    {
        print Dumper(\@finalize_req);
        $ret = $peer->finalize_pegs($session, \@finalize_req);

        if (!$ret or ref($ret) ne "ARRAY")
        {
            die "perform_update: finalize_pegs failed\n";
        }

        #
        # The return is a list of either location entries ('peg_loc') or
        # sequence data ('peg_seq'). Only location entries are used to
        # finish the mapping so far; sequence data is ignored.
        #
        my $dbh = $fig->db_handle();
        for my $entry (@$ret)
        {
            my($what, $peg, @rest) = @$entry;
            next unless $what eq "peg_loc";

            my($strand, $start, $end, $cksum) = @rest;

            #
            # Only pegs we sent a 'peg_genome' request for have a local genome
            # to look in; ignore anything else the peer returns.
            #
            my $my_genome = $local_genome{$peg};
            if (!defined($my_genome))
            {
                print "Ignoring unrequested location for $peg\n";
                next;
            }

            #
            # We have a contig location. Try to find a matching contig
            # here, and see if it maps to something.
            #
            my $local_contig = $fig->find_contig_with_checksum($my_genome, $cksum);
            if (!$local_contig)
            {
                print "Mapping failed for $my_genome checksum $cksum\n";
                next;
            }

            print "$peg maps to local genome $my_genome and contig $local_contig start=$start end=$end strand=$strand\n";

            #
            # $end comes from the peer and is interpolated into the SQL below,
            # so it must be a plain non-negative integer.
            #
            if (!defined($end) or $end !~ /\A\d+\z/)
            {
                print "Failed to map $peg via contigs: invalid end location\n";
                next;
            }

            #
            # Now look up the local peg. We match on the end location; depending on the strand
            # the feature is on, we want to look at either minloc or maxloc.
            #
            my $whichloc = $strand eq '-' ? "minloc" : "maxloc";

            my $res = $dbh->SQL(qq!SELECT id from features
                                   WHERE $whichloc = $end and genome = '$my_genome' and
                                   contig = '$local_contig'
                                 !);

            if ($res and @$res == 1)
            {
                my($id) = $res->[0]->[0];
                $peg_mapping{$peg} = $id;
                print "Mapped $peg to $id via contigs\n";
            }
            else
            {
                print "Failed to map $peg via contigs\n";
            }
        }
    }

    return \%peg_mapping;
}


#############
#
# P2P Relay 
#
#############


package P2P::Relay;
use strict;

use Data::Dumper;
use SOAP::Lite;

use P2P;

#
# Build a client for the P2P relay service at $url. The SOAP proxy is
# created once, in the relay namespace, and shared by the other methods of
# this package through $self->{proxy}.
#
sub new
{
    my($class, $url) = @_;

    my $soap = SOAP::Lite->uri($P2P::ns_relay);
    $soap = $soap->proxy($url);

    return bless({ url => $url, proxy => $soap }, $class);
}

#
# Ask the relay to enumerate its annotation systems. The shape of the result
# is whatever the relay service returns.
#
# Returns the SOAP result, or undef if the relay answered with a SOAP fault.
# The fault is reported on STDOUT, the same way fetch_queries and
# deposit_answer report theirs.
#
sub enumerate_annotation_systems
{
    my($self) = @_;

    my $reply = $self->{proxy}->enumerate_annotation_systems();

    if ($reply->fault)
    {
        print "enumerate_annotation_systems got fault: ", $reply->faultcode, " ", $reply->faultstring, "\n";
        return undef;
    }

    return $reply->result;
}

#
# Ask the relay for the queries it holds for $id. $id is passed through
# unchanged; presumably it names the annotation system being served.
#
# Returns the SOAP result, or undef after printing the fault when the relay
# answers with a SOAP fault.
#
sub fetch_queries
{
    my($self, $id) = @_;

    my $reply = $self->{proxy}->fetch_queries($id);

    unless ($reply->fault)
    {
        return $reply->result;
    }

    print "Failed to fetch queries: ", $reply->faultcode, " ", $reply->faultstring, "\n";
    return undef;
}

#
# Hand $answer back to the relay. $id and $key are passed through unchanged;
# presumably they identify the system and the query being answered. The
# answer is wrapped as a base64-typed SOAP value.
#
# Returns the raw SOAP reply object (not its result) on success, or undef
# after printing the fault when the relay answers with a SOAP fault.
#
sub deposit_answer
{
    my($self, $id, $key, $answer) = @_;

    my $encoded = SOAP::Data->type('base64')->value($answer);
    my $reply = $self->{proxy}->deposit_answer($id, $key, $encoded);

    return $reply unless $reply->fault;

    print "deposit_answer got fault: ", $reply->faultcode, " ", $reply->faultstring, "\n";
    return undef;
}

=pod

=head1 await_result($reply [, $timeout])

Await the result from a possibly-asynchronous soap request.

Look at the reply that we have. If it's a deferred reply, loop polling
the relay for the actual result, sleeping one second between polls.

We determine if the reply is a deferred reply by examining the namespace
URI of the response. A response will be generated from the relay's namespace,
rather than that of the application itself. A deferred reply carries the
result C<['deferred', $id]>; $id is passed to the relay's call_completed
method to poll for the real reply.

Returns the first reply that is not in the relay's namespace. Returns undef
if the relay answers with anything other than a deferred marker, or if the
optional $timeout (in seconds) has elapsed while the reply is still deferred.
Without $timeout, polling continues indefinitely.

=cut

sub await_result
{
    my($self, $reply, $timeout) = @_;

    my $deadline = defined($timeout) ? time + $timeout : undef;

    while (1)
    {
        #
        # Retrieve the namespace of the response, which is the first
        # element in the body of the message.
        #
        my $ns = $reply->namespaceuriof('/Envelope/Body/[1]');
        print "Reply ns=$ns want $P2P::ns_relay\n";

        #
        # Anything outside the relay namespace is the actual response.
        #
        if (!defined($ns) or $ns ne $P2P::ns_relay)
        {
            return $reply;
        }

        my $val = $reply->result;
        print "got val=", Dumper($val);

        #
        # The only relay reply we understand is ['deferred', $id]. Check the
        # type first so an unexpected scalar reply cannot die under strict refs.
        #
        if (ref($val) ne 'ARRAY' or !defined($val->[0]) or $val->[0] ne 'deferred')
        {
            return undef;
        }

        if (defined($deadline) and time >= $deadline)
        {
            print "Timed out waiting for deferred reply\n";
            return undef;
        }

        #
        # Sleep a little, then try to retrieve the response.
        #
        sleep(1);
        my $id = $val->[1];

        print "Retrieving reply\n";
        $reply = $self->{proxy}->call_completed($id);
    }
}

#############
#
# P2P Requestor
#
#############

package P2P::Requestor;
use strict;

use Data::Dumper;

use SOAP::Lite;
use P2P;

#
# Create a new Requestor. It contains a reference to the FIG instance
# so that we can run the protocol completely from in here.
#
# $url is the peer's SOAP endpoint, $peer_id is passed to
# $fig->get_peer_last_update by request_update, and $relay (optional) is a
# P2P::Relay whose await_result is used to wait for deferred replies.
#
sub new
{
    my($class, $fig, $url, $peer_id, $relay) = @_;

    #
    # Name the namespace variable fully, as P2P::Relay does. "use P2P"
    # exports nothing by default, so a bare $ns_p2p here only resolves through
    # the file-scoped "our" declaration in package P2P.
    #
    my $proxy = SOAP::Lite->uri($P2P::ns_p2p)->proxy($url);

    my $self = {
        fig => $fig,
        url => $url,
        peer_id => $peer_id,
        proxy => $proxy,
        relay => $relay,
    };
    return bless($self, $class);
}

#
# First step: Request an update.
#
# The release we send is whatever $fig->get_release_info() returns; the peer
# compares it with its own release to decide whether the two systems are
# compatible.
#
# $last_update is the time we last got an update from this peer. When it is
# not supplied, it is looked up with $fig->get_peer_last_update($peer_id).
#
# Returns the result of the peer's request_update call, or undef if the call
# faulted or no usable reply came back through the relay.
#
sub request_update
{
    my($self, $last_update) = @_;

    my $fig = $self->{fig};
    my $rel = $fig->get_release_info();

    if (!defined($last_update))
    {
        $last_update = $fig->get_peer_last_update($self->{peer_id});
    }

    my $reply = $self->{proxy}->request_update($rel, $last_update);

    if ($self->{relay})
    {
        $reply = $self->{relay}->await_result($reply);
    }

    #
    # P2P::Relay::await_result returns undef for relay replies it does not
    # understand; there is no reply object to inspect in that case.
    #
    if (!$reply)
    {
        print "request_update: no reply received\n";
        return undef;
    }

    if ($reply->fault)
    {
        print "request_update triggered fault: ", $reply->faultcode, " ", $reply->faultstring, "\n";
        return undef;
    }

    return $reply->result;
}

=pod

=head1 get_pegs($session_id, $start, $length)

Fetch $length pegs of update session $session_id from the peer, starting at
position $start. This forwards to call(), so it returns the result of the
remote get_pegs call, or undef if the call failed.

=cut

sub get_pegs
{
    my $self = shift;
    my($session_id, $start, $length) = @_;

    return $self->call(get_pegs => $session_id, $start, $length);
}

#
# Send the finalize requests for session $session_id to the peer. $request
# is a reference to a list of ['peg_genome', $peg] / ['peg_unknown', $peg]
# entries, as built by perform_update. This forwards to call(), so it
# returns the remote result or undef on failure.
#
sub finalize_pegs
{
    my $self = shift;
    my($session_id, $request) = @_;

    return $self->call('finalize_pegs', $session_id, $request);
}

#
# Invoke the remote method $func with @args through the SOAP proxy. When a
# relay is configured, wait for deferred replies through its await_result.
#
# Returns the result of the call, or undef (after printing a message) if the
# call faulted or no usable reply arrived. P2P::Relay::await_result returns
# undef for relay replies it does not understand.
#
sub call
{
    my($self, $func, @args) = @_;

    my $reply = $self->{proxy}->$func(@args);

    if ($self->{relay})
    {
        $reply = $self->{relay}->await_result($reply);
    }

    if (!$reply)
    {
        print "$func: no reply received\n";
        return undef;
    }

    if ($reply->fault)
    {
        print "$func triggered fault: ", $reply->faultcode, " ", $reply->faultstring, "\n";
        return undef;
    }

    return $reply->result;
}
    

#############
#
# P2P Service
#
# Code in this module is invoked on the target on behalf of a requestor.
#
#############

package P2P::Service;

use Data::Dumper;

use FIG;
use FIG_Config;
use strict;

use File::Temp qw(tempdir);
use File::Basename;

#
# Service entry point for the first step of the update protocol.
#
# $his_release is the requestor's release; the requestor sends the value of
# its get_release_info(). $last_update is the time of the requestor's last
# update from us and must be a non-negative integer.
#
# Creates a uniquely named spool directory under $FIG_Config::temp/p2p_spool.
# Its name is the session id. The following files are written there:
#   annos   - each annotation newer than $last_update, "//\n"-terminated
#   pegs    - one line per annotated peg: fid, genome, aliases (tab separated)
#   genomes - genome, annotation count, contigs, nucleotides, checksum for
#             each annotated genome whose COUNTS file matches it
#   INFO    - tab-separated key/value session summary (read by get_pegs)
#
# Returns [$session_id, $my_release, $num_annos, $num_pegs, $num_genomes,
#          $now, $compatible]. Dies on a malformed $last_update or if a spool
# file cannot be written.
#
# NOTE(review): the work is kept in one sub rather than split into helpers.
# Other subs in this package may be callable by remote requestors, depending
# on how the SOAP server dispatches; confirm this before adding any.
#
sub request_update
{
    my($class, $his_release, $last_update) = @_;

    #
    # Verify input.
    #

    if ($last_update !~ /^\d+$/)
    {
        die "request_update: last_update must be a number (not '$last_update')\n";
    }

    #
    # Create a new session id and a spool directory to use for storage
    # of information about it. This can go in the tempdir since it is
    # not persistent. tempdir() picks a fresh name for every session, so
    # concurrent sessions never share (and overwrite) a spool directory.
    #

    FIG::verify_dir("$FIG_Config::temp/p2p_spool");
    my $spool_dir = tempdir(DIR => "$FIG_Config::temp/p2p_spool");

    my $session_id = basename($spool_dir);
    my $now = time;

    #
    # Gather the list of pegs and annotations for the update.
    #

    my $fig = FIG->new;

    my @all_genomes = $fig->genomes;

    my %pegs;
    my $num_annos = 0;
    my $num_genomes = 0;
    my $num_pegs = 0;

    open(my $anno_fh, '>', "$spool_dir/annos")
        or die "Cannot write $spool_dir/annos: $!";
    open(my $peg_fh, '>', "$spool_dir/pegs")
        or die "Cannot write $spool_dir/pegs: $!";
    open(my $genome_fh, '>', "$spool_dir/genomes")
        or die "Cannot write $spool_dir/genomes: $!";

    for my $genome (@all_genomes)
    {
        my $num_annos_for_genome = 0;

        my $genome_dir = "$FIG_Config::organisms/$genome";
        next unless -d $genome_dir;

        if (open(my $afh, '<', "$genome_dir/annotations"))
        {
            #
            # Annotation records are terminated by "//\n".
            #
            local $/ = "//\n";
            while (my $ann = <$afh>)
            {
                chomp $ann;

                my($fid, $anno_time) =
                    $ann =~ /^(fig\|\d+\.\d+\.peg\.\d+)\n(\d+)\n(\S+)\n(.*\S)/s;
                next unless defined($fid) and $anno_time > $last_update;

                #
                # Look up aliases if we haven't seen this fid before.
                #
                if (!defined($pegs{$fid}))
                {
                    my @aliases = $fig->feature_aliases($fid);
                    print $peg_fh join("\t", $fid, $genome, @aliases), "\n";
                    $num_pegs++;
                }

                print $anno_fh "$ann//\n";

                $pegs{$fid}++;
                $num_annos_for_genome++;
                $num_annos++;
            }
            close($afh);
        }

        #
        # Determine genome information if we have annotations for this one.
        #
        next unless $num_annos_for_genome > 0;

        $num_genomes++;
        if (open(my $cfh, '<', "$genome_dir/COUNTS"))
        {
            my $counts = <$cfh>;
            close($cfh);

            if (defined($counts))
            {
                chomp $counts;
                my($cgenome, $n_contigs, $total_nucs, $cksum) = split(/\t/, $counts);
                if ($cgenome ne $genome)
                {
                    warn "Hm, $genome has a COUNTS file with genome=$cgenome that does not match\n";
                }
                else
                {
                    print $genome_fh join("\t",
                                          $genome, $num_annos_for_genome, $n_contigs,
                                          $total_nucs, $cksum), "\n";
                }
            }
        }
    }

    close($anno_fh) or die "Cannot write $spool_dir/annos: $!";
    close($peg_fh) or die "Cannot write $spool_dir/pegs: $!";
    close($genome_fh) or die "Cannot write $spool_dir/genomes: $!";

    print "Pegs: $num_pegs\n";
    print "Genomes: $num_genomes\n";
    print "Annos: $num_annos\n";

    #
    # Check compatibility. Releases are compared as strings: with "==" any
    # two non-numeric release tags would compare equal (both numify to 0).
    #
    my $my_release = $fig->get_release_info();
    my $compatible = (defined($my_release) and defined($his_release)
                      and $my_release eq $his_release) ? 1 : 0;

    open(my $fh, '>', "$spool_dir/INFO") or die "Cannot write $spool_dir/INFO: $!";
    print $fh "requestor_release\t$his_release\n";
    print $fh "last_update\t$last_update\n";
    print $fh "cur_update\t$now\n";
    print $fh "target_release\t$my_release\n";
    print $fh "compatible\t$compatible\n";
    print $fh "num_pegs\t$num_pegs\n";
    print $fh "num_genomes\t$num_genomes\n";
    print $fh "num_annos\t$num_annos\n";
    close($fh) or die "Cannot write $spool_dir/INFO: $!";

    return [$session_id, $my_release, $num_annos, $num_pegs, $num_genomes, $now, $compatible];
}


#
# Service entry point: return the pegs at positions $start .. $start+$len-1
# (0-based) of the session's pegs file.
#
# Returns [$peg_output, $genome_output]:
#   $peg_output holds ['peg', $fid] entries when the session is compatible,
#     and ['peg_info', $fid, [@aliases], $genome] entries otherwise.
#   $genome_output holds [$genome, $n_contigs, $n_nucs, $cksum] for every
#     genome referenced by a 'peg_info' entry, in the order the session's
#     genomes file lists them.
#
# Dies on an invalid session id or an out-of-range $start / $len. An empty
# range (e.g. $start = 0, $len = 0 for a session with no pegs) is allowed.
#
sub get_pegs
{
    my($self, $session_id, $start, $len) = @_;
    my(%session_info);

    #
    # The session id comes from the remote requestor and is used as a path
    # component, so only accept word characters (no "/" or "..").
    #
    if (!defined($session_id) or $session_id !~ /\A\w+\z/)
    {
        die "Invalid session id $session_id";
    }

    my $spool_dir = "$FIG_Config::temp/p2p_spool/$session_id";

    -d $spool_dir or die "Invalid session id $session_id";

    #
    # Read in the cached information for this session.
    #
    open(my $info_fh, '<', "$spool_dir/INFO") or die "Cannot open INFO file: $!";
    while (my $line = <$info_fh>)
    {
        chomp $line;
        my($var, $val) = split(/\t/, $line, 2);
        $session_info{$var} = $val;
    }
    close($info_fh);

    my $num_pegs = $session_info{num_pegs};

    #
    # Sanity check start and length: both must be non-negative integers and
    # the requested range must lie within the session's pegs.
    #
    if (!defined($start) or $start !~ /\A\d+\z/ or $start > $num_pegs)
    {
        die "Invalid start position $start";
    }

    if (!defined($len) or $len !~ /\A\d+\z/ or $start + $len > $num_pegs)
    {
        die "Invalid length $len";
    }

    #
    # Open file, spin to the starting line, then read exactly $len pegs.
    #
    open(my $peg_fh, '<', "$spool_dir/pegs") or die "Cannot open pegs file: $!";

    my $peg_output = [];
    my $genome_output = [];
    my %genomes_to_show;
    my $stop = $start + $len;    # first position NOT to return
    my $peg_num = 0;

    while (my $line = <$peg_fh>)
    {
        my $pos = $peg_num++;
        next if $pos < $start;
        last if $pos >= $stop;

        chomp $line;

        #
        # OK, this is a peg to process.
        # It's easy if we're compatible.
        #
        my($fid, $genome, @aliases) = split(/\t/, $line);

        if ($session_info{compatible})
        {
            push(@$peg_output, ['peg', $fid]);
        }
        else
        {
            $genomes_to_show{$genome} = 1;
            push(@$peg_output, ['peg_info', $fid, [@aliases], $genome]);
        }
    }
    close($peg_fh);

    #
    # Read the genomes file, returning information about genomes referenced
    # in the pegs returned.
    #
    my $n_left = keys %genomes_to_show;

    open(my $gfh, '<', "$spool_dir/genomes") or die "Cannot open genomes file: $!";
    while ($n_left > 0)
    {
        my $line = <$gfh>;
        last unless defined($line);
        chomp $line;

        my($genome, $n_annos, $n_contigs, $n_nucs, $cksum) = split(/\t/, $line);

        if ($genomes_to_show{$genome})
        {
            push(@$genome_output, [$genome, $n_contigs, $n_nucs, $cksum]);
            $n_left--;
        }
    }
    close($gfh);

    return [$peg_output, $genome_output];
}

#
# Service entry point for the last step of the update protocol.
#
# $request is a reference to a list of [$what, $peg] items:
#   ['peg_genome', $peg]  is answered with
#                         ['peg_loc', $peg, $strand, $beg, $end, $contig_checksum]
#   ['peg_unknown', $peg] is answered with ['peg_seq', $peg, $translation]
# Items of any other kind are skipped. So are items whose $peg is not a fig
# peg id, and peg_genome items whose location cannot be found.
#
# Returns a reference to the (possibly empty) list of answers. $session is
# accepted but not used.
#
sub finalize_pegs
{
    my($self, $session, $request) = @_;
    my $out = [];

    my $fig = FIG->new;

    #
    # Walk the request handling appropriately. This is fairly easy, as it
    # is just a matter of pulling either sequence or location/contig data.
    #
    for my $item (@{ $request || [] })
    {
        my($what, $peg) = @$item;

        #
        # $peg comes from the remote requestor and is handed to FIG lookups.
        # Only accept ids in the form request_update hands out.
        #
        next unless defined($peg) and $peg =~ /\Afig\|\d+\.\d+\.peg\.\d+\z/;

        if ($what eq "peg_genome")
        {
            #
            # Return the location and contig checksum for this peg.
            #
            my $loc = $fig->feature_location($peg);
            next unless $loc;

            my $contig = $fig->contig_of($loc);
            my $cksum = $fig->contig_checksum($fig->genome_of($peg), $contig);
            warn "Checksum for '$loc' '$contig' is $cksum\n";

            push(@$out, ['peg_loc', $peg,
                         $fig->strand_of($loc),
                         $fig->beg_of($loc), $fig->end_of($loc),
                         $cksum]);
        }
        elsif ($what eq "peg_unknown")
        {
            my $seq = $fig->get_translation($peg);
            push(@$out, ['peg_seq', $peg, $seq]);
        }
    }
    return $out;
}
    

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3