[Bio] / FigKernelPackages / FFserver.pm Repository:
ViewVC logotype

View of /FigKernelPackages/FFserver.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.7 - (download) (as text) (annotate)
Tue May 12 21:16:29 2009 UTC (10 years, 7 months ago) by olson
Branch: MAIN
Changes since 1.6: +1 -1 lines
add status line to dead post error message

package FFserver;

use LWP::UserAgent;
use Data::Dumper;
use YAML;

use strict;

sub new
{
    my($class, $server_url) = @_;

    $server_url = "http://servers.nmpdr.org/figfam/server.cgi" unless $server_url;


    my $self = {
	server_url => $server_url,
	ua => LWP::UserAgent->new(),
    };

    return bless $self, $class;
}

sub members_of_families
{
    my($self, @ids) = @_;
    return $self->run_query('members_of_families', @ids);
}

sub families_containing_peg
{
    my($self, @ids) = @_;
    return $self->run_query('families_containing_peg', @ids);
}

sub should_be_member
{
    my($self, @id_seq_pairs) = @_;
    return $self->run_query('should_be_member', @id_seq_pairs);
}

sub all_families
{
    my($self) = @_;
    return $self->run_query('all_families');
}

sub run_query
{
    my($self, $function, @args ) = @_;
    my $form = [function  => $function,
		args => YAML::Dump(@args),
		];
    return $self->run_query_form($form);
}

sub run_query_form
{
    my($self, $form) = @_;

    my $res = $self->{ua}->post($self->{server_url}, $form);
    
    if ($res->is_success)
    {
	return Load($res->content);
    }
    else
    {
	die "error on post " . $res->status_line . " " . $res->content;
    }
}

sub assign_function_to_prot
{
    my($self, $input) = @_;

    my $wq;
    
    if (ref($input) eq 'ARRAY')
    {
	$wq = SequenceListWorkQueue->new($input);
    }
    else
    {
	$wq = FastaWorkQueue->new($input);
    }

    return ResultHandler->new($wq, $self->{server_url}, 'assign_function_to_prot', \&id_seq_pair_bundler, \&tab_delimited_output_parser);
}

sub assign_functions_to_dna
{
    my($self, $input, $min_hits, $max_gap) = @_;

    my $wq;
    
    if (ref($input) eq 'ARRAY')
    {
	$wq = SequenceListWorkQueue->new($input);
    }
    else
    {
	$wq = FastaWorkQueue->new($input);
    }

    return ResultHandler->new($wq, $self->{server_url}, 'assign_functions_to_DNA', \&id_seq_pair_bundler, \&tab_delimited_output_parser, [min_hits => $min_hits, max_gap => $max_gap]);
}

sub id_seq_pair_bundler
{
    my($item) = @_;
    my($id, $seq) = @$item[0,2];
    return "id_seq", join(",", $id, (ref($seq) eq 'SCALAR' ? $$seq : $seq));
}

sub tab_delimited_output_parser
{
    my($line) = @_;
    chomp $line;
    my @cols = split(/\t/, $line);
    return \@cols;
}


sub tab_delimited_dna_data_output_parser
{
    my($line) = @_;
    chomp $line;
    my ($id, $idbe, $fam) = split(/\t/, $line);
    my ($beg, $end) = $idbe =~ /_(\d+)_(\d+)$/;
    return [$id, $beg, $end, $fam];
}

package ResultHandler;
use strict;
use Data::Dumper;

sub new
{
    my($class, $work_queue, $server_url, $function, $input_bundler, $output_parser, $form_vars) = @_;

    my $self = {
	work_queue => $work_queue,
	server_url => $server_url,
	function => $function,
	input_bundler => $input_bundler,
	output_parser => $output_parser,
	ua => LWP::UserAgent->new(),
	cur_result => undef,
	form_vars => $form_vars ? $form_vars : [],
    };
    return bless $self, $class;
}

sub get_next
{
    my($self) = @_;

    if ($self->{cur_result})
    {
	return $self->get_next_from_result();
    }
    else
    {
	my @inp = $self->{work_queue}->get_next_n_bytes(16000);
	if (@inp)
	{
	    my $form = [@{$self->{form_vars}}];
	    push(@$form, function => $self->{function},
			 map { &{$self->{input_bundler}}($_) } @inp);
	    print "Invoke " .Dumper($form);

	    my $res = $self->{ua}->post($self->{server_url}, $form);
	    if ($res->is_success)
	    {
		$self->{cur_result} = $res->content;
		#print "res: $self->{cur_result}\n";
		return $self->get_next_from_result();
	    }
	    else
	    {
		die "error " . $res->status_line . " on post " . $res->content;
	    }
	}
	else
	{
	    return;
	}
    }
}

sub get_next_from_result
{
    my($self) = @_;
    if ($self->{cur_result} =~ s/^([^\n]*)\n//)
    {
	return &{$self->{output_parser}}($1);
    }
}

package SequenceWorkQueue;
use strict;

sub new
{
    my($class) = @_;

    my $self = {};
    
    return bless $self, $class;
}

sub get_next_n
{
    my($self, $n) = @_;
    my @out;
    
    for (my $i = 0;$i < $n; $i++)
    {
	my($id, $com, $seqp) = $self->get_next();
	if (defined($id))
	{
	    push(@out, [$id, $com, $seqp]);
	}
	else
	{
	    last;
	}
    }
    return @out;
}

sub get_next_n_bytes
{
    my($self, $n) = @_;
    my @out;

    my $size = 0;
    while ($size < $n)
    {
	my($id, $com, $seqp) = $self->get_next();
	if (defined($id))
	{
	    push(@out, [$id, $com, $seqp]);
	    $size += (ref($seqp) eq 'SCALAR') ? length($$seqp) : length($seqp);
	}
	else
	{
	    last;
	}
    }
    return @out;
}

package FastaWorkQueue;
use strict;
use base 'SequenceWorkQueue';
use FileHandle;
use FIG;

sub new
{
    my($class, $input) = @_;

    my $fh;
    if (ref($input))
    {
	$fh = $input;
    }
    else
    {
	$fh = new FileHandle("<$input");
    }

    my $self = $class->SUPER::new();

    $self->{fh} = $fh;

    return bless $self, $class;
}

sub get_next
{
    my($self) = @_;

    my($id, $seqp, $com) = &FIG::read_fasta_record($self->{fh});
    return defined($id) ? ($id, $com, $seqp) : ();
}

package SequenceListWorkQueue;
use strict;
use base 'SequenceWorkQueue';

sub new
{
    my($class, $input) = @_;

    my $fh;
    if (ref($input) ne 'ARRAY')
    {
	die "SequenceWorkQueue requires a list as input";
    }

    my $self = $class->SUPER::new();

    $self->{list} = $input;

    return bless $self, $class;
}

sub get_next
{
    my($self) = @_;

    my $top = shift @{$self->{list}};

    return defined($top) ? @$top : ();
}


1;


MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3