[Bio] / FigKernelScripts / FFB2_get_oligos2.pl Repository:
ViewVC logotype

View of /FigKernelScripts/FFB2_get_oligos2.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.4 - (download) (as text) (annotate)
Tue Apr 23 20:31:41 2013 UTC (6 years, 6 months ago) by olson
Branch: MAIN
CVS Tags: rast_rel_2014_0912, rast_rel_2014_0729, HEAD
Changes since 1.3: +2 -2 lines
Update to figfam processing

# -*- perl -*-

use strict;
use Data::Dumper;
use Carp;
use Cache::Memcached::Fast;
use IPC::Run qw(start finish reap_nb run);
use Proc::ParallelLoop;
use IPC::Open3;
use POSIX ":sys_wait_h";
use FileHandle;

my $usage = "usage: FFB2_get_oligos families.2c family.functions function.index outdir [memcache-host memcache-port]";

my $min_sz = 7;
my $max_sz = 12;
my($fam2cF,$famFuncF,$famFuncIdx, $out_dir);

(
 ($fam2cF     = shift @ARGV) && open(FAM2C,"<$fam2cF") &&
 ($famFuncF   = shift @ARGV) &&
 ($famFuncIdx = shift @ARGV) &&
 ($out_dir    = shift @ARGV)
)
    || die $usage;

open(FFIDX, ">", $famFuncIdx) or die "Cannot write $famFuncIdx:$ !";

-d $out_dir || mkdir $out_dir || die "Cannot mkdir $out_dir: $!";

my $mchost = shift;
my $mcport = shift;
my $mc;

if ($mchost && $mcport)
{
    $mc = new Cache::Memcached::Fast({ servers => ["$mchost:$mcport"] } );
}

#
# We write $chunksize oligos to a sort and later merge the output files into a pipe to usable_motifs.
#

my $chunksize = 1_000_000;
my @cmd = ("sort", "-S", "100M");

my $tmp_dir = "$FIG_Config::temp/ff_bundle.$$";
-d $tmp_dir || mkdir $tmp_dir || die "Cannot mkdir $tmp_dir: $!";

use FIG;
my $fig = new FIG;
my %fam_functions = map { $_ =~ /^(FIG\d+)\t(\S.*\S)/; $1 => $2 } `cat $famFuncF`;
my $nextF = 0;
my $x = <FAM2C>;
my %function_index;
while ($x && ($x =~ /^(\S+)/))
{
    my $fam = $1;
    my $fam_function  = $fam_functions{$fam};
    my $fam_functionI = &fam_function_index(\%function_index,\$nextF,$fam_function);
    while ($x && ($x =~ /^(\S+)\t(\S+)/) && ($1 eq $fam))
    {
	my $peg = $2;
	if (my $seq = &get_translation($fig, $peg))
	{
	    my $i;
	    my $ln = length($seq);
	    for($i=0; ($i <= ($ln - $min_sz)); $i++)
	    {
		my $oligo = uc substr($seq,$i,($i < ($ln-$max_sz)) ? $max_sz : ($ln-$i));
		if ($oligo =~ /^([ACDEFGHIKLMNPQRSTVWY])[ACDEFGHIKLMNPQRSTVWY]*$/)
		{
		    my $offset = $ln - $i;
		    bundle_write($1, $oligo . "\t" . $fam_functionI . "\t" . $fam . "\tOFF" . $offset . "\n");
		}
	    }
	}
	$x = <FAM2C>;
    }
}
close(FAM2C);
close FFIDX;

bundle_finish();

my %n_written;
my %file_idx;
my %files;
my %fh;
my @pending;

sub bundle_write
{
    my($char, $str) = @_;

    my $ent = $fh{$char};

    if ($n_written{$char} >= $chunksize)
    {
	bundle_close($char);
	undef $ent;
	bundle_check();
	$n_written{$char} = 0;
    }
    if (!$ent)
    {
	$ent = bundle_open($char);
    }
    my $fh = $ent->{fh};
    print $fh $str;
    $n_written{$char}++;
}

sub bundle_open
{
    my($char) = @_;

    my $idx = $file_idx{$char} + 0;
    my $file = sprintf("$tmp_dir/bundle.$char.%05d", $idx);
    print "Write to $file\n";
    $file_idx{$char} = $idx + 1;

    push @{$files{$char}}, $file;

    my($rpipe, $wpipe);
    pipe($rpipe, $wpipe);

    my $pid = fork;
    if ($pid == 0)
    {
	open(STDIN, "<&", $rpipe) or die "Cannot dup stdin: $!";
	close($rpipe);
	close($wpipe);
	open(STDOUT, ">", $file) or die "Cannot write $file: $!";
	exec(@cmd);
	die "exec failed: $!";
    }

    close($rpipe);
    my $ent = { fh => $wpipe, file => $file, pid => $pid };
    $fh{$char} = $ent;
    return $ent;
}

sub bundle_close
{
    my($char) = @_;
    my $ent = $fh{$char};
    return unless $ent;
    $ent->{fh}->close;
    delete $fh{$char};
    push @pending, $ent;
}

sub bundle_check
{
    my @np;
    for my $ent (@pending)
    {
	my $r = waitpid($ent->{pid}, WNOHANG);
	if ($r)
	{
	    print "Wait $ent->{pid} returns $r err=$?\n";
	}
	else
	{
	    push(@np, $ent);
	}
    }
    @pending = @np;
}
    
sub bundle_finish
{
    for my $char (keys %fh)
    {
	bundle_close($char);
    }
    
    for my $ent (@pending)
    {
	print "Wait for $ent->{pid}\n";
	my $r = waitpid($ent->{pid}, 0);
	if ($r)
	{
	    print "Wait $ent->{pid} returns $r err=$?\n";
	}
    }

#    pareach [ keys %files ], sub {
#	my $char = shift;
    for my $char (keys %files)
    {
	my $out = "$out_dir/all.$char";

	my @files = @{$files{$char}};
	my $r = run ["sort", "-m", @files], ">", $out;
	print "run for @files returns $r\n";
#	my $cmd = "ls -l @files; sort -m @files > $out";
#	my $rc = system($cmd);
#	print "rc=$rc: $cmd\n";
    }
}
    
sub get_translation
{
    my($fig, $peg) = @_;

    my $tr;
    if ($mc)
    {
	$tr = $mc->get("s:$peg");
    }
    if (!$tr)
    {
	$tr = $fig->get_translation($peg);
	if ($tr && $mc)
	{
	    $mc->set("s:$peg", $tr);
	}
    }
    return $tr;
}

sub fam_function_index {
    my($fam_functions,$nextFP,$fam_function) = @_;

    defined($fam_function) || confess "bad";
    my $x = $fam_functions->{$fam_function};
    if (! defined($x)) 
    {
	$x = $fam_functions->{$fam_function} = $$nextFP;
	print FFIDX "$x\t$fam_function\n";
	$$nextFP++;
    }
    return $x;
}

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3