[Bio] / MGRASTBackend / mgrast_pipeline.pl Repository:
ViewVC logotype

Annotation of /MGRASTBackend/mgrast_pipeline.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.5 - (view) (download) (as text)

1 : jared 1.1 use FIG;
2 :     use FIG_Config;
3 :     use GenomeMeta;
4 :     use Data::Dumper;
5 :     use SGE;
6 : jared 1.2 use Tracer;
7 : jared 1.1
8 :     use MGRASTBackend::MGRASTPipeline qw( check_free_space get_jobs get_pipeline_for_job process_upload mark_job_done );
9 :    
10 :     use strict;
11 :    
# Driver: walk every job in the MG-RAST job spool directory and hand each
# one to check_job() together with its pipeline stage list.

# Configure tracing via Tracer::TSetup.
TSetup("2 main FIG", "TEXT");

my $job_spool_dir = $FIG_Config::mgrast_jobs;

# Verify we have at least 10G of space left.
check_free_space($job_spool_dir);

# Get job list
my $jobs = get_jobs($job_spool_dir);

# Setup SGE; the same handle is passed to every job's stage processors.
my $sge = SGE->new;

# Get pipeline stages and check job
for my $job (@$jobs) {
    my $job_dir = "$job_spool_dir/$job";

    Trace("Getting pipeline for $job at $job_dir\n") if T(1);

    my $stages = get_pipeline_for_job($job_dir);

    if (defined $stages) {
        Trace("Stages found for job $job\n") if T(2);
        check_job($job, $job_dir, $stages, $sge);
    }
    else {
        Confess("Stages not found for job $job.");

        # NOTE(review): Confess is expected to abort the run -- confirm
        # against Tracer. Should it ever return, stop processing further
        # jobs. 'return' is not legal here because this code is not
        # inside a subroutine.
        last;
    }
}
37 :    
#
# check_job($job_id, $job_dir, $stages, $sge)
#
# Advance a single job by at most one pipeline stage.
#
#   $job_id   job identifier; used in trace messages and to build the
#             "metajob_$job_id" name handed to GenomeMeta.
#   $job_dir  directory of the job; marker files and meta.xml are looked
#             up here.
#   $stages   array ref of [ $name, $processor ] pairs, in pipeline order.
#             $processor is either a code ref or an object with a
#             process() method.
#   $sge      SGE handle, passed through untouched to the processor.
#
# The job is skipped when a DELETE or DONE marker file exists, or when the
# ACTIVE or MGRAST2 marker file is missing. Otherwise the stages are
# scanned in order: stages whose status is "complete" are passed over, and
# a stage in "error", "in_progress" or "running" state ends the scan.
# The first stage in any other state is dispatched to its processor and
# the sub returns. If the scan finds every stage complete (or $stages is
# empty), mark_job_done() is called.
#
# Always returns nothing useful to the caller. An exception thrown by a
# processor is caught and reported on STDOUT.
#
sub check_job {
    my ($job_id, $job_dir, $stages, $sge) = @_;

    Trace("Checking $job_id at $job_dir\n") if T(1);

    if (-f "$job_dir/DELETE") {
        Trace("Skipping job $job_id: marked as deleted\n") if T(2);
        return;
    }

    if (! -f "$job_dir/ACTIVE") {
        Trace("Skipping job $job_id as not active\n") if T(2);
        return;
    }

    if (! -f "$job_dir/MGRAST2") {
        Trace("Skipping job $job_id: it is not a mgrast2 job\n") if T(2);
        return;
    }

    if (-f "$job_dir/DONE") {
        Trace("Skipping job $job_id as done\n") if T(2);
        return;
    }

    my $meta = GenomeMeta->new("metajob_$job_id", "$job_dir/meta.xml");

    if (!$meta) {
        Confess("Could not create meta for $job_dir/meta.xml");
        return;
    }

    for my $stage (@$stages) {
        my ($name, $processor) = @$stage;

        # A stage that has no recorded status yet may come back as undef;
        # treat it as an empty status so the string comparisons below are
        # well defined.
        my $status = $meta->get_metadata("status.$name");
        $status = '' unless defined $status;

        next if $status eq 'complete';
        return
            if ($status eq 'error'
            || $status eq 'in_progress'
            || $status eq 'running');

        #
        # Stage is not complete and not in error. Process it.
        #
        # Note that if the stage is marked as queued, we will
        # invoke the processor. This is as designed, so that an
        # SGE-aware processor can ensure the task is still queued
        # and hasn't failed in a way that it did not get marked
        # as running.
        #

        # The trailing 1 lets us detect failure from eval's return value
        # instead of relying solely on $@, which can be clobbered.
        my $ok = eval {
            if (ref($processor) eq 'CODE') {
                $processor->($name, $job_id, $job_dir, $meta, $sge);
            }
            elsif (ref($processor)) {
                # Dump the processor only when detailed tracing is on,
                # rather than unconditionally printing it.
                Trace("Processor for stage $name: " . Dumper($processor))
                    if T(3);
                $processor->process($name, $job_id, $job_dir, $meta, $sge);
            }
            else {
                warn "Unknown processor " . Dumper($processor);
            }
            1;
        };

        if (!$ok) {
            my $error = $@ || 'unknown error';
            print "Error processing job $job_id\n$error\n";
        }

        # Only one stage is dispatched per call.
        return;
    }

    #
    # This job is done.
    #

    mark_job_done($job_id, $job_dir, $meta);
}
113 :    
114 :    
115 :    
116 :    
117 :    

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3