Parent Directory
|
Revision Log
Revision 1.13 - (view) (download) (as text)
1 : | jared | 1.1 | package MGRASTBackend::MGRASTPipeline; |
2 : | |||
3 : | use ClusterStage; | ||
4 : | use Filesys::DfPortable; | ||
5 : | use Mail::Mailer; | ||
6 : | use Job48; | ||
7 : | use FIG; | ||
8 : | |||
9 : | use strict; | ||
10 : | use warnings; | ||
11 : | |||
12 : | use base qw( Exporter ); | ||
13 : | our @EXPORT = qw ( check_free_space get_jobs get_pipeline_for_job process_upload mark_job_done); | ||
14 : | |||
15 : | 1; | ||
16 : | |||
17 : | |||
18 : | ######################################## | ||
19 : | # | ||
20 : | # Basic methods | ||
21 : | # | ||
22 : | ######################################### | ||
23 : | |||
# check_free_space($job_spool_dir [, $min_free_gb])
#
# Dies unless the filesystem holding $job_spool_dir has at least
# $min_free_gb blocks available to the caller. dfportable() is asked to
# report in blocks of 1024*1024*1024 bytes, so the threshold is in GB.
#
#   $job_spool_dir - directory whose filesystem is inspected
#   $min_free_gb   - optional threshold; defaults to 10 when omitted/undef
#
# Returns nothing on success. Dies if dfportable() returns undef or if
# the available space is below the threshold.
sub check_free_space {
    my ($job_spool_dir, $min_free_gb) = @_;
    $min_free_gb = 10 unless defined $min_free_gb;

    my $df = dfportable($job_spool_dir, 1024 * 1024 * 1024);
    if (!defined $df) {
        die "dfportable call failed on $job_spool_dir: $!";
    }

    if ($df->{bavail} < $min_free_gb) {
        # The directory is passed as a %s argument instead of being
        # interpolated into the format string, so a '%' in the path cannot
        # be misread as a conversion and garble the message.
        die sprintf "Not enough free space available (%.1f GB) in %s",
            $df->{bavail}, $job_spool_dir;
    }

    return;
}
35 : | |||
# get_jobs($job_spool_dir)
#
# Returns a reference to an array of the entries in $job_spool_dir whose
# names consist only of digits and which are directories, sorted in
# ascending numeric order.
#
# Dies if the directory cannot be opened.
sub get_jobs {
    my ($job_spool_dir) = @_;

    # A lexical handle keeps this sub from sharing a package-global glob
    # with other code, and lets us release the handle as soon as the
    # listing has been read.
    opendir(my $dh, $job_spool_dir)
        or die "Cannot open job directory $job_spool_dir: $!\n";
    my @entries = readdir($dh);
    closedir($dh);

    my @jobs = sort { $a <=> $b }
               grep { /^\d+$/ and -d "$job_spool_dir/$_" } @entries;

    return \@jobs;
}
42 : | |||
# get_pipeline_for_job($job_dir)
#
# Chooses the pipeline for a job and returns whatever the selected
# pipeline builder returns (the builders in this file return a reference
# to an array of [stage_name => handler] pairs).
#
# If $job_dir/PIPELINE is a plain file, its first line (read through
# FIG::file_head) names the pipeline; otherwise "default" is used. An
# unreadable or blank first line also selects "default".
#
# Returns undef if pipeline() does not hand back a code reference.
sub get_pipeline_for_job {
    my ($job_dir) = @_;

    my $name          = 'default';
    my $pipeline_file = "$job_dir/PIPELINE";

    if (-f $pipeline_file) {
        # NOTE(review): assumes FIG::file_head yields the first line as a
        # string in scalar context and may yield undef for an empty file
        # -- confirm against FIG.pm.
        my $first_line = FIG::file_head($pipeline_file, 1);
        if (defined $first_line) {
            # Trim surrounding whitespace (including a CR from DOS line
            # endings) so a stray blank does not silently pick "default".
            $first_line =~ s/\A\s+//;
            $first_line =~ s/\s+\z//;
            $name = $first_line if length $first_line;
        }
    }

    # Look the builder up once and reuse the result.
    my $builder = pipeline($name);

    # Kept as an explicit undef to preserve the original return contract.
    return undef unless ref($builder) eq 'CODE';

    return $builder->();
}
57 : | |||
58 : | ######################################## | ||
59 : | # | ||
60 : | # Pipelines | ||
61 : | # | ||
62 : | ######################################### | ||
63 : | |||
# pipeline($name)
#
# Maps a pipeline name to the code reference that builds its stage list.
# Names that are not registered resolve to the "default" builder.
sub pipeline {
    my ($name) = @_;

    my %builder_for = (
        default => \&default_process,
        old     => \&old_default_process,
    );

    my $builder = $builder_for{$name};
    return defined $builder ? $builder : $builder_for{default};
}
77 : | |||
# default_process()
#
# Builds the stage list for the "default" pipeline and returns it as a
# reference to an array of [stage_name => handler] pairs, in execution
# order: uploaded, preprocess, sims, check_sims, export.
# The first handler is a code reference; the rest are freshly constructed
# ClusterStage objects.
sub default_process {
    my $postproc_flag = "-l mg_postproc_taxa_sims";

    my @stages = ([ uploaded => \&process_upload ]);

    push @stages, [
        preprocess => ClusterStage->new('mg_preprocess',
                                        sge_flag => "-l mg_preprocess"),
    ];

    # The remaining stages all use the same SGE resource flag.
    my @postproc_stages = (
        [ sims       => 'mg_sims' ],
        [ check_sims => 'mg_check_sims' ],
        [ export     => 'mg_export' ],
    );
    for my $spec (@postproc_stages) {
        my ($stage_name, $stage_script) = @$spec;
        push @stages, [
            $stage_name => ClusterStage->new($stage_script,
                                             sge_flag => $postproc_flag),
        ];
    }

    return \@stages;
}
95 : | |||
# old_default_process()
#
# Builds the stage list for the "old" pipeline and returns it as a
# reference to an array of [stage_name => handler] pairs, in execution
# order: uploaded, preprocess, sims, check_sims, create_seed_org,
# export_to_genbank.
sub old_default_process {
    my @stages;

    push @stages, [ uploaded => \&process_upload ];

    push @stages, [
        preprocess => ClusterStage->new('mg_preprocess',
                                        sge_flag => "-l mg_preprocess"),
    ];

    push @stages, [
        sims => ClusterStage->new('mg_sims', start_locally => 1),
    ];

    push @stages, [
        check_sims => ClusterStage->new('mg_check_sims', start_locally => 1),
    ];

    push @stages, [
        create_seed_org => ClusterStage->new('mg_create_seed_org',
                                             sge_flag => "-l mg_postproc_taxa_sims"),
    ];

    push @stages, [
        export_to_genbank => ClusterStage->new('mg_sims_to_gff_and_gbk',
                                               start_locally => 1),
    ];

    return \@stages;
}
115 : | |||
116 : | ######################################## | ||
117 : | # | ||
118 : | # Pipeline Stages | ||
119 : | # | ||
120 : | ######################################### | ||
121 : | |||
# process_upload()
#
# Handler for the "uploaded" stage. It performs no work and returns
# nothing (undef in scalar context, the empty list in list context).
sub process_upload { return; }
126 : | |||
# mark_job_done($job_id, $job_dir, $meta, $req)
#
# Final bookkeeping for a finished job:
#   1. writes the current epoch time to $job_dir/DONE (a failure here is
#      only warned about),
#   2. sets the "status.final" metadata key to "complete" on $meta,
#   3. e-mails the job's owner, if one with an e-mail address is found.
#
#   $job_id  - job number, passed to Job48->new and quoted in the e-mail
#   $job_dir - job directory in which the DONE file is written
#   $meta    - object providing set_metadata()
#   $req     - accepted for interface compatibility; not used here
#
# Progress messages are printed to STDOUT. Returns nothing.
sub mark_job_done {
    my ($job_id, $job_dir, $meta, $req) = @_;

    print "Attempting to Email user\n";

    # Three-argument open on a lexical handle: the path can never be
    # parsed as an open mode, and no package-global handle is clobbered.
    my $done_file = "$job_dir/DONE";
    if (open(my $done_fh, '>', $done_file)) {
        print {$done_fh} time() . "\n";
        # Buffered write errors only surface at close time.
        close($done_fh) or warn "Error closing $done_file: $!\n";
    }
    else {
        warn "Error opening $job_dir/DONE: $!\n";
    }

    # NOTE(review): presumably Job48->new can return a false value for an
    # unknown job; in that case we still record completion below and just
    # skip the notification -- confirm against Job48.
    my $job = Job48->new($job_id);
    warn "Could not load job $job_id; no notification will be sent\n"
        unless $job;
    my $userobj = $job ? $job->getUserObject() : undef;

    print "setting meta $meta\n";
    $meta->set_metadata("status.final", "complete");
    print "setting meta $meta .. done\n";

    return unless $userobj;

    my $email = $userobj->email();
    unless (defined $email && length $email) {
        warn "No email address for owner of job $job_id; not sending mail\n";
        return;
    }

    # Use only the name parts that are actually present, so a user without
    # a recorded name gets a bare address rather than "  <address>".
    my @name_parts = grep { defined $_ && length $_ }
                     ($userobj->firstname(), $userobj->lastname());
    my $name = join(" ", @name_parts);
    my $full = length $name ? "$name <$email>" : $email;
    print "send email to $full\n";

    my $gname = $job->genome_name;

    # Fall back to the public site when no home URL is configured.
    my $entry = $FIG_Config::fortyeight_home;
    $entry = "http://metagenomics.nmpdr.org/"
        unless defined $entry && $entry ne '';

    my $mail = Mail::Mailer->new();
    $mail->open({
        To      => $full,
        From    => 'Metagenome RAST server <mg-rast@mcs.anl.gov>',
        Subject => "MG-RAST job completed",
    });
    print $mail "The annotation job that you submitted for $gname has completed.\n";
    print $mail "It is available for browsing at $entry as job number $job_id.\n";
    $mail->close();

    return;
}
MCS Webmaster | ViewVC Help |
Powered by ViewVC 1.0.3 |