[Bio] / MGRASTBackend / mgrast_pipeline.pl Repository:
ViewVC logotype

Annotation of /MGRASTBackend/mgrast_pipeline.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.1 - (view) (download) (as text)

1 : jared 1.1 use FIG;
2 :     use FIG_Config;
3 :     use GenomeMeta;
4 :     use Data::Dumper;
5 :     use SGE;
6 :    
7 :     use MGRASTBackend::MGRASTPipeline qw( check_free_space get_jobs get_pipeline_for_job process_upload mark_job_done );
8 :    
9 :     use strict;
10 :    
11 :     my $job_spool_dir = $FIG_Config::mgrast_jobs;
12 :    
13 :     # Verify we have at least 10G of space left.
14 :     &check_free_space($job_spool_dir);
15 :    
16 :     # Get job list
17 :     my $jobs = &get_jobs($job_spool_dir);
18 :    
19 :     # Setup SGE
20 :     my $sge = new SGE;
21 :    
22 :     # Get pipeline stages and check job
23 :     for my $job (@$jobs){
24 :     Trace("Getting pipeline for $job at $job_dir\n") if T(1);
25 :     my $stages = &get_pipeline_for_job("$job_spool_dir/$job");
26 :     if(defined $stages){
27 :     Trace("Stages found for job $job\n") if T(2);
28 :     check_job($job, "$job_spool_dir/$job", $stages, $sge);
29 :     } else {
30 :     Confess("Stages not found for job $job.");
31 :     return;
32 :     }
33 :     }
34 :    
35 :     sub check_job{
36 :     my($job_id, $job_dir, $stages, $sge) = @_;
37 :    
38 :     Trace("Checking $job_id at $job_dir\n") if T(1);
39 :    
40 :     if (! -f "$job_dir/ACTIVE"){
41 :     Trace("Skipping job $job_id as not active\n") if T(2);
42 :     return;
43 :     }
44 :    
45 :     if (! -f "$job_dir/MGRAST2"){
46 :     Trace("Skipping job $job_id: it is not a mgrast2 job\n") if T(2);
47 :     return;
48 :     }
49 :    
50 :     if (-f "$job_dir/DONE"){
51 :     Trace("Skipping job $job_id as done\n") if T(2);
52 :     return;
53 :     }
54 :    
55 :     my $meta = new GenomeMeta("metajob_$job_id", "$job_dir/meta.xml");
56 :    
57 :     if (!$meta){
58 :     Confess("Could not create meta for $job_dir/meta.xml");
59 :     return;
60 :     }
61 :    
62 :     for my $stage (@$stages){
63 :     my($name, $processor) = @$stage;
64 :    
65 :     my $status = $meta->get_metadata("status.$name");
66 :    
67 :     next if $status eq "complete";
68 :     return if $status eq "error" or $status eq 'in_progress' or $status eq 'running';
69 :    
70 :     #
71 :     # Stage is not complete and not in error. Process it.
72 :     #
73 :     # Note that if the stage is marked as queued, we will
74 :     # invoke the processor. This as designed, so that an
75 :     # SGE-aware processor can ensure the task is still queued
76 :     # and hasn't failed in a way that it did not get marked
77 :     # as running.
78 :     #
79 :    
80 :     eval {
81 :     if (ref($processor) eq 'CODE'){
82 :     #&$processor($name, $job_id, $job_dir, $meta, $sge);
83 :     print "&$processor($name, $job_id, $job_dir, $meta, $sge)";
84 :     }
85 :     elsif (ref($processor)){
86 :     print Dumper($processor);
87 :     print "$processor->process($name, $job_id, $job_dir, $meta, $sge)";
88 :     }
89 :     else{
90 :     warn "Unknown processor " . Dumper($processor);
91 :     }
92 :     };
93 :    
94 :     if ($@){
95 :     print "Error processing job $job_id\n$@\n";
96 :     }
97 :     return;
98 :     }
99 :    
100 :     #
101 :     # This job is done.
102 :     #
103 :    
104 :     &mark_job_done( $job_id, $job_dir, $meta);
105 :     }
106 :    
107 :    
108 :    
109 :    
110 :    

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3