[Bio] / FortyEight / ClusterStage.pm Repository:
ViewVC logotype

Annotation of /FortyEight/ClusterStage.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.3 - (view) (download) (as text)

1 : olson 1.1 package ClusterStage;
2 :    
3 :     use FIG;
4 :     use FIG_Config;
5 :     use strict;
6 :     use Carp 'croak';
7 :     use Data::Dumper;
8 :     #
9 :     # Processing stage for check_jobs that performs an SGE submission to
10 :     # the cluster.
11 :     #
12 :     # The jobname passed in is the name of the executable to be executed
13 :     # on the cluster to handle this stage of the pipeline.
14 :     #
15 :    
#
# Constructor.
#
# $jobname is the name of the executable (looked up in $FIG_Config::bin)
# that runs this stage. The remaining arguments are option key/value pairs
# kept in $self->{opts}; the options consulted in this package are
# start_locally, sge_flag, queue and no_localdb.
#
sub new
{
    my($class, $jobname, %opts) = @_;

    my $self = {};
    $self->{jobname} = $jobname;
    $self->{opts}    = \%opts;    # %opts is a fresh lexical on every call

    bless($self, $class);
    return $self;
}
27 :    
#
# Process this stage via a cluster run.
#
# We land here if the stage's job has not been started, OR it has been
# started but has not finished.
#
# Not started ("<name>.running" metadata is not 'yes'): start it, either as
# a local child process (start_locally option) or via an SGE submission.
#
# Started: use this opportunity to see whether its SGE jobs have vanished
# from the cluster. If none of them is running or pending while the stage
# is still marked "queued" or "running" (i.e. no final status was written),
# the job most likely crashed on the cluster and its exit status will never
# be set. In that case mark the stage as being in error and call fatal(),
# which croaks.
#
# A stage may consist of multiple SGE jobs (a single large sims run may have
# a preprocess step, a large fanout compute, and a single postprocess, all
# linked with SGE holds). In that case the "<name>.sge_id" metadata holds
# several job ids. While such a chain advances, its later jobs are still
# pending on their holds after the earlier ones have finished, so a stage
# is only considered dead when none of its ids is running OR pending -- in
# the "running" state as well as in the "queued" state.
#
# Parameters:
#   $name    - stage name; used to build the metadata keys
#   $job_id  - job identifier, passed on to the start routines
#   $job_dir - job directory; FIG::verify_dir is called on
#              $job_dir/sge_output (presumably creating it if missing)
#   $meta    - job metadata object
#   $sge     - SGE interface object (job_running / job_queued, and
#              submit_job in start_job_sge)
#
sub process
{
    my($self, $name, $job_id, $job_dir, $meta, $sge) = @_;

    my $is_running = $meta->get_metadata("${name}.running");
    my $state = $meta->get_metadata("status.$name");

    print "state=$state is_running=$is_running\n";

    # fatal() reads the stage name from here.
    $self->{name} = $name;

    FIG::verify_dir("$job_dir/sge_output");

    if ($is_running eq 'yes')
    {
        #
        # Check to see if any of the SGE job ids still exist.
        #
        my $ids = $meta->get_metadata("${name}.sge_id");
        if (!$ids)
        {
            $self->fatal($meta, "Job is marked running, but no SGE ids have been registered");
        }

        # Normally a list of ids; tolerate a single id stored as a scalar.
        my @ids = ref($ids) eq 'ARRAY' ? @$ids : ($ids);

        my($running, $pending) = _count_sge_jobs($sge, @ids);

        if ($state eq "queued")
        {
            if ($running == 0 && $pending == 0)
            {
                #
                # Nothing running, nothing pending. We must have croaked.
                #
                print "Job is queued, but no SGE jobs are either running or pending\n";
                $meta->set_metadata("status.$name", "error");
                $meta->set_metadata("${name}.running", "no");

                $self->fatal($meta, "Job is marked queued, but no SGE jobs running or pending");
            }
            else
            {
                print "Job queued. Running=$running pending=$pending\n";
            }
        }
        elsif ($state eq "running")
        {
            if ($running == 0 && $pending == 0)
            {
                #
                # We think the stage is running, but none of its SGE jobs is
                # running or waiting to run: it died on the cluster. Pending
                # jobs alone are not an error, since the later jobs of a held
                # chain are still pending when the earlier ones finish.
                #
                print "Job marked running, but no SGE jobs running\n";
                $meta->set_metadata("status.$name", "error");
                $meta->set_metadata("${name}.running", "no");
                $self->fatal($meta, "Job is marked running, but no SGE jobs are around any more (@ids)");
            }
            else
            {
                print "Job running. Running=$running pending=$pending\n";
            }
        }

        #
        # Otherwise, we are okay. Just return and go about your business.
        #
        return;
    }

    #
    # Not running yet. Start up the job.
    #
    if ($self->{opts}->{start_locally})
    {
        $self->start_job_local($name, $job_id, $job_dir, $meta, $sge);
    }
    else
    {
        $self->start_job_sge($name, $job_id, $job_dir, $meta, $sge);
    }
}

#
# Count the SGE jobs for the given ids. Returns ($running, $pending): the
# total number of entries returned by $sge->job_running and by
# $sge->job_queued over all ids. Each lookup result is dumped to stdout.
#
sub _count_sge_jobs
{
    my($sge, @ids) = @_;

    my $running = 0;
    my $pending = 0;
    for my $id (@ids)
    {
        my @run = $sge->job_running($id);
        $running += @run;
        print Dumper(\@run);

        my @queued = $sge->job_queued($id);
        $pending += @queued;
        print Dumper(\@queued);
    }
    return ($running, $pending);
}
142 :    
143 :    
#
# Start the stage's executable via an SGE submission.
#
# This is for tasks that are themselves expensive. Some tasks do a small
# amount of processing and then submit jobs of their own; those should be
# run with start_job_local instead (start_locally option).
#
# The executable is $FIG_Config::bin/<jobname>, invoked as "<exe> $job_dir";
# its SGE stdout/stderr go to $job_dir/sge_output. The SGE arguments are
#   -N <name>_<job_id> -v PATH -e/-o $job_dir/sge_output -b yes
# followed by either the sge_flag option (a string or an array ref of
# strings, passed through verbatim) or, when sge_flag is absent, "-q <queue>"
# if a queue option was given; then "-l localdb" unless no_localdb is true.
#
# On success the SGE id is recorded in "<name>.sge_id" (as a one-element
# list), "<name>.running" is set to "yes" and "status.<name>" to "queued".
# fatal() (which croaks) is called if the executable is missing, the
# submission throws, or the submission returns no job id.
#
sub start_job_sge
{
    my($self, $name, $job_id, $job_dir, $meta, $sge) = @_;

    my @sge_args = (
        "-N ${name}_$job_id",
        "-v PATH",
        "-e $job_dir/sge_output",
        "-o $job_dir/sge_output",
        "-b yes",
    );

    my $opts = $self->{opts};
    print Dumper($opts);

    if (exists($opts->{sge_flag}))
    {
        #
        # The caller specified explicit SGE flags (sge_flag option); use
        # those, and don't try to be clever here.
        #
        my $f = $opts->{sge_flag};
        push(@sge_args, ref($f) eq 'ARRAY' ? @$f : $f);
    }
    else
    {
        #
        # High priority for 48hr jobs is currently disabled:
        #
        #push(@sge_args, "-l high");

        #
        # Pick a queue, if one was requested.
        #
        if (my $q = $opts->{queue})
        {
            push(@sge_args, "-q $q");
        }
    }

    #
    # Unless the options disable it, request the localdb resource.
    #
    push(@sge_args, "-l localdb") unless $opts->{no_localdb};

    my $sge_args = join(" ", @sge_args);

    #
    # Executable is to be in the FIGdisk bin dir.
    #
    my $exe = "$FIG_Config::bin/$self->{jobname}";
    if (! -x $exe)
    {
        $self->fatal($meta, "Executable $exe not found");
    }

    #
    # We're good to go. Check eval's return value rather than $@ alone,
    # so a failure that leaves $@ empty is still caught.
    #
    my $sge_id;
    my $submitted = eval {
        $sge_id = $sge->submit_job($meta, $sge_args, "$exe $job_dir");
        1;
    };
    if (!$submitted)
    {
        my $err = $@ || "unknown error";
        $self->fatal($meta, "error starting SGE job $exe $job_dir: $err\n");
    }

    # Without an id, process() could never find the job on the cluster.
    if (!defined($sge_id))
    {
        $self->fatal($meta, "error starting SGE job $exe $job_dir: no SGE job id returned");
    }

    #
    # OK, cool.
    #
    $meta->set_metadata("${name}.sge_id", [$sge_id]);
    $meta->set_metadata("${name}.running", "yes");
    $meta->set_metadata("status.$name", "queued");
}
243 :    
#
# Start the stage's executable as a local child process and wait for it.
#
# Used instead of an SGE submission when the start_locally option is set
# (for tasks that do a small amount of processing and then submit cluster
# jobs of their own).
#
# The executable is $FIG_Config::bin/<jobname>, run through the shell as
#   <exe> $job_dir > $job_dir/sge_output/immediate.<pid>.stdout
#                  2> $job_dir/sge_output/immediate.<pid>.stderr
# where <pid> is the child's process id. Blocks until the child exits, then
# echoes the captured stdout with cat. A failed fork or a nonzero exit
# status is reported through fatal(), which croaks.
#
# $job_id and $sge are accepted for signature compatibility with
# start_job_sge and are not used here.
#
sub start_job_local
{
    my($self, $name, $job_id, $job_dir, $meta, $sge) = @_;

    #
    # Executable is to be in the FIGdisk bin dir.
    #
    my $exe = "$FIG_Config::bin/$self->{jobname}";
    if (! -x $exe)
    {
        $self->fatal($meta, "Executable $exe not found");
    }

    # Build the shell command (and its stdout file name) for a given pid.
    my $command_for = sub {
        my($for_pid) = @_;
        my $stdout = "$job_dir/sge_output/immediate.$for_pid.stdout";
        my $stderr = "$job_dir/sge_output/immediate.$for_pid.stderr";
        return ("$exe $job_dir > $stdout 2> $stderr", $stdout);
    };

    # Loaded up front so a child whose exec fails can use POSIX::_exit.
    require POSIX;

    # Setting $| flushes pending STDOUT output now. perlfunc(fork) advises
    # this to avoid the child duplicating our unflushed buffer.
    local $| = 1;

    my $pid = fork();
    if (!defined($pid))
    {
        # Without this check undef == 0 would send the parent down the
        # child branch and exec the job in place of this process.
        $self->fatal($meta, "Cannot fork to run $exe: $!");
    }

    if ($pid == 0)
    {
        # Child: run the command through the shell, which performs the
        # output redirections.
        my($child_cmd) = $command_for->($$);
        exec($child_cmd);

        # Only reached if exec itself failed. Leave with POSIX::_exit so the
        # child never unwinds back into the caller's code (e.g. an
        # enclosing eval) or runs the parent's END blocks and destructors.
        print STDERR "Exec failed: $!\n";
        POSIX::_exit(1);
    }

    my($cmd, $stdout) = $command_for->($pid);
    print "$cmd\n";

    print "Waiting for $pid\n";
    waitpid($pid, 0);

    if ($? != 0)
    {
        $self->fatal($meta, "Cmd failed with \$?=$?: $cmd");
    }

    system("cat", $stdout);
}
293 :    
#
# Report an unrecoverable error for the current stage and abort.
#
# Adds a ['fatal error', $msg] log entry (tagged with the program name $0)
# to the job metadata and sets "status.<stage>" to "error", where <stage>
# is the name process() recorded in $self->{name}. Then croaks with
# "$0: $msg"; never returns normally.
#
sub fatal
{
    my($self, $meta, $msg) = @_;

    my $status_key = join('.', 'status', $self->{name});

    $meta->add_log_entry($0, ['fatal error', $msg]);
    $meta->set_metadata($status_key, 'error');

    croak(join(': ', $0, $msg));
}
303 :    
304 :    
305 :     1;

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3