[Bio] / MGRASTBackend / mgrast_pipeline.pl Repository:
ViewVC logotype

Annotation of /MGRASTBackend/mgrast_pipeline.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.3 - (view) (download) (as text)

1 : jared 1.1 use FIG;
2 :     use FIG_Config;
3 :     use GenomeMeta;
4 :     use Data::Dumper;
5 :     use SGE;
6 : jared 1.2 use Tracer;
7 : jared 1.1
8 :     use MGRASTBackend::MGRASTPipeline qw( check_free_space get_jobs get_pipeline_for_job process_upload mark_job_done );
9 :    
10 :     use strict;
11 :    
#
# Driver: look at every job found in the MG-RAST job spool directory and
# hand each one to check_job() together with its list of pipeline stages.
#
my $job_spool_dir = $FIG_Config::mgrast_jobs;

# Verify we have at least 10G of space left.
check_free_space($job_spool_dir);

# Get job list
my $jobs = get_jobs($job_spool_dir);

# Setup SGE
my $sge = SGE->new;

# Get pipeline stages and check job
for my $job (@$jobs) {
    my $job_dir = "$job_spool_dir/$job";

    Trace("Getting pipeline for $job at $job_dir\n") if T(1);

    my $stages = get_pipeline_for_job($job_dir);

    if (defined $stages) {
        Trace("Stages found for job $job\n") if T(2);
        check_job($job, $job_dir, $stages, $sge);
    }
    else {
        # NOTE(review): Confess is presumed to raise a fatal error, in which
        # case the statement after it is never reached -- confirm against
        # Tracer. A bare `return` is illegal at file scope, so stop the
        # loop with `last` instead should Confess ever come back.
        Confess("Stages not found for job $job.");
        last;
    }
}
35 :    
#
# check_job($job_id, $job_dir, $stages, $sge)
#
# Advance a single job by at most one pipeline stage.
#
#   $job_id  - identifier of the job (used in trace messages and in the
#              "metajob_<id>" name given to GenomeMeta).
#   $job_dir - directory holding the job's marker files and meta.xml.
#   $stages  - array ref of [ $name, $processor ] pairs, in pipeline order.
#   $sge     - SGE handle, passed through untouched to the stage processor.
#
# The job is skipped unless $job_dir contains the ACTIVE and MGRAST2 marker
# files and does not contain DONE. Stages whose "status.<name>" metadata is
# "complete" are passed over; the first stage that is not complete is either
# left alone (status "error", "in_progress" or "running") or handed to its
# processor, after which the sub returns. When every stage is complete the
# job is passed to mark_job_done().
#
sub check_job {
    my ($job_id, $job_dir, $stages, $sge) = @_;

    Trace("Checking $job_id at $job_dir\n") if T(1);

    if (! -f "$job_dir/ACTIVE") {
        Trace("Skipping job $job_id as not active\n") if T(2);
        return;
    }

    if (! -f "$job_dir/MGRAST2") {
        Trace("Skipping job $job_id: it is not a mgrast2 job\n") if T(2);
        return;
    }

    if (-f "$job_dir/DONE") {
        Trace("Skipping job $job_id as done\n") if T(2);
        return;
    }

    my $meta = GenomeMeta->new("metajob_$job_id", "$job_dir/meta.xml");

    if (!$meta) {
        Confess("Could not create meta for $job_dir/meta.xml");
        return;
    }

    for my $stage (@$stages) {
        my ($name, $processor) = @$stage;

        # A stage that has never been touched may have no status recorded;
        # treat that as the empty string so the comparisons below are
        # well defined.
        my $status = $meta->get_metadata("status.$name");
        $status = '' unless defined $status;

        next if $status eq "complete";
        return
            if $status eq "error"
            or $status eq 'in_progress'
            or $status eq 'running';

        #
        # Stage is not complete and not in error. Process it.
        #
        # Note that if the stage is marked as queued, we will
        # invoke the processor. This is as designed, so that an
        # SGE-aware processor can ensure the task is still queued
        # and hasn't failed in a way that it did not get marked
        # as running.
        #

        # The trailing 1 lets us detect failure from eval's return value
        # rather than trusting $@ alone, which can be cleared before we
        # get to look at it.
        my $ok = eval {
            if (ref($processor) eq 'CODE') {
                $processor->($name, $job_id, $job_dir, $meta, $sge);
            }
            elsif (ref($processor)) {
                # Non-code references are not executed, only dumped.
                print Dumper($processor);
            }
            else {
                warn "Unknown processor " . Dumper($processor);
            }
            1;
        };

        if (!$ok) {
            my $error = $@ || 'unknown error';
            print "Error processing job $job_id\n$error\n";
        }

        # Only one stage is acted on per invocation.
        return;
    }

    #
    # This job is done.
    #

    mark_job_done($job_id, $job_dir, $meta);
}
105 :    
106 :    
107 :    
108 :    
109 :    

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3