View of /Sprout/ERDBLoadGroup.pm

Revision 1.6
Thu Apr 2 01:37:07 2009 UTC by parrello
Branch: MAIN
CVS Tags: rast_rel_2009_05_18
Changes since 1.5: +6 -1 lines
Added memory tracing and cleanup support.

#!/usr/bin/perl -w

#
# Copyright (c) 2003-2006 University of Chicago and Fellowship
# for Interpretations of Genomes. All Rights Reserved.
#
# This file is part of the SEED Toolkit.
#
# The SEED Toolkit is free software. You can redistribute
# it and/or modify it under the terms of the SEED Toolkit
# Public License.
#
# You should have received a copy of the SEED Toolkit Public License
# along with this program; if not write to the University of Chicago
# at info@ci.uchicago.edu or the Fellowship for Interpretation of
# Genomes at veronika@thefig.info or download a copy from
# http://www.theseed.org/LICENSE.TXT.
#

package ERDBLoadGroup;

    use strict;
    use Tracer;
    use ERDB;
    use Stats;
    use Time::HiRes qw(time);
    use ERDBGenerate;

=head1 ERDB Database Load Group Object

The process of loading an ERDB database can be a simple matter of creating some
sequential files from other sequential files, or it can be a complex web of
connected sub-processes involving multiple groups of tables being loaded in
parallel by multiple worker processes. The ERDB Database Load Group object
provides housekeeping functions to simplify the management of the more complex
load tasks.

When discussing an ERDB database load, there are two similar concepts we use to
break the load into pieces: I<sections> and I<groups>. A I<section> is a
partition of the data that can be processed in isolation from other sections. A
I<group> is a set of tables that should be loaded at the same time. An ERDB load
group is a request to generate load files for one or more sections of the data
targeting a single group of tables.

A certain amount of bookkeeping is required in order to handle parallelism. For
each table, a separate output file is generated for each section. If a section
does not complete successfully, then its load file is deleted and the section
must be loaded again. Because each section has its own load file, only the
particular sections that fail need to be reloaded.

Individual load groups should subclass this object, providing a virtual override
for the L</Generate> method.

The subclass name should consist of the group name followed by noise in capital
case. So, for example, the subclass name for a group named C<Feature> would be
C<FeatureSproutLoader> or C<FeatureAttributeLoader> or something similar. The
group name should only be letters, and only the first letter should be capitalized.
This allows the load script to be case-insensitive with regard to incoming group
names.
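
For illustration only, the sketch below shows how a group name can be recovered
from a subclass name using the same pattern the constructor applies. The helper
C<GroupNameFromClass> is hypothetical and is not part of this module.

    use strict;
    use warnings;

    # Hypothetical helper (not part of ERDBLoadGroup): take the leading
    # capitalized word of the class name as the group name.
    sub GroupNameFromClass {
        my ($class) = @_;
        return ($class =~ /^([A-Z][a-z]+)/ ? $1 : $class);
    }

    print GroupNameFromClass('FeatureSproutLoader'), "\n";    # Feature
    print GroupNameFromClass('FeatureAttributeLoader'), "\n"; # Feature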

Any working or status files generated by a subclass should have a prefix of C<dt>-something.
This will ensure they are deleted by the C<clear> option of [[ERDBGeneratorPl]].

The fields in this object are as follows.

=over 4

=item db

[[ErdbPm]] object for accessing the target database

=item directory

Directory into which the load files should be placed.

=item group

name of this load group

=item label

name of this worker process

=item lastKey

ID of the last major object processed

=item loaders

hash mapping the names of the group's tables to [[ERDBGeneratePm]] objects

=item stats

statistics object that can be used to track the progress of the load

=item section

name of this data section

=item source

object used to access the data from which the load files are to be generated

=item tables

reference to a list of the names of the tables in this group

=item options

hash containing the options originally passed in to the constructor

=back

=cut

=head3 new

    my $edbl = ERDBLoadGroup->new($db, $options, @tables);

Construct a new ERDBLoadGroup object. The following parameters are expected:

=over 4

=item db

The [[ErdbPm]] object for the database being loaded.

=item options

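Reference to a hash of options. This object itself uses C<label> (the name of
this worker process, defaulting to the process ID) and C<resume> (skip sections
whose load files already exist). Other options may be important to subclass
objects.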

=item tables

A list of the names for the tables in this load group.

=back

=cut

sub new {
    # Get the parameters.
    my ($class, $db, $options, @tables) = @_;
    # Create a statistics object.
    my $stats = Stats->new();
    # Compute the group name from the class name. It is the first word in
    # a name that is presumably capital case.
    my $group = ($class =~ /^([A-Z][a-z]+)/ ? $1 : $class);
    # Get the directory.
    my $directory = $db->LoadDirectory();
    Confess("Load directory \"$directory\" not found or invalid.") if ! -d $directory;
    # Create the ERDBLoadGroup object. Note that so far we don't have any loaders
    # defined and the section has not yet been assigned. The "ProcessSection"
    # method is used to assign the section, and the loaders are created the first
    # time it's called.
    my $retVal = { 
                    db => $db,
                    directory => $directory,
                    group => $group,
                    stats => $stats,
                    source => undef,
                    label => ($options->{label} || $$),
                    lastKey => undef,
                    loaders => {},
                    tables => \@tables,
                    section => undef,
                    options => $options
                 };
    # Bless and return it.
    bless $retVal, $class;
    return $retVal;
}

=head2 Subclass Methods

=head3 Put

    $edbl->Put($table, %fields);

Place a table record in a load file. This method is the workhorse of the
file generation phase of a load.

=over 4

=item table

Name of the table being loaded.

=item fields

Hash of field names to field values for the fields in the table.

=back

=cut

sub Put {
    # Get the parameters.
    my ($self, $table, %fields) = @_;
    # Get the loader for this table.
    my $loader = $self->{loaders}->{$table};
    # Complain if it doesn't exist.
    Confess("Table $table not found in load group $self->{group}.") if ! defined $loader;
    # Put this record to the loader's output file.
    my $bytes = $loader->Put(%fields);
    # Count the record and the bytes of data. If no bytes were output, the record
    # was discarded.
    if (! $bytes) {
        $self->Add("$table-discards" => 1);
    } else {
        $self->Add("$table-records" => 1);
        $self->Add("$table-bytes" => $bytes);
    }
}

=head3 PutE

    $edbl->PutE($table => $id, %fields);

Place an entity-based table record in a load file. The first field
specified after the table name is the ID.

=over 4

=item table

Name of the relevant table.

=item id

ID of the relevant entity.

=item fields

Hash mapping field names to values.

=back

=cut

sub PutE {
    # Get the parameters.
    my ($self, $table, $id, %fields) = @_;
    # Put the record.
    $self->Put($table, id => $id, %fields);
    # Record that we've done a putE.
    $self->Add(putE => 1);
}

=head3 PutR

    $edbl->PutR($table => $from, $to, %fields);

Place a relationship record in a load file. The first two fields
specified after the table name are the from-link and the to-link,
respectively.

=over 4

=item table

Name of the relevant relationship.

=item from

ID of the from-entity.

=item to

ID of the to-entity.

=item fields

Hash mapping field names to field values.

=back

=cut

sub PutR {
    # Get the parameters.
    my ($self, $table, $from, $to, %fields) = @_;
    # Put the record.
    $self->Put($table, 'from-link' => $from, 'to-link' => $to, %fields);
    # Record that we've done a PutR.
    $self->Add(putR => 1);
}


=head3 Add

    $edbl->Add($statName => $count);

Add the specified count to the named statistical counter. The statistical
counts are kept in an internal statistics object whose contents are
displayed when the group is finished.

=over 4

=item statName

Name of the statistic to increment.

=item count

Value by which to increment it.

=back

=cut

sub Add {
    # Get the parameters.
    my ($self, $statName, $count) = @_;
    # Update the statistic and return its new value. AddWarning relies on
    # the returned count.
    return $self->{stats}->Add($statName => $count);
}

=head3 AddWarning

    $edbl->AddWarning($errorType => $message);

Record a warning. Warnings indicate possible errors in the incoming data.
The first warning of a specified type is added as a message to the load
statistic. All warnings are also traced at level 3.

=over 4

=item errorType

Type of error indicated by the warning. This is used as the label when the
warning is counted in the statistics object.

=item message

Message describing the reason for the warning.

=back

=cut

sub AddWarning {
    # Get the parameters.
    my ($self, $errorType, $message) = @_;
    # Count the warning.
    my $count = $self->Add($errorType => 1);
    # Is this the first one of this type?
    if ($count == 1) {
        # Yes, add its message to the messages for the end.
        $self->{stats}->AddMessage($message);
    }
    # Trace every warning, as documented above.
    Trace("Data warning: $message") if T(3);
}

=head3 Track

    $edbl->Track($statName => $key, $period);

Save the specified key as the one currently in progress. If an error
occurs, the key value will appear in the output log. The named statistic
will also be incremented, and if the count is an exact multiple of the stated
period, a progress message will be traced at level 3.

Most load groups have a primary object type that drives the main loop. When
something goes wrong, we want to know the ID of the offending object. When
things go right, we want to know how far we've progressed toward completion.
This method can be used to record each occurrence of a primary object, and
provide a log of the progress or our current position in times of stress.

=over 4

=item statName

Name of the statistic to be incremented. This should be a plural noun
describing the object whose key is coming in.

=item key

Key value to be displayed if something goes wrong.

=item period (optional)

If specified, should be the number of objects to be counted between each
level-3 trace message.

=back
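
As a rough sketch of the periodic progress check described above, the
stand-alone example below counts objects and emits a line on every multiple of
the period. C<TrackSketch> and its counter hash are hypothetical stand-ins; the
real method uses the statistics object and the tracing facility.

    use strict;
    use warnings;

    my %counters;

    # Hypothetical stand-in for Track: count the object, and return a
    # progress line whenever the count is an exact multiple of the period.
    sub TrackSketch {
        my ($statName, $key, $period) = @_;
        my $newValue = ++$counters{$statName};
        if ($period && $newValue % $period == 0) {
            return "$newValue $statName processed (last key $key).";
        }
        return;
    }

    # With a period of 2, progress lines appear for the 2nd and 4th objects.
    for my $id (1 .. 5) {
        my $line = TrackSketch(genomes => "genome$id", 2);
        print "$line\n" if defined $line;
    }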

=cut

sub Track {
    # Get the parameters.
    my ($self, $statName, $key, $period) = @_;
    # Save the key.
    $self->{lastKey} = $key;
    # Count it.
    my $newValue = $self->{stats}->Add($statName => 1);
    # Do we need to output a progress message?
    if ($period && T(3) && ($newValue % $period == 0)) {
        # Yes.
        MemTrace("$newValue $statName processed by $self->{label} for $self->{group} group.");
    }
}

=head3 section

    my $sectionID = $edbl->section();

Return the ID of the current section.

=cut

sub section {
    # Get the parameters.
    my ($self) = @_;
    # Return the result.
    return $self->{section};
}

=head3 source

    my $sourceObject = $edbl->source();

Return the source object used to get the data needed for creating
the load files.

=cut

sub source {
    # Get the parameters.
    my ($self) = @_;
    # If we do not have a source object, retrieve it.
    if (! defined $self->{source}) {
        $self->{source} = $self->{db}->GetSourceObject();
    }
    # Return the result.
    return $self->{source};
}

=head3 db

    my $erdbObject = $edbl->db();

Return the database object for the target database.

=cut

sub db {
    # Get the parameters.
    my ($self) = @_;
    # Return the result.
    return $self->{db};
}

=head2 Internal Methods

=head3 ProcessSection

    my $flag = $edbl->ProcessSection($section);

Generate the load file for a particular data section. This method calls
the virtual method L</Generate> to actually put the data into the load
files, and is responsible for assigning the section and finalizing the
load files if the load is successful.

=over 4

=item section

ID of the section to load.

=item RETURN

Returns TRUE if successful, FALSE if an error prevented loading the section.

=back
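
The error protection described above can be pictured with the following
minimal sketch. It assumes nothing about ERDB: C<RunSection> is a hypothetical
stand-in, and the code reference plays the role of the subclass's Generate
method.

    use strict;
    use warnings;

    # Hypothetical stand-in for ProcessSection. $generate plays the role
    # of the subclass's Generate method.
    sub RunSection {
        my ($section, $generate) = @_;
        # The eval block yields 1 only if the generator ran to completion.
        my $ok = eval { $generate->($section); 1 };
        if (! $ok) {
            print "Section $section failed, load files would be aborted: $@";
            return 0;
        }
        print "Section $section succeeded, load files would be finished.\n";
        return 1;
    }

    RunSection('sectionA', sub { return });
    RunSection('sectionB', sub { die "bad record\n" });

Because each section has its own load files, a failure in one section leaves
the files of the other sections untouched.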

=cut

sub ProcessSection {
    # Get the parameters.
    my ($self, $section) = @_;
    # Declare the return variable. We'll set it to 1 if we succeed.
    my $retVal = 0;
    # Save the section ID.
    $self->{section} = $section;
    # Get the database object.
    my $db = $self->db();
    # Get the list of tables for this group.
    my @tables = @{$self->{tables}};
    # Should we skip this section?
    if ($self->SkipIndicated($section, \@tables)) {
        Trace("Resume mode: section $section skipped for group $self->{group}.") if T(3);
        $self->Add("section-skips" => 1);
        # The load files already exist, so no error prevented the load.
        $retVal = 1;
    } else {
        # Not skipping. Start a timer and protect ourselves from errors.
        my $startTime = time();
        eval {
            # Get the loader hash.
            my $loaderHash = $self->{loaders};
            # Initialize the loaders for the necessary tables.
            for my $table (@tables) {
                # Get this table's loader.
                my $loader = $loaderHash->{$table};
                # If it doesn't exist yet, create it.
                if (! defined $loader) {
                    $loader = ERDBGenerate->new($db, $self->{directory}, $table, $self->{stats});
                    # Save it for future use.
                    $loaderHash->{$table} = $loader;
                    # Count it.
                    $self->Add(tables => 1);
                }
                $loader->Start($section);
            }
            # Generate the data to put in the newly-created load files.
            $self->Generate();
            # Release our hold on the source object. This allows the database object to
            # decide whether or not we need a new one.
            delete $self->{source};
            # Clean up the database object.
            $db->Cleanup();
        };
        # Did it work?
        if ($@) {
            # No, so emit an error message and abort all the loaders.
            $self->{stats}->AddMessage("Error loading section $section: $@");
            if (defined $self->{lastKey}) {
                $self->{stats}->AddMessage("Error occurred while processing \"$self->{lastKey}\".");
            }
            $self->Add("section-errors" => 1);
            for my $loader (values %{$self->{loaders}}) {
                $loader->Abort();
            }
        } else {
            # Yes! Finish all the loaders.
            for my $loader (values %{$self->{loaders}}) {
                $loader->Finish();
            }
            # Update the load count and denote success.
            $self->Add("section-loads" => 1);
            $retVal = 1;
        }
        # Update the timer.
        $self->Add(duration => (time() - $startTime));
    }
    # Return the success indicator.
    return $retVal;
}

=head3 DisplayStats

    my $text = $edbl->DisplayStats();

Display the statistics for this load group.

=cut

sub DisplayStats {
    # Get the parameters.
    my ($self) = @_;
    # Return the result.
    return $self->{stats}->Show();
}

=head3 GetGroupHash

    my $groupHash = ERDBLoadGroup::GetGroupHash($erdb);

Return a hash that maps each load group in the specified database to its
constituent tables. This is useful when checking for problems with a load
or performing finishing tasks.

=over 4

=item erdb

[[ErdbPm]] database whose load information is desired.

=item RETURN

Returns a reference to a hash that maps each group name to a list of
table names.

=back

=cut

sub GetGroupHash {
    # Get the parameters.
    my ($erdb) = @_;
    # Initialize the return variable.
    my $retVal = {};
    # Loop through the list of load groups.
    for my $group ($erdb->LoadGroupList()) {
        # Stash the loader's tables in the output hash.
        $retVal->{$group} = [ GetTables($erdb, $group) ];
    }
    # Return the result.
    return $retVal;
}

=head3 GetTables

    my @tables = ERDBLoadGroup::GetTables($erdb, $group);

Return the list of tables belonging to the specified load group.

=over 4

=item erdb

[[ErdbPm]] database whose load information is desired.

=item group

Name of relevant group.

=item RETURN

Returns a list of the tables loaded by the specified group.

=back

=cut

sub GetTables {
    # Get the parameters.
    my ($erdb, $group) = @_;
    # Create a loader for the specified group.
    my $loader = $erdb->Loader($group, undef, {});
    # Extract the list of tables.
    my @retVal = @{$loader->{tables}};
    # Return the result.
    return @retVal;
}


=head3 ComputeGroups

    my @groupList = ERDBLoadGroup::ComputeGroups($erdb, \@groups);

Compute the actual list of groups determined by the incoming group list.

=over 4

=item erdb

[[ErdbPm]] object for the database being loaded.

=item groups

Reference to a list of group names specified on the command line. A plus sign
(C<+>) has special meaning.

=item RETURN

Returns the actual list of groups to be processed by the calling command. The
names will have been normalized to capital case.

=back
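
To make the plus-sign rule concrete, here is a self-contained sketch of the
expansion. C<ExpandGroups> and the group names are hypothetical; the real
method gets the standard order from the database object and also normalizes
the names and traces the result.

    use strict;
    use warnings;

    # Hypothetical stand-in for ComputeGroups. $allGroups is the complete
    # group list in standard order; $groups is the requested list.
    sub ExpandGroups {
        my ($allGroups, $groups) = @_;
        # Map each valid group name to its position in the standard order.
        my %position = map { $allGroups->[$_] => $_ } 0 .. $#$allGroups;
        my $lastI = -1;
        my @retVal;
        for my $group (@$groups) {
            if ($group eq '+') {
                # Everything after the previously processed group.
                push @retVal, @{$allGroups}[$lastI + 1 .. $#$allGroups];
                $lastI = $#$allGroups;
            } elsif (exists $position{$group}) {
                push @retVal, $group;
                $lastI = $position{$group};
            } else {
                die "Invalid load group name $group.\n";
            }
        }
        return @retVal;
    }

    # Assumed group names, for illustration only.
    my @all = qw(Genome Feature Subsystem Reaction);
    print join(" ", ExpandGroups(\@all, ['Feature', '+'])), "\n";
    # prints "Feature Subsystem Reaction"

A lone C<+> expands to the complete list, since the starting position is
before the first group.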

=cut

sub ComputeGroups {
    # Get the parameters.
    my ($erdb, $groups) = @_;
    # Get the complete group list in standard order.
    my @allGroups = $erdb->LoadGroupList();
    # Create a hash for validation purposes. This will map each valid group
    # name to its position in the standard order.
    my %allGroupHash;
    for (my $i = 0; $i <= $#allGroups; $i++) {
        $allGroupHash{$allGroups[$i]} = $i;
    }
    # This variable will be the index of the last-processed group in
    # the standard order. We start it before the first group in the list.
    my $lastI = -1;
    # The listed groups will be put in here.
    my @retVal;
    # Process the group list.
    for my $group (@$groups) {
        # Process this group.
        if ($group eq '+') {
            # Here we have a plus sign. Push in everything after the previous
            # group processed. Note that we'll be ending at the last position.
            # A second "+" after this one will generate no entries in the result
            # list.
            my $firstI = $lastI + 1;
            $lastI = $#allGroups;
            push @retVal, @allGroups[$firstI..$lastI];
        } elsif (exists $allGroupHash{$group}) {
            # Here we have a valid group name. Push it into the list.
            push @retVal, $group;
            # Remember its location in case there's a plus sign.
            $lastI = $allGroupHash{$group};
        } else {
            # This is an error.
            Confess("Invalid load group name $group.");
        }
    }
    # Normalize the group names and return them.
    @retVal = map { ucfirst $_ } @retVal;
    Trace("Final group list is " . join(" ", @retVal) . ".") if T(2);
    return @retVal;
}

=head3 KillFileName

    my $fileName = ERDBLoadGroup::KillFileName($erdb, $directory);

Compute the kill file name for the specified database in the specified
directory. When the [[ERDBGeneratorPl]] script sees the kill file, it will
terminate itself at the end of the current section.

=over 4

=item erdb

[[ErdbPm]] object for the database whose kill file name is desired.

=item directory (optional)

Load directory for the database.

=item RETURN

Returns the specified database's kill file name. If a directory is specified,
it is prefixed to the name with an intervening slash.


=back

=cut

sub KillFileName {
    # Get the parameters.
    my ($erdb, $directory) = @_;
    # Compute the kill file name. We start with the database name in
    # lower case, then prefix it with "kill_";
    my $dbName = lc ref $erdb;
    my $retVal = ERDBGenerate::CreateFileName("kill_$dbName", undef, 'control', $directory);
    # Return the result.
    return $retVal;
}

=head3 SkipIndicated

    my $flag = $edbl->SkipIndicated($section, \@tables);

Return FALSE if the current group should be run for the current section.
If the C<resume> option is not set, this method always returns FALSE;
otherwise, it will look at the files currently in the load directory and
if enough of them are present, it will return TRUE, indicating there's
no point in generating data for the indicated tables with respect to the
current section. In other words, it will return TRUE if, for every table,
there is either a load file for that table or a load file for the
specified section of that table.

=over 4

=item section

ID of the relevant section.

=item tables

List of tables to check.

=item RETURN

Returns TRUE if load files are already generated for the specified section, else FALSE.

=back
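
The resume test can be sketched as follows. The file naming scheme in
C<FileNameSketch> is an assumption made for this example only; the real names
come from C<ERDBGenerate::CreateFileName>. The table names and section ID are
likewise hypothetical.

    use strict;
    use warnings;
    use File::Temp qw(tempdir);

    # Assumed naming scheme, for illustration only.
    sub FileNameSketch {
        my ($dir, $table, $section) = @_;
        return defined $section ? "$dir/$table-$section.dtx" : "$dir/$table.dtx";
    }

    # Hypothetical stand-in for SkipIndicated: skip only in resume mode, and
    # only if every table has a whole-table file or a file for this section.
    sub CanSkip {
        my ($dir, $section, $tables, $resume) = @_;
        return 0 if ! $resume;
        for my $table (@$tables) {
            my @files = map { FileNameSketch($dir, $table, $_) } (undef, $section);
            return 0 if ! grep { -f $_ } @files;
        }
        return 1;
    }

    my $dir = tempdir(CLEANUP => 1);
    for my $name ('Feature.dtx', 'IsLocatedIn-83333.1.dtx') {
        open(my $fh, '>', "$dir/$name") or die "Cannot create $name: $!";
        close $fh;
    }
    # Both tables have a file, so the section can be skipped.
    print CanSkip($dir, '83333.1', ['Feature', 'IsLocatedIn'], 1) ? "skip\n" : "generate\n";
    # HasFeature has no file at all, so the section must be generated.
    print CanSkip($dir, '83333.1', ['Feature', 'HasFeature'], 1) ? "skip\n" : "generate\n";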

=cut

sub SkipIndicated {
    # Get the parameters.
    my ($self, $section, $tables) = @_;
    # Declare the return variable. It's FALSE if there's no resume parameter.
    my $retVal = $self->{options}->{resume};
    # Loop through the table names while $retVal is TRUE.
    for my $table (@$tables) {
        last if ! $retVal;
        # Compute the file names.
        my @files = map { ERDBGenerate::CreateFileName($table, $_, data => $self->{directory}) }
            (undef, $section);
        # If neither is present, we can't skip. So, if the grep below returns an empty
        # list, we set $retVal FALSE, which stops the loop.
        if (scalar(grep { -f $_ } @files) == 0) {
            $retVal = 0;
            Trace("Section $section not found for $table in $self->{group}. Regeneration required.") if T(3);
        }
    }
    # Return the result.
    return $retVal;
}


=head2 Virtual Methods

=head3 Generate

    $edbl->Generate();

Generate the data for this load group with respect to the current
section. This method must be overridden by the subclass and should call
the L</Put> method to put data into the tables.
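
The shape of a typical override is sketched below. So that the example runs by
itself, it inherits from a small stand-in class instead of ERDBLoadGroup; the
stand-in, the C<Feature> table, the C<feature-type> field, and the ID format
are all assumptions made for illustration.

    use strict;
    use warnings;

    # Stand-in for ERDBLoadGroup so the sketch runs by itself. It only
    # records the records passed to PutE.
    package StubLoadGroup;
    sub new     { my ($class, $section) = @_; return bless { section => $section, rows => [] }, $class; }
    sub section { return $_[0]->{section}; }
    sub PutE    { my ($self, $table, $id, %fields) = @_; push @{$self->{rows}}, [$table, $id, %fields]; }

    # Hypothetical load group for an assumed "Feature" table.
    package FeatureExampleLoader;
    our @ISA = ('StubLoadGroup');

    sub Generate {
        my ($self) = @_;
        # Generate records only for the section currently being processed.
        my $section = $self->section();
        for my $n (1 .. 3) {
            $self->PutE(Feature => "$section.peg.$n", 'feature-type' => 'peg');
        }
    }

    package main;
    my $loader = FeatureExampleLoader->new('83333.1');
    $loader->Generate();
    print scalar(@{$loader->{rows}}), " records generated\n";

A real subclass would inherit from ERDBLoadGroup, read its input through the
L</source> object, and call L</Track> in its main loop.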

=cut

sub Generate {
    Confess("Pure virtual method Generate called.");
}

1;
