[Bio] / FigKernelPackages / FFserver.pm Repository:
ViewVC logotype

View of /FigKernelPackages/FFserver.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.14 - (download) (as text) (annotate)
Fri Jun 19 17:30:02 2009 UTC (10 years, 5 months ago) by olson
Branch: MAIN
Changes since 1.13: +35 -3 lines
server fixes

package FFserver;

#
# This is a SAS Component
#

use LWP::UserAgent;
use Data::Dumper;
use YAML;

use strict;

sub new
{
    my($class, $server_url) = @_;

    $server_url = "http://servers.nmpdr.org/figfam/server.cgi" unless $server_url;


    my $self = {
	server_url => $server_url,
	ua => LWP::UserAgent->new(),
    };
    $self->{ua}->timeout(20 * 60);

    return bless $self, $class;
}

sub members_of_families
{
    my($self, @ids) = @_;
    return $self->run_query('members_of_families', @ids);
}

sub families_containing_peg
{
    my($self, @ids) = @_;
    return $self->run_query('families_containing_peg', @ids);
}

sub function_of
{
    my($self, @ids) = @_;
    return $self->run_query('function_of', @ids);
}

sub org_of
{
    my($self, @ids) = @_;
    return $self->run_query('org_of', @ids);
}

sub seq_of
{
    my($self, @ids) = @_;
    return $self->run_query('seq_of', @ids);
}

sub aliases_of
{
    my($self, @ids) = @_;
    return $self->run_query('aliases_of', @ids);
}

sub families_implementing_role
{
    my($self,@roles) = @_;
    return $self->run_query('families_implementing_role', @roles);
}

sub families_with_function 
{
    my($self,@functions) = @_;
    return $self->run_query('families_with_function', @functions);
}

sub families_in_genome
{
    my($self,@genomes) = @_;
    return $self->run_query('families_in_genome', @genomes);
}

sub get_subsystem_based_figfams
{
    my ($self) = @_;
    return $self->run_query('get_subsystem_based_figfams');
}

sub should_be_member
{
    my($self, @id_seq_pairs) = @_;
    return $self->run_query('should_be_member', @id_seq_pairs);
}

sub all_families
{
    my($self) = @_;
    return $self->run_query('all_families');
}

sub run_query
{
    my($self, $function, @args ) = @_;
    my $form = [function  => $function,
		args => YAML::Dump(@args),
		];
    return $self->run_query_form($form);
}

sub run_query_form
{
    my($self, $form) = @_;

    my $res = $self->{ua}->post($self->{server_url}, $form);
    
    if ($res->is_success)
    {
	my $content = $res->content;
#	print "Got $content\n";
	my $ret;
	eval { 
	    $ret = Load($content);
	};
	if ($@)
	{
	    die "Query returned unparsable content ($@): " . $content;
	}
	return $ret;
    }
    else
    {
	die "error on post " . $res->status_line . " " . $res->content;
    }
}

sub assign_function_to_prot
{
    my($self, $input, $blast, $min_hits) = @_;

    my $wq;

    my $params = [blast => $blast, min_hits => $min_hits];
    
    if (ref($input) eq 'ARRAY')
    {
	$wq = SequenceListWorkQueue->new($input);
    }
    else
    {
	$wq = FastaWorkQueue->new($input);
    }

    my $req_bytes = $blast ? 1000 : 16000;

    return ResultHandler->new($wq, $self->{server_url}, 'assign_function_to_prot', \&id_seq_pair_bundler,
			      #\&tab_delimited_output_parser,
			      \&YAML::Load,
			      $params, $req_bytes);
}

sub assign_functions_to_dna
{
    my($self, $input, $min_hits, $max_gap, $blast) = @_;

    $min_hits = 3 unless defined($min_hits);
    $max_gap = 600 unless defined($max_gap);
    $blast = 0 unless defined($blast);

    my $wq;
    
    if (ref($input) eq 'ARRAY')
    {
	$wq = SequenceListWorkQueue->new($input);
    }
    else
    {
	$wq = FastaWorkQueue->new($input);
    }

    my $req_bytes = $blast ? 1000 : 500000;
    my $params = [min_hits => $min_hits, max_gap => $max_gap, blast => $blast];
    return ResultHandler->new($wq, $self->{server_url}, 'assign_functions_to_DNA',
			      \&id_seq_pair_bundler,
			      \&tab_delimited_output_parser, $params, $req_bytes);
}

sub id_seq_pair_bundler
{
    my($item) = @_;
    my($id, $seq) = @$item[0,2];
    return "id_seq", join(",", $id, (ref($seq) eq 'SCALAR' ? $$seq : $seq));
}

sub tab_delimited_output_parser
{
    my($line) = @_;
    chomp $line;
    my @cols = split(/\t/, $line);
    return \@cols;
}


sub tab_delimited_dna_data_output_parser
{
    my($line) = @_;
    chomp $line;
    my ($id, $idbe, $fam) = split(/\t/, $line);
    my ($beg, $end) = $idbe =~ /_(\d+)_(\d+)$/;
    return [$id, $beg, $end, $fam];
}

package ResultHandler;
use strict;
use Data::Dumper;

sub new
{
    my($class, $work_queue, $server_url, $function, $input_bundler, $output_parser, $form_vars, $req_bytes) = @_;

    my $self = {
	work_queue => $work_queue,
	server_url => $server_url,
	function => $function,
	input_bundler => $input_bundler,
	output_parser => $output_parser,
	ua => LWP::UserAgent->new(),
	cur_result => undef,
	form_vars => $form_vars ? $form_vars : [],
	req_bytes => ($req_bytes ? $req_bytes : 16000),
    };
    $self->{ua}->timeout(20 * 60);
    return bless $self, $class;
}

sub get_next
{
    my($self) = @_;

    my $res =  $self->get_next_from_result();
    # print "gnfr returns: " , Dumper($res);

    if ($res)
    {
	return $res;
    }
    else
    {
	
	while (my @inp = $self->{work_queue}->get_next_n_bytes($self->{req_bytes}))
	{
	    my $form = [@{$self->{form_vars}}];
	    push(@$form, function => $self->{function},
			 map { &{$self->{input_bundler}}($_) } @inp);
	    # print "Invoke " .Dumper($form);

	    my $res = $self->{ua}->post($self->{server_url}, $form);
	    if ($res->is_success)
	    {
		eval { 
		    $self->{cur_result} = [YAML::Load($res->content)];
		};
		if ($@)
		{
		    die "Query returned unparsable content ($@): " . $res->content;
		}
		# print "res: " . Dumper($self->{cur_result});
		my $oneres =  $self->get_next_from_result();
		if ($oneres)
		{
		    return $oneres;
		}
	    }
	    else
	    {
		die "error " . $res->status_line . " on post " . $res->content;
	    }
	}
	return;
    }
}

sub get_next_from_result
{
    my($self) = @_;
    my $l = $self->{cur_result};
    if ($l and @$l)
    {
	return shift(@$l);
    }
    else
    {
	delete $self->{cur_result};
	return undef;
    }
}

package SequenceWorkQueue;
use strict;

sub new
{
    my($class) = @_;

    my $self = {};
    
    return bless $self, $class;
}

sub get_next_n
{
    my($self, $n) = @_;
    my @out;
    
    for (my $i = 0;$i < $n; $i++)
    {
	my($id, $com, $seqp) = $self->get_next();
	if (defined($id))
	{
	    push(@out, [$id, $com, $seqp]);
	}
	else
	{
	    last;
	}
    }
    return @out;
}

sub get_next_n_bytes
{
    my($self, $n) = @_;
    my @out;

    my $size = 0;
    while ($size < $n)
    {
	my($id, $com, $seqp) = $self->get_next();
	if (defined($id))
	{
	    push(@out, [$id, $com, $seqp]);
	    $size += (ref($seqp) eq 'SCALAR') ? length($$seqp) : length($seqp);
	}
	else
	{
	    last;
	}
    }
    return @out;
}

package FastaWorkQueue;
use strict;
use base 'SequenceWorkQueue';
use FileHandle;

sub new
{
    my($class, $input) = @_;

    my $fh;
    if (ref($input))
    {
	$fh = $input;
    }
    else
    {
	$fh = new FileHandle("<$input");
    }

    my $self = $class->SUPER::new();

    $self->{fh} = $fh;

    return bless $self, $class;
}

sub get_next
{
    my($self) = @_;

    my($id, $seqp, $com) = read_fasta_record($self->{fh});
    return defined($id) ? ($id, $com, $seqp) : ();
}

sub read_fasta_record {
    my ($file_handle) = @_;
    my ($old_end_of_record, $fasta_record, @lines, $head, $sequence, $seq_id, $comment, @parsed_fasta_record);

    if (not defined($file_handle))  { $file_handle = \*STDIN; }

    $old_end_of_record = $/;
    $/ = "\n>";

    if (defined($fasta_record = <$file_handle>)) {
        chomp $fasta_record;
        @lines  =  split( /\n/, $fasta_record );
        $head   =  shift @lines;
        $head   =~ s/^>?//;
        $head   =~ m/^(\S+)/;
        $seq_id = $1;
        if ($head  =~ m/^\S+\s+(.*)$/)  { $comment = $1; } else { $comment = ""; }
        $sequence  =  join( "", @lines );
        @parsed_fasta_record = ( $seq_id, \$sequence, $comment );
    } else {
        @parsed_fasta_record = ();
    }

    $/ = $old_end_of_record;

    return @parsed_fasta_record;
}

package SequenceListWorkQueue;
use strict;
use base 'SequenceWorkQueue';

sub new
{
    my($class, $input) = @_;

    my $fh;
    if (ref($input) ne 'ARRAY')
    {
	die "SequenceWorkQueue requires a list as input";
    }

    my $self = $class->SUPER::new();

    $self->{list} = $input;

    return bless $self, $class;
}

sub get_next
{
    my($self) = @_;

    my $top = shift @{$self->{list}};

    return defined($top) ? @$top : ();
}


1;


MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3