[Bio] / FigKernelScripts / p3x-propagate-family-names.pl Repository:
ViewVC logotype

View of /FigKernelScripts/p3x-propagate-family-names.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.1 - (download) (as text) (annotate)
Wed Nov 2 16:25:31 2016 UTC (3 years ago) by olson
Branch: MAIN
CVS Tags: HEAD
P3 utility scripts.

#
# Propagate family names to a new merged family set.
#
# Requires the following parameters:
#
# old-families		Tab-delimited file of merged family data for old families
# old-family-data	Directory containing per-genus data files for old families
# new-families		Tab-delimited file of merged family data for new families
# new-family-data	Directory containing per-genus data files for new families


use strict;
use DB_File;
use Getopt::Long::Descriptive;
use Parallel::ForkManager;
use Data::Dumper;
use IO::Pipe;
use IO::Select;

STDOUT->autoflush(1);
STDERR->autoflush(1);

my($opt, $usage) = describe_options("%c %o fam-type old-fams old-data new-fams new-data",
				    ["fam-type is either 'local' or 'global'"],
				    [],
				    ["genus|g=s" => "read NR data for this genus only (for use with local fams for a given genus)"],
				    ["log|l=s" => "succinct logfile"],
				    ["show-progress" => "Show progress per-family"],
				    ["debug|d" => "enable debugging output"],
				    ["parallel|p=i" => "Parallelism", { default => 1 }],
				    ["help|h" => "Show this help message"],
				    );
print($usage->text), exit 0 if $opt->help;
die($usage->text), if @ARGV != 5;

my $log_fh;
if ($opt->log)
{
    open($log_fh, ">", $opt->log) or die "Cannot open logfile " . $opt->log . ": $!";
    $log_fh->autoflush(1);
}

my $fam_type = shift;
my $old_fams = shift;
my $old_data = shift;
my $new_fams = shift;
my $new_data = shift;

my %fam_type_to_key = (global => 'gf', local => 'lf');
my $fam_type_key = $fam_type_to_key{$fam_type};
$fam_type_key or die "Invalid family type specified (valid values are 'local' and 'global')\n";

print STDERR "fam_type_key=$fam_type_key\n";

-f $old_fams or die "Old fams file $old_fams does not exist\n";
-d $old_data or die "Old fams data dir $old_data does not exist\n";
-f $new_fams or die "New fams file $new_fams does not exist\n";
-d $new_data or die "New fams data dir $new_data does not exist\n";

my $forkmgr = Parallel::ForkManager->new($opt->parallel);

my($old_md5, $old_is_key, $old_md5_to_peg, $new_md5, $new_is_key, $new_md5_to_peg) = read_pegsyn([$old_data, $new_data], $opt->genus, $forkmgr);
#my($new_md5, $new_is_key, $new_md5_to_peg) = read_pegsyn($new_data, $forkmgr);
#print Dumper($old_md5, $new_md5);
my $old_exists = {};
$old_exists->{$_} = 1 foreach values %$old_md5;

my $new_exists = {};
$new_exists->{$_} = 1 foreach values %$new_md5;

{
    my($old_peg_to_fam, $old_gfam_to_peg, $old_lfam_to_peg, $old_fam_to_fun) = read_fams($old_fams, $old_md5, $old_is_key);
    my($new_peg_to_fam, $new_gfam_to_peg, $new_lfam_to_peg, $new_fam_to_fun) = read_fams($new_fams, $new_md5, $new_is_key);

    if ($fam_type_key eq 'lf')
    {
	propagate_names('lf',
			$old_peg_to_fam, $old_lfam_to_peg, $old_fam_to_fun,
			$new_peg_to_fam, $new_lfam_to_peg, $new_fam_to_fun);
    }
    else
    {
	propagate_names('gf',
			$old_peg_to_fam, $old_gfam_to_peg, $old_fam_to_fun,
			$new_peg_to_fam, $new_gfam_to_peg, $new_fam_to_fun);
    }
}

sub propagate_names
{
    my($fam_key,
       $old_peg_to_fam, $old_fam_to_peg, $old_fam_to_fun,
       $new_peg_to_fam, $new_fam_to_peg, $new_fam_to_fun) = @_;

    print STDERR "Propagate names using $fam_key\n";

    my %old_fam_used;
    my %new_fam_name;
    my $new_idx = 0;

    #
    # Phase 1:
    # For each old family OF:
    #   Determine the set of new fams each peg maps to.
    #   If each of these new families only contains pegs from OF, we can map.
    #   If there is more than one family, we are splitting. Choose one name to propagate and create new names for the others.
    #
    

    while (my($fam, $pegs) = each(%$old_fam_to_peg))
    {
	my %nfams;
	print "Process old family $fam $old_fam_to_fun->{$fam}\n" if $opt->show_progress;
	my $bad = 0;
	my %nfam;
	my %nfam_checked;
    OUTER:
	for my $peg (@$pegs)
	{
	    #
	    # If this peg does not exist in the new family data, disregard.
	    #
	    if (!$new_exists->{$peg})
	    {
		print "  No new for $peg\n" if $opt->debug;
		next;
	    }
	    
	    #
	    # $nfam is the new family corresponding to this peg.
	    #
	    my $nfam = $new_peg_to_fam->{$peg}->{$fam_key};
	    my $nfun = $new_fam_to_fun->{$nfam};
	    
	    print "  O $peg => N $nfam $nfun " . ($nfam_checked{$nfam} ? "(already checked)" : "") . "\n"
		if $opt->debug;

	    next if $nfam_checked{$nfam}++;

	    #
	    # If we haven't checked $nfam yet, evaluate all pegs in $nfam
	    # If any peg in $nfam exists in the old families, and the
	    # old family that it is in is not the same as the old family $fam
	    # that we are currently evaluating, then mark this old family
	    # as bad for the purposes of propagating the name as a clean
	    # rename or split.
	    #
	    for my $npeg (@{$new_fam_to_peg->{$nfam}})
	    {
		#
		# $npeg is a peg in the new family corresponding to the peg
		# in the old family that we are considering.
		# $npeg_ofam is the old family that it corresponds to.
		#
		
		if ($old_exists->{$npeg})
		{
		    my $npeg_ofam = $old_peg_to_fam->{$npeg}->{$fam_key};
		    
		    if ($npeg_ofam eq $fam)
		    {
			print "    N $npeg $npeg_ofam OK\n" if $opt->debug;
			$nfam{$nfam}++;
		    }
		    else
		    {
			print "    N $npeg $npeg_ofam BAD\n" if $opt->debug;
			$bad++;
			last OUTER if $bad > 10;
		    }
		}	    
	    }
	}
	if (!$bad)
	{
	    if (%nfam == 1)
	    {
		my($nfam) = keys %nfam;
		$new_fam_name{$nfam} = $fam;
		$old_fam_used{$fam} = $nfam;
		print "$nfam NOW $fam\n";
		print $log_fh "$nfam NOW $fam\n" if $log_fh;
	    }
	    elsif (%nfam > 1)
	    {
		print "SPLIT O $fam => N " . join(" ", keys %nfam) . "\n";
		print $log_fh "SPLIT O $fam => N " . join(" ", keys %nfam) . "\n" if $log_fh;
		my($nfam, @rest) = sort { $nfam{$b} <=> $nfam{$a} } keys %nfam;
		$new_fam_name{$nfam} = $fam;
		$old_fam_used{$fam} = $nfam;
		print "$nfam NOW $fam\n";
		print $log_fh "$nfam NOW $fam\n" if $log_fh;
		for my $r (@rest)
		{
		    my $nm = "NEW_" . $new_idx++;
		    $new_fam_name{$r} = $nm;
		    print "$r NOW $nm\n";
		    print $log_fh "$r NOW $nm\n" if $log_fh;
		}
	    }
	}	    
	print Dumper($bad, \%nfam) if $opt->debug;
    }
    
    #
    # Phase 2.
    # For each new family NF that has not been renamed,
    # find the set of old families that correspond to its pegs. If
    # each peg in those old families is in NF, this is a join.
    #
    
    print "PHASE 2\n";
    while (my($nfam, $npegs) = each(%$new_fam_to_peg))
    {
	next if $new_fam_name{$nfam};

	print "Process new family $nfam $new_fam_to_fun->{$nfam}\n" if $opt->show_progress;
	
	my %mapped_nfams;
	my %ocount; 		#  count of refs to old family.
	my @npegs_that_exist = grep { $old_exists->{$_} } @$npegs;
	if (@npegs_that_exist == 0)
	{
	    print "  All new pegs\n" if $opt->debug;

	    my $nm = "NEW_" . $new_idx++;
	    $new_fam_name{$nfam} = $nm;
	    print "$nfam NOW $nm\n";
	    print $log_fh "$nfam NOW $nm\n" if $log_fh;
	}
	else
	{
	    for my $npeg (@npegs_that_exist)
	    {
		#
		# For each peg in the new family, if the corresponding peg exists
		# in the old families then $ofam is that family.
		#
		# We examine each peg in the old family ($opeg below)
		# and map it back to the corresponding new family.
		# We maintain a count of the new new families thus mapped.
		#
		# If all of those mappings fold back to the same new family $nfam
		# then we have a case where the old families joined to form the new family.
		#
		
		my $ofam = $old_peg_to_fam->{$npeg}->{$fam_key};
		print "  O $npeg $old_md5_to_peg->{$npeg} $ofam $old_fam_to_fun->{$ofam}\n" if $opt->debug;
		#
		# We only need to check each ofam once.
		#
		if ($ocount{$ofam} == 0)
		{
		    for my $opeg (@{$old_fam_to_peg->{$ofam}})
		    {
			next unless $new_exists->{$opeg};
			my $mapped_nfam = $new_peg_to_fam->{$opeg}->{$fam_key};
			print "    O $opeg $old_md5_to_peg->{$opeg} N $mapped_nfam $new_fam_to_fun->{$mapped_nfam}\n" if $opt->debug;
			$mapped_nfams{$mapped_nfam}++;
		    }
		}
		$ocount{$ofam}++;
	    }

	    print Dumper($nfam, \%mapped_nfams, \%ocount) if $opt->debug;
	    if (keys %mapped_nfams == 1)
	    {
		#
		# Process a clean join. $ocount{$ofam} holds the number of times $ofam occurs in the
		# join, so take the largest value to use as the name for the new family.
		#
		my($oname, @rest) = sort { $ocount{$b} <=> $ocount{$a} } keys %ocount;
		$new_fam_name{$nfam} = $oname;
		$old_fam_used{$oname} = $nfam;
		print "$nfam NOW $oname\n";
		print "JOIN $oname @rest => $nfam\n";
		print $log_fh "$nfam NOW $oname\n" if $log_fh;
		print $log_fh "JOIN $oname @rest => $nfam\n" if $log_fh;
		if ($opt->debug)
		{
		    for my $n ($oname, @rest)
		    {
			print "\t$n\t$old_fam_to_fun->{$n}\n";
		    }
		}
	    }
	    else
	    {
		if ($opt->debug)
		{
		    for my $ofam (keys %ocount)
		    {
			print "  O: $ofam $old_fam_to_fun->{$ofam}\n";
		    }
		    for my $nfam (keys %mapped_nfams)
		    {
			print "  N: $nfam $new_fam_to_fun->{$nfam}\n";
		    }
		}
	    }
	}
    }

    #
    # Phase 3:
    #
    # This is a mop-up phase to try to propagate good enough names to more mixed groups.
    #
    # For each old family OF:
    #   Determine the set of new fams each peg maps to.
    #   Determine if there is a new family with sufficient representation (based on the
    #   number of pegs mapping to it in the new families). If there is, label that
    #   family with OF.
    #

    print "PHASE 3\n";
    while (my($fam, $pegs) = each (%$old_fam_to_peg))
    {
	next if $old_fam_used{$fam};
	print "Process old family $fam $old_fam_to_fun->{$fam}\n" if $opt->show_progress;

	my %nfams;
	my $n = 0;

	for my $peg (@$pegs)
	{
	    next unless $new_exists->{$peg};
	    my $nfam = $new_peg_to_fam->{$peg}->{$fam_key};
	    $nfams{$nfam}++;
	    $n++;
	}

	if ($n == 0)
	{
	    print "  No pegs, skipping\n" if $opt->debug;
	    next;
	}

	my $thresh = 0.75;
	my @by_weight = sort { $nfams{$b} <=> $nfams{$a} } keys %nfams;
	for my $nfam (@by_weight)
	{
	    my $r = $nfams{$nfam} / $n;
	    my $renamed = $new_fam_name{$nfam};
	    printf("  %02f: $nfam $new_fam_to_fun->{$nfam} already renamed: $renamed\n", $r) if $opt->debug;
	}
	my $cand = $by_weight[0];
	my $frac = $nfams{$cand} / $n;
	my $renamed = $new_fam_name{$cand};
	if ($frac > $thresh && !$renamed)
	{
	    $new_fam_name{$cand} = $fam;
	    $old_fam_used{$fam} = $cand;
	    printf "$cand NOW $fam weight=%2f\n", $frac;
	    printf $log_fh "$cand NOW $fam weight=%2f\n", $frac if $log_fh;
	}
    }

    my $n_new_fams = keys %$new_fam_to_peg;
    my $n_mapped = keys %new_fam_name;
    print "$n_new_fams new families, propagated $n_mapped names\n";
    print "Unmapped new:\n";
    print "\t$_\t$new_fam_to_fun->{$_}\n" foreach sort { $a <=> $b } grep { !$new_fam_name{$_} } keys %$new_fam_to_peg;
    if ($log_fh)
    {
	print $log_fh "$n_new_fams new families, propagated $n_mapped names\n";
	print $log_fh "Unmapped new:\n";
	print $log_fh "\t$_\t$new_fam_to_fun->{$_}\n" foreach sort { $a <=> $b } grep { !$new_fam_name{$_} } keys %$new_fam_to_peg;
    }
}

#
# Read the families file and extract tables mapping from peg => family and family => peg.
# Values are hashes so we can mark status.
#
sub read_fams
{
    my($fams, $md5, $is_key) = @_;

    open(my $fh, "<", $fams) or die "Cannot open $fams: $!";

    # GF00000000  2  2  fig|655438.3.peg.1461 294 (2E,6E)-farnesyl diphosphate synthase (EC 2.5.1.10)  62  Cycloclasticus  62
    my $to_fam = {};
    my $gfam_to_peg = {};
    my $lfam_to_peg = {};
    my $fam_to_fun = {};
    while (<$fh>)
    {
	chomp;
	my($gf, $n, $ng, $peg, $len, $fun, $fam, $genus) = split(/\t/);
	my $lf = "$genus.$fam";
	next unless $is_key->{$peg};
	$fam_to_fun->{$lf} = $fam_to_fun->{$gf} = $fun;
	my $m = $md5->{$peg};
	$to_fam->{$m} = { gf => $gf, lfnum => $fam, lf => $lf, genus => $genus, func => $fun };
	push(@{$gfam_to_peg->{$gf}}, $m);
	push(@{$lfam_to_peg->{$lf}}, $m);
    }
    return($to_fam, $gfam_to_peg, $lfam_to_peg, $fam_to_fun);
}

sub read_pegsyn
{
    my($data_list, $genus_to_process, $forkmgr) = @_;

    my @res;
    my @work;
    for my $didx (0..$#$data_list)
    {
	my $data = $data_list->[$didx];

	my $md5 = {};
	my $is_key = {};
	my $to_peg = {};
	$res[$didx] = [$md5, $is_key, $to_peg];

	#
	# Pegsyn is in a nr/peg.synonyms file inside each subdir of $data.
	#
	opendir(D1, $data) or die "Cannot opendir $data: $!";
	while (my $genus = readdir(D1))
	{
	    next if $genus =~ /^\./;
	    next if $genus_to_process && $genus_to_process ne $genus;
	    next unless -d "$data/$genus";
	    my $pegsyn = "$data/$genus/nr/peg.synonyms";
	    my $sz = -s $pegsyn;
	    die "Missing $pegsyn" if ! -f _;
	    push(@work, [$sz, $didx, $pegsyn]);
	}
	closedir(D1);
    }

    my @work_assigned;
    my $n = $opt->parallel;
    $work_assigned[$_] = [0, []] for 0 .. $n - 1;

    for my $item (sort { $b->[0] <=> $a->[0] } @work)
    {
	my $idxMin = 0;
	$work_assigned[$idxMin]->[0] < $work_assigned[$_]->[0] or $idxMin = $_ for 1..$#work_assigned;
	my $e = $work_assigned[$idxMin];
	$e->[0] += $item->[0];
	push(@{$e->[1]}, $item);
    }

    for my $wa (@work_assigned)
    {
	my $n = @{$wa->[1]};
	print "$wa->[0] $n\n";
    }

    my $sel = IO::Select->new;

    my $packet_size = 1_000_000;

    for my $work (@work_assigned)
    {
	my($size, $files) = @$work;

	my $pipe = IO::Pipe->new;

	my $pid = $forkmgr->start;
	if ($pid)
	{
	    $pipe->reader;
	    $pipe->blocking(0);
	    $sel->add($pipe);
	}
	else
	{
	    $pipe->writer;
	    $pipe->autoflush(1);

	    for my $this_work (@$files)
	    {
		my($sz, $didx, $file) = @$this_work;
		
		my $fh;

		my $packet;

		print "$$ $file\n";
		open($fh, "<", $file) or die "Cannot read $file: $!";
		while (<$fh>)
		{
		    # gnl|md5|8eea9950bc1e0f99f75a4a976304d636,169	fig|438.15.peg.2419,169;fig|940265.3.peg.655,169;fig|940265.4.peg.1650,169
		    
		    if (/^gnl\|md5\|([0-9a-f]{32})[^\t]+\t(.*)/)
		    {
			my $md5x = $1;
			my $flist = $2;
			my @dat;
			while ($flist =~ /(fig\|\d+\.\d+\.[^.]+\.\d+)/g)
			{
			    push(@dat, $1);
			}

			$packet .= join("\t", $md5x, @dat) . "\n";
			my $plen = length($packet);
			if ($plen > $packet_size)
			{
			    my $hdr = pack("LL", $plen, $didx);
			    my $hlen = length($hdr);
			    # print Dumper($hlen, $plen, unpack("LL", $hdr));

			    print $pipe $hdr, $packet;
			    $packet = '';
			}
		    }
		}
		my $plen = length($packet);
		if ($plen > 0)
		{
		    my $hdr = pack("LL", $plen, $didx);
		    
		    my $hlen = length($hdr);
		    # print Dumper($hlen, $plen, unpack("LL", $hdr));
		   
		    print $pipe $hdr, $packet;
		    $packet = '';
		}
		close($fh);
	    }
	    $pipe->close();
	    $forkmgr->finish();
	}
    }

    while ($sel->count)
    {
	my @read = $sel->can_read();

	for my $fh (@read)
	{
	    my $buf;
	    my $got = read($fh, $buf, 8);
	    if (!$got)
	    {
		print "EOF\n";
		$sel->remove($fh);
		$fh->close();
	    }
	    else
	    {
		my($n, $didx) = unpack("LL", $buf);

		undef $buf;
		my $data = '';

		# print "Got pkt got=$got $n didx=$didx\n";
		while ($n > 0)
		{
		    my $got = read($fh, $buf, $n);
		    # print "$didx: $res[$didx] len=$got\n";
		    $n -= $got;
		    $data .= $buf;
		}

		my($md5, $is_key, $to_peg) = @{$res[$didx]};

		for my $l (split(/\n/, $data))
		{
		    my($md5x, $key, @rest) = split(/\t/, $l);
		    $is_key->{$key} = 1;
		    $to_peg->{$md5x} = $key;
		    $md5->{$_} = $md5x foreach $key, @rest;

		}
	    }
	}
    }
    my @list = map { @$_ } @res;
    return @list;
}

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3