[Bio] / ClusterTools / worker.pl Repository:
ViewVC logotype

View of /ClusterTools/worker.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.9 - (download) (as text) (annotate)
Tue Mar 1 20:37:09 2005 UTC (14 years, 8 months ago) by olson
Branch: MAIN
CVS Tags: HEAD
Changes since 1.8: +10 -3 lines
darn clusteR_id fix, plus some jazz hardcoded defaults

use Data::Dumper;

use lib '/home/olson/Cluster/lib';
use lib '/home/olson/Cluster/FigKernelPackages';

use IO::Select;
use SOAP::Lite;
use LWP::UserAgent;
use HTTP::Request::Common;

use strict;

use ClusterWorker;

use Getopt::Long;

#
# Logging initialization.
#

use Log::Log4perl;
use Log::Log4perl::Appender::Socket;

#
# Log4perl takes its configuration from the file worker.logconf; the
# second argument (60) is the watch delay handed to init_and_watch.
#

Log::Log4perl::init_and_watch("worker.logconf", 60);

my $log = Log::Log4perl->get_logger("worker");

$log->info("Worker starting");
$log->debug("foo");

#
# Site-specific locations of the spool area and of the NCBI binaries.
#

my $spool_dir = "/home/olson/cluster_spool";
my $ncbi_bin = "/home/olson/FIGdisk/env/linux-glibc-2.2/bin";

# Alternative, environment-driven settings kept for reference:
#my $spool_dir = "$ENV{TG_CLUSTER_SCRATCH}/cluster_spool";
#my $ncbi_bin = "$ENV{TG_CLUSTER_SCRATCH}/FIGdisk.Nov20/env/tg-ia64/bin";

my $ncbi_blastall = "$ncbi_bin/blastall";
my $ncbi_formatdb = "$ncbi_bin/formatdb";

#
# Refuse to start unless both NCBI tools can be executed.
#

$log->logdie("Blastall $ncbi_blastall not executable") unless -x $ncbi_blastall;
$log->logdie("Formatdb $ncbi_formatdb not executable") unless -x $ncbi_formatdb;

#
# Command line: --cluster_id <integer> is mandatory.
#

my $cluster_id;

my $args_ok = GetOptions("cluster_id=i" => \$cluster_id);

if (!$args_ok or !defined($cluster_id))
{
    die "Usage: $0 --cluster_id <id>\n";
}


#
# Signals to handle.
#
my @signals = ('INT', 'HUP', 'TERM');

#
# Holds the name of a signal that arrived while signals were deferred
# (see signal_ignore); stays 0 while nothing is pending.
#
my $GOT_QUIT = 0;

#
# Define the work-types we understand, and the methods for processing them.
#

my %work_types = (
    sim      => \&do_sim_work,
    stage    => \&do_stage_work,
    stage_nr => \&do_stage_nr_work,
    wait     => \&do_wait_work,
);


#
# Address of the cluster service this worker talks to.
#

my $port = 80;
my $host = "xenophilus.nmpdr.org";
my $url = "http://$host:$port/olson/FIG/cluster_service.cgi";

#
# Make sure the spool directory exists before any work is accepted.
#

unless (-d $spool_dir)
{
    mkdir($spool_dir) or die "Cannot mkdir $spool_dir: $!\n";
}

#
# Create a worker handle
#
my $c = Cluster::Worker->new($url);


#
# Register with our information: host name, effective user name, process
# id, script name, cluster id and the list of work types we can process.
#

my $hostname = `hostname`;
chomp($hostname);
my $user = (getpwuid($>))[0];

my $worker_id = $c->register_worker($hostname, $user, $$, $0, $cluster_id, [keys(%work_types)]);
defined($worker_id) or $log->logdie("Did not get worker id");

#
# From here on, log under a name that carries the cluster and worker ids.
# The file-level $log declared earlier is reassigned (not redeclared) so
# that there is exactly one logger variable in the script.
#

$log = Log::Log4perl->get_logger("C${cluster_id}_W$worker_id");

$log->info("received worker_id=$worker_id");

#
# Main loop: fetch one piece of work from the broker, prepare its spool
# directories, dispatch it to the handler registered in %work_types, and
# report a failure back to the broker if the handler dies.
#
while (1)
{
    #
    # Ignore signals while getting work.
    #

    signal_ignore();

    my $work;
    eval {
        $work = $c->get_work($worker_id);
    };

    # Copy $@ at once; any later call may run its own eval and reset it.
    if (my $err = $@)
    {
        $log->logwarn("get_work call failed: $err");

        signal_normal();

        sleep(30);

        next;
    }


    #
    # Signals stay deferred here. The handlers re-enable them themselves
    # (with a handler that fails the work unit if the script is killed).
    #

    my($job_id, $work_id, $work_worker_id, $work_name) =
        @{$work}{qw(job_id work_id worker_id work_name)};

    $log->info("$worker_id got_work job_id=$job_id work_id=$work_id work_name=$work_name");

    if (defined($work_worker_id) and $work_worker_id != $worker_id)
    {
        $log->logwarn("Work returned has a different worker id ($work_worker_id) than mine ($worker_id)");
    }

    #
    # Create job / work spool directories if they do not already exist.
    #

    my($job_dir, $work_dir);
    if (defined($job_id))
    {
        $job_dir = "$spool_dir/job_$job_id";
        if (! -d $job_dir)
        {
            mkdir($job_dir) or $log->logdie("Cannot mkdir job spool dir $job_dir: $!");
        }

        if (defined($work_id))
        {
            $work_dir = "$job_dir/work_$work_id";
            if (! -d $work_dir)
            {
                mkdir($work_dir) or $log->logdie("Cannot mkdir work spool dir $work_dir: $!");
            }
        }
    }

    #
    # Determine what sort of work this is.
    #

    my $work_type = $work->{work_name};


    if (defined($work_type))
    {
        my $handler = $work_types{$work_type};

        #
        # And dispatch. If it fails (the handler calls die), call
        # abort_work() on that piece of work. We may want to return
        # a failure for that, or a "failed on this cluster" return. Later.
        #

        if ($handler)
        {
            eval { $handler->($c, $work, $job_id, $work_id, $worker_id, $job_dir, $work_dir); };

            # Save the error before logging: the logger and the signal
            # helpers run more code, which could overwrite $@.
            if (my $err = $@)
            {
                $log->logwarn("Work failed: $err");
                signal_ignore();
                abort_work($c, $job_id, $work_id, $worker_id, $err);
            }
            signal_normal();
        }
        else
        {
            #
            # We don't have a handler for this piece of work. We actually
            # should, since we registered with the list of types we handle.
            # Abort the piece of work and carry on.
            #
            signal_ignore();
            $log->logwarn("No handler available for worktype $work_type");
            abort_work($c, $job_id, $work_id, $worker_id, "no handler available");

            # Honor a signal that arrived while the abort was in flight,
            # as the other branches do.
            signal_normal();
        }
    }
    else
    {
        #
        # The returned work carries no work_name at all, so there is
        # nothing to dispatch on. Abort the piece of work and exit.
        #
        signal_ignore();
        abort_work($c, $job_id, $work_id, $worker_id, "No work type defined in returned work");
        signal_normal();
        $log->logdie("No work type defined in return: " .  Dumper($work));
    }
}

#
# Tell the broker that a piece of work failed.
#
# Arguments: worker handle, job id, work id, worker id, and a reason that
# is logged and passed on to work_failed.
#
sub abort_work
{
    my $c = shift;
    my($job_id, $work_id, $worker_id, $reason) = @_;

    $log->info("Invoking work_failed $job_id $work_id $reason\n");

    return $c->work_failed($job_id, $work_id, $worker_id, $reason);
}

#
# Handler for "sim" work: run blastall on the sequence data carried in the
# work unit against the nr database staged in the job directory, then
# upload the result and report the work as done.
#
# Arguments: worker handle, work hash, job id, work id, worker id, job
# spool directory, work spool directory.
#
# Dies (via die / logdie) on any failure; the caller is expected to catch
# that and abort the work unit.
#
sub do_sim_work
{
    my($c, $work, $job_id, $work_id, $worker_id, $job_dir, $work_dir) = @_;

    signal_fail_work($c, $job_id, $work_id, $worker_id);

    my $sim_info = $work->{job_specific};

    my $fasta = $sim_info->{input_seq};
    my $thresh = $sim_info->{blast_thresh};

    my $fasta_len = length($fasta);
    $log->info("sim work. l=$fasta_len thres=$thresh");

    chdir($work_dir) or die "Cannot chdir $work_dir: $!";

    #
    # Write the query sequences to ./input. Both open and close are
    # checked so that a full disk or bad permissions fail the work unit
    # here instead of handing blastall a missing or truncated file.
    #

    open(my $input_fh, ">", "input") or die "Cannot open $work_dir/input for writing: $!";
    print $input_fh $fasta;
    close($input_fh) or die "Cannot write $work_dir/input: $!";

    #
    # Ensure that the nr database files are in place.
    #

    if (! -f "$job_dir/nr.phr" or
        ! -f "$job_dir/nr.pin" or
        ! -f "$job_dir/nr.psq")
    {
        my $out = `ls -al $job_dir`;
        $log->logdie("Blast databases missing from $job_dir: $out");
    }

    #
    # Invoke blast.
    #
    # Open a pipe from its stdout, not because we are interested in the
    # output, but because we want to keep this process awake and sending
    # heartbeats back to the broker.
    #

    my @cmd = ($ncbi_blastall,
               '-m', 8,
               '-e', $thresh,
               '-i', 'input',
               '-FF',
               '-d', "$job_dir/nr",
               '-p', 'blastp',
               '-o', 'output');

    my $fh;
    my $pid = open($fh, "-|", @cmd);

    defined($pid) or $log->logdie("Open of $ncbi_blastall failed: $!");

    my $sel = IO::Select->new($fh);
    my $buf;

    while (1)
    {
        my @ready = $sel->can_read(300);

        if (!@ready)
        {
            # Nothing arrived within 300 seconds: send a heartbeat.
            $c->worker_alive($worker_id);
            next;
        }

        #
        # sysread, not read: buffered reads must not be mixed with
        # select(), and read() would block until a full 4096 bytes (or
        # EOF) arrived, starving the heartbeat above.
        #
        my $n = sysread($fh, $buf, 4096);

        defined($n) or $log->logdie("Read from $ncbi_blastall failed: $!");

        next if $n > 0;

        #
        # EOF, our work here is done. close() on a pipe waits for the
        # child and returns false if it exited with a nonzero status.
        #
        close($fh) or $log->logdie("Process exited with nonzero return status $?");
        last;
    }

    my $size = -s 'output';
    $log->info("Blastall finished, output size $size");
    my $h = $c->get_upload_handles($job_id, $work_id, $worker_id, ['output']);

    # Keep the upload URL in a local variable; do not overwrite the
    # file-level $url that holds the cluster service address.
    my $upload_url = $h->{output};

    if (upload_output($job_id, $work_id, $upload_url, 'output'))
    {
        signal_ignore();
        $c->work_done($job_id, $work_id, $worker_id, "output");
    }
    else
    {
        $log->logdie("Upload failed");
    }
}


#
# Handler for "stage" work: download the file named in the work unit from
# the given URL into the job spool directory and report the work as done,
# passing the `ls -l` line of the downloaded file as the result.
#
# Arguments: worker handle, work hash, job id, work id, worker id, job
# spool directory, work spool directory (unused here).
#
# Dies via logdie if the download fails, so that the caller aborts the
# work unit instead of it being reported as done.
#
sub do_stage_work
{
    my($c, $work, $job_id, $work_id, $worker_id, $job_dir, $work_dir) = @_;

    signal_fail_work($c, $job_id, $work_id, $worker_id);

    my $stage_info = $work->{job_specific};

    my $file = $stage_info->{file};
    my $url = $stage_info->{url};

    $log->info("Getting stage $file $url");

    my $ua = LWP::UserAgent->new;

    my $out_file = "$job_dir/$file";

    # The second argument makes LWP store the response body in $out_file.
    my $req = HTTP::Request->new('GET', $url);
    my $resp = $ua->request($req, $out_file);

    $log->info("Got response " . $resp->status_line . "\n");

    #
    # Do not claim success for a failed fetch. LWP reports an error that
    # happened while the body was being written via the X-Died header.
    #
    $resp->is_success
        or $log->logdie("Download of $url failed: " . $resp->status_line);

    my $died = $resp->header('X-Died');
    defined($died)
        and $log->logdie("Download of $url to $out_file was interrupted: $died");

    my $info = `ls -l $out_file`;
    chomp($info);
    $log->info("Read file: $info");

    signal_ignore();
    $c->work_done($job_id, $work_id, $worker_id, $info);
}


#
# Handler for "stage_nr" work: download the NR file named in the work unit
# into the job spool directory, run formatdb on it there, and report the
# work as done, passing the `ls -l` listing of the resulting files.
#
# Arguments: worker handle, work hash, job id, work id, worker id, job
# spool directory, work spool directory (unused here).
#
# Dies via logdie if the download or formatdb fails, so that the caller
# aborts the work unit.
#
sub do_stage_nr_work
{
    my($c, $work, $job_id, $work_id, $worker_id, $job_dir, $work_dir) = @_;

    signal_fail_work($c, $job_id, $work_id, $worker_id);

    my $stage_info = $work->{job_specific};

    my $file = $stage_info->{file};
    my $url = $stage_info->{url};

    $log->info("Getting NR $file $url");

    my $ua = LWP::UserAgent->new;

    my $out_file = "$job_dir/$file";

    # Start from a clean slate if an earlier copy is lying around.
    -f $out_file and unlink($out_file);

    # The second argument makes LWP store the response body in $out_file.
    my $req = HTTP::Request->new('GET', $url);
    my $resp = $ua->request($req, $out_file);

    $log->info("Got response " . $resp->status_line);

    #
    # Stop here on a failed fetch instead of running formatdb on a
    # missing or partial file. LWP reports an error that happened while
    # the body was being written via the X-Died header.
    #
    $resp->is_success
        or $log->logdie("Download of $url failed: " . $resp->status_line);

    my $died = $resp->header('X-Died');
    defined($died)
        and $log->logdie("Download of $url to $out_file was interrupted: $died");

    $log->info("Read file: " . `ls -l $out_file`);

    chdir($job_dir) or $log->logdie("Cannot chdir $job_dir: $!");
    unlink("formatdb.log");
    my $rc = system($ncbi_formatdb, "-p", "T", "-i", $out_file);
    $rc == 0 or $log->logdie("formatdb failed with rc=$rc");

    # The glob picks up the database files formatdb wrote next to the input.
    my $info = `ls -l $out_file*`;
    chomp($info);
    $log->info("Read file: $info");

    signal_ignore();
    $c->work_done($job_id, $work_id, $worker_id, $info);
}


#
# Handler for "wait" work: there is nothing to process, so re-enable
# normal signal handling and sleep for 30 seconds before the main loop
# asks for work again.
#
# It is invoked with the same argument list as the other handlers (worker
# handle first, then the work hash, ...), but none of the arguments are
# needed, so none are unpacked.
#
sub do_wait_work
{
    signal_normal();
    sleep(30);
}

#
# Upload the file at $path to $url as a multipart form-data POST that also
# carries the job_id and work_id fields.
#
# Returns 1 on a successful response. On failure, logs the details and
# returns an empty/undefined value.
#
sub upload_output
{
    my($job_id, $work_id, $url, $path) = @_;

    #
    # Ask HTTP::Request::Common to read the file when the request is sent
    # rather than when it is built. The setting is localized so that it
    # does not leak into the rest of the program.
    #
    local $HTTP::Request::Common::DYNAMIC_FILE_UPLOAD = 1;

    #print "Uploading file $path to $url...\n";
    my $req = POST($url,
                   Content_Type => 'form-data',
                   Content => [job_id => $job_id,
                               work_id => $work_id,
                               file => [$path]]);

    my $ua = LWP::UserAgent->new;

    my $resp = $ua->request($req);

    if ($resp->is_success)
    {
        # print "Successful upload:\n";
        print $resp->content;
        return 1;
    }

    $log->warn("Failed upload for job_id=$job_id work_id=$work_id url=$url path=$path");
    $log->warn($resp->status_line);
    return;
}

#
# Put every signal listed in @signals into "deferred" mode.
#
sub signal_ignore
{
    #
    # The signals are not thrown away: defer_sig records the name of the
    # signal in $GOT_QUIT so that we can quit once signals are reenabled.
    #

    sub defer_sig
    {
        my $sig = shift;
        $log->logwarn("Deferring signal $sig");
        $GOT_QUIT = $sig;
    }

    $SIG{$_} = \&defer_sig for @signals;
}

#
# Restore normal signal handling: a signal in @signals logs a warning and
# exits. If a signal was recorded in $GOT_QUIT while signals were deferred,
# exit right away instead of installing the handlers.
#
sub signal_normal
{
    sub normal_sig
    {
        my $sig = shift;
        $log->logwarn("Quitting on signal $sig");
        exit;
    }

    if ($GOT_QUIT)
    {
        $log->logwarn("Quitting on deferred signal $GOT_QUIT");
        exit;
    }

    $SIG{$_} = \&normal_sig for @signals;
}

#
# Enable signals while a work unit is being processed: a signal in
# @signals reports the given work unit as failed (via abort_work) and then
# exits. If a signal was already recorded in $GOT_QUIT, the same is done
# immediately.
#
# Arguments: worker handle, job id, work id, worker id.
#
sub signal_fail_work
{
    my($c, $job_id, $work_id, $worker_id) = @_;

    if ($GOT_QUIT)
    {
        $log->logwarn("Exiting on deferred signal $GOT_QUIT");
        abort_work($c, $job_id, $work_id, $worker_id, "Exiting on deferred signal $GOT_QUIT");
        exit;
    }

    # The closure remembers which work unit to fail when a signal arrives.
    my $on_signal = sub {
        my $signame = shift;
        $log->logwarn("Exiting on signal $signame");
        abort_work($c, $job_id, $work_id, $worker_id, "Exiting on signal $signame");
        exit;
    };

    $SIG{$_} = $on_signal for @signals;
}


MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3