[Bio] / FigKernelPackages / JobScheduler.pm Repository:
ViewVC logotype

Annotation of /FigKernelPackages/JobScheduler.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.10 - (view) (download) (as text)

1 : olson 1.1 #
2 : olson 1.6 # Copyright (c) 2003-2006 University of Chicago and Fellowship
3 :     # for Interpretations of Genomes. All Rights Reserved.
4 :     #
5 :     # This file is part of the SEED Toolkit.
6 :     #
7 :     # The SEED Toolkit is free software. You can redistribute
8 :     # it and/or modify it under the terms of the SEED Toolkit
9 :     # Public License.
10 :     #
11 :     # You should have received a copy of the SEED Toolkit Public License
12 :     # along with this program; if not write to the University of Chicago
13 :     # at info@ci.uchicago.edu or the Fellowship for Interpretation of
14 :     # Genomes at veronika@thefig.info or download a copy from
15 :     # http://www.theseed.org/LICENSE.TXT.
16 :     #
17 :    
18 :     #
19 : olson 1.1 # This is a simple job scheduler, built for the SEED environment.
20 :     #
21 : olson 1.3 # A job queue is maintained in the directory $FIG_Config::fig/var/JobQueue.
22 : olson 1.1 #
23 :     # Each entry in the queue is a directory named J_XXXX where J_XXXX is the job ID.
24 :     #
25 :     # In each entry is a file job.in which contains the input to the job.
26 :     # The job's output and error are written to files named job.out and job.err.
27 :     # The exit status is written to a file job.exit_status.
28 :     # The job's current queue status is kept in a file job.queue_status.
29 :     #
30 :     # The actual job to be executed is a script job.script. It is the responsibility
31 :     # of the application enqueuing the job that the script is created with proper
32 :     # executable perms, #! lines, etc.
33 :     #
34 :     # A new job is created using $job = $scheduler->job_create().
35 :     #
36 :     # The paths to the job script, in, out, and error files are obtained by
37 :     # $job->get_script_path(), get_in_path(), get_out_path(), get_err_path().
38 :     #
39 :     # Any access to a job's data must occur with the lockfile job.lock held.
40 :     #
41 :     # When a job is ready to be started, $job->enqueue() is invoked.
42 :     #
43 :     # Queue status values:
44 :     #
45 :     # X Job not yet ready
46 :     # Q Job queued, waiting to run.
47 :     # R Job currently running.
48 :     # D Job done.
49 :     # F Job failed.
50 :     #
51 :    
52 :     package JobScheduler;
53 :    
54 : olson 1.2 use Carp;
55 : olson 1.1 use FIG;
56 :     use FIG_Config;
57 :     use FileHandle;
58 :     use DirHandle;
59 : overbeek 1.7 use FileLocking;
60 : olson 1.1 use Fcntl ':flock';
61 :    
62 :     use strict;
63 :    
64 :     my %status_strings = (X => "Not ready",
65 :     Q => "Queued",
66 :     R => "Running",
67 :     D => "Complete",
68 :     F => "Failed");
69 : parrello 1.8 =head2 Methods
70 :    
71 :     =cut
72 : olson 1.1
73 :     sub new
74 :     {
75 :     my($class, $dir) = @_;
76 :    
77 : olson 1.3 $dir = "$FIG_Config::fig/var/JobQueue" unless $dir;
78 : olson 1.1
79 : olson 1.3 &FIG::verify_dir("$FIG_Config::fig/var");
80 : olson 1.4 #warn "Scheduler using $dir\n";
81 : olson 1.1 &FIG::verify_dir($dir);
82 :    
83 :     my $self = {
84 :     dir => $dir,
85 :     };
86 :    
87 :     bless $self, $class;
88 :    
89 :     return $self;
90 :     }
91 :    
92 :     sub job_create
93 :     {
94 :     my($self) = @_;
95 :    
96 :     my $job_id = $self->get_next_job_id();
97 :    
98 :     my $job_dir = "$self->{dir}/$job_id";
99 :    
100 :     mkdir($job_dir) or die "Error creating job queue directory $job_dir: $!\n";
101 :    
102 :     my $job = JobScheduler::Job->new($self, $job_id, $job_dir);
103 :    
104 :     $job->set_queue_status("X");
105 :    
106 :     #
107 :     # Create an empty stdin file.
108 :     #
109 :     my $in_path = $job->get_in_path();
110 :     open(my $in_fh, ">$in_path");
111 :     close($in_fh);
112 :    
113 :     return $job;
114 :     }
115 :    
116 : parrello 1.8 =head3 job_delete
117 : redwards 1.5
118 :     Remove a job directory and all associated files. This will completely remove the job, so be sure you really want to do this :)
119 :    
120 :     Returns 1 on success and 0 on error, and writes the error to STDERR
121 :    
122 :     =cut
123 :    
124 :     sub job_delete
125 :     {
126 :     my($self, $job)=@_;
127 :     my $job_dir = "$self->{dir}/$job";
128 :     unless (-e $job_dir) {print STDERR "No directory found for requested job $job\n"; return 0}
129 :     my $result=`rm -rf $job_dir`;
130 :     if ($result) {print STDERR "Removing caused this error:\n$result\n"; return 0}
131 :     else {return 1}
132 :     }
133 :    
134 : olson 1.1 =pod
135 :    
136 : parrello 1.8 =head3 get_job_to_execute
137 : olson 1.1
138 :     Determine the next job that is ready to run.
139 :    
140 :     If one exists, returns a pair ($job, $lock_fh) where $lock_fh is the lockfile handle.
141 :    
142 :     =cut
143 :    
144 :     sub get_job_to_execute
145 :     {
146 :     my($self) = @_;
147 :    
148 :     #
149 :     # Scan the job queue looking for the next job that is ready to run.
150 :     #
151 :    
152 :    
153 :     my @jobs = $self->get_job_list();
154 : olson 1.2 # warn "Candidate jobs: @jobs\n";
155 : olson 1.1
156 :     #
157 :     # Run through the jobs in order.
158 :     #
159 :     # We grab the lock here because we will take the first job
160 :     # that is ready to run, and wish to hold the lock while
161 :     # we change status to "running".
162 :     #
163 :    
164 :     my($job_to_run, $job_lock);
165 :    
166 :     for my $id (@jobs)
167 :     {
168 :     my $job = $self->get_job($id);
169 :    
170 :     my $lock = $job->lock();
171 :    
172 :     my $status = $job->get_queue_status(1);
173 :    
174 :     if ($status eq "Q")
175 :     {
176 :     #
177 :     # It's ready to run.
178 :     #
179 :     $job_to_run = $job;
180 :     $job_lock = $lock;
181 :     last;
182 :     }
183 :     else
184 :     {
185 :     $lock->close();
186 :     }
187 :     }
188 :    
189 :     if ($job_to_run)
190 :     {
191 :     return ($job_to_run, $job_lock);
192 :     }
193 :     return undef;
194 :     }
195 :    
196 :     sub get_job_list
197 :     {
198 :     my($self) = @_;
199 :    
200 :     my $dh = new DirHandle("$self->{dir}");
201 :     my @jobs = sort grep { /^J_\d+/ } $dh->read();
202 :    
203 :     return @jobs;
204 :     }
205 :    
206 :     =pod
207 :    
208 : parrello 1.8 =head3 get_job($id)
209 : olson 1.1
210 :     Get a job object for job id $id.
211 :    
212 :     =cut
213 :    
214 :     sub get_job
215 :     {
216 :     my($self, $id) = @_;
217 :    
218 :     my $job_dir = "$self->{dir}/$id";
219 :    
220 :     my $job;
221 :     if (-d $job_dir)
222 :     {
223 :     $job = JobScheduler::Job->new($self, $id, $job_dir);
224 :     }
225 :    
226 :     return $job;
227 :     }
228 :    
229 :     sub get_next_job_id
230 :     {
231 :     my($self) = @_;
232 :    
233 :     #
234 :     # Use $dir/NextJob to get the index of the next job to be created.
235 :     #
236 :     # Ensure we hold the $dir/sched.lock lockfile before reading or modifying NextJob.
237 :     #
238 :    
239 : olson 1.2 my $lock = $self->lock_scheduler();
240 : olson 1.1
241 :     my $job_fh;
242 :     my $job_file = "$self->{dir}/NextJob";
243 :     my $job_id;
244 :    
245 :     if (open($job_fh, "<$job_file"))
246 :     {
247 :     $job_id = <$job_fh>;
248 :     chomp($job_id);
249 :     close($job_fh);
250 :     }
251 :     else
252 :     {
253 :     $job_id = 1000;
254 :     }
255 :    
256 :    
257 :     #
258 :     # Write the jobfile back with an incremented id.
259 :     #
260 :    
261 :     open($job_fh, ">$job_file") or die "Cannot write $job_file: $!\n";
262 :    
263 :     printf $job_fh "%d\n", $job_id + 1;
264 :    
265 :     close($job_fh);
266 :    
267 :     $lock->close();
268 :     return sprintf("J_%05d", $job_id);
269 :     }
270 :    
271 :     sub lock_scheduler
272 :     {
273 :     my($self) = @_;
274 :    
275 :     my $fh = claim_lockfile("$self->{dir}/sched.lock");
276 :    
277 :     return $fh;
278 :     }
279 :    
280 :     =pod
281 :    
282 : parrello 1.8 =head3 get_status()
283 : olson 1.1
284 :     Return the current status of jobs in the scheduler.
285 :     This will be a list of [job_id, status code, status string] tuples.
286 :    
287 :     =cut
288 :    
289 :     sub get_status
290 :     {
291 :     my($self) = @_;
292 :    
293 :     my @jobs = $self->get_job_list();
294 :    
295 :     my @ret;
296 :    
297 :     for my $id (@jobs)
298 :     {
299 :     my $job = $self->get_job($id);
300 :     my $stat = $job->get_queue_status();
301 :     push(@ret, [$id, $stat, $status_strings{$stat}]);
302 :     }
303 :     return @ret;
304 :     }
305 :    
306 :     =pod
307 :    
308 : parrello 1.8 =head3 claim_lockfile($file)
309 : olson 1.1
310 :     Open $file and invoke flock(LOCK_EX) on it.
311 :    
312 :     Returns the open filehandle, to be closed when the lock is to be released.
313 :    
314 :     =cut
315 :     sub claim_lockfile
316 :     {
317 :     shift if UNIVERSAL::isa($_[0],__PACKAGE__);
318 :     my($file) = @_;
319 :    
320 :     my $fh = new FileHandle;
321 :    
322 : olson 1.2 sysopen($fh, $file, O_RDWR | O_CREAT) or confess "Cannot open lockfile $file: $!\n";
323 : olson 1.1
324 :     flock($fh, LOCK_EX) or die "Cannot flock $file: $!\n";
325 :    
326 :     return $fh;
327 :     }
328 :    
329 :     package JobScheduler::Job;
330 :    
331 :     use strict;
332 :     use Errno;
333 :    
334 :     sub new
335 :     {
336 :     my($class, $scheduler, $id, $dir) = @_;
337 :    
338 :     my $self = {
339 :     dir => $dir,
340 :     scheduler => $scheduler,
341 :     id => $id,
342 :     };
343 :    
344 :     bless $self, $class;
345 :    
346 :     return $self;
347 :     }
348 :    
349 :     sub lock
350 :     {
351 :     my($self) = @_;
352 :    
353 :     return JobScheduler::claim_lockfile("$self->{dir}/job.lock");
354 :     }
355 :    
356 :     sub enqueue
357 :     {
358 :     my($self, $dont_lock) = @_;
359 :    
360 :     $self->set_queue_status("Q", $dont_lock);
361 :     }
362 :    
363 :     sub get_id
364 :     {
365 :     my($self) = @_;
366 :    
367 :     return $self->{id};
368 :     }
369 :    
370 :    
371 :     =pod
372 :    
373 : parrello 1.8 =head3 run($lock_fh)
374 : olson 1.1
375 :     Run this job. $lock_fh is the filehandle for the current lock on this job. The lock
376 :     will be released when the method exits.
377 :    
378 :     =cut
379 :    
380 :     sub run
381 :     {
382 :     my($self, $lock_fh) = @_;
383 :    
384 :     $lock_fh = $self->lock() unless $lock_fh;
385 :    
386 :     #
387 :     # Fork a process to run the job. It will chdir to the
388 :     # spool directory, and redirect stdin/out/err to the correct
389 :     # files.
390 :     #
391 :    
392 :     #
393 :     # First make sure we can execute the job script.
394 :     #
395 :    
396 :     open(my $log, ">>$self->{dir}/job.log");
397 :    
398 :     my $script = $self->get_script_path();
399 :     if (! -x $script)
400 :     {
401 :     print $log "Job script $script not executable\n";
402 :     warn "Job script $script not executable\n";
403 :     $self->set_queue_status("F", 1);
404 :     $lock_fh->close();
405 :     return;
406 :     }
407 :    
408 :     $self->set_queue_status("R", 1);
409 :    
410 :     #
411 :     # Fork a child.
412 :     #
413 :    
414 :     my $pid = fork;
415 :    
416 :     if ($pid == 0)
417 :     {
418 :     open(STDIN, "<" . $self->get_in_path());
419 :     open(STDOUT, ">" . $self->get_out_path());
420 :     open(STDERR, ">" . $self->get_err_path());
421 :    
422 :     chdir($self->{dir});
423 :    
424 :     $lock_fh->close();
425 :    
426 :     exec($script);
427 :    
428 :     exit 1;
429 :     }
430 :    
431 :     open(my $fh, ">$self->{dir}/monitor.pid");
432 :     print $fh "$$\n";
433 :     close($fh);
434 :    
435 :     open(my $fh, ">$self->{dir}/job.pid");
436 :     print $fh "$pid\n";
437 :     close($fh);
438 :    
439 :     $lock_fh->close();
440 :    
441 :     #
442 :     # Wait for the child to finish.
443 :     #
444 :    
445 :     my $wpid = waitpid($pid, 0);
446 :     my $stat = $?;
447 :    
448 :     print $log "Child $wpid finished with status $stat\n";
449 :     warn "Child $wpid finished with status $stat\n";
450 :    
451 :     my $lock = $self->lock();
452 :    
453 :     unlink("$self->{dir}/monitor.pid");
454 :     unlink("$self->{dir}/job.pid");
455 :    
456 :     if ($stat == 0)
457 :     {
458 :     $self->set_queue_status("D", 1);
459 :     }
460 :     else
461 :     {
462 :     $self->set_queue_status("F", 1);
463 :     }
464 :    
465 :     open(my $fh, ">$self->{dir}/job.exit_status");
466 :     print $fh "$stat\n";
467 :     close($fh);
468 :    
469 :     $lock->close();
470 :    
471 :     }
472 :    
473 :     sub set_queue_status
474 :     {
475 :     my($self, $status, $dont_lock) = @_;
476 :    
477 :     my $lock = $self->lock() unless $dont_lock;
478 :    
479 :     open(my $fh, ">$self->{dir}/job.queue_status") or
480 :     die "Cannot write $self->{dir}/job.queue_status: $!\n";
481 :    
482 :     print $fh "$status\n";
483 :     close($fh);
484 :    
485 :     $lock->close() if $lock;
486 :     }
487 :    
488 :     sub get_queue_status
489 :     {
490 :     my($self, $dont_lock) = @_;
491 :    
492 :     my $lock = $self->lock() unless $dont_lock;
493 :     my $status;
494 :    
495 :     if (open(my $fh, "<$self->{dir}/job.queue_status"))
496 :     {
497 :    
498 :     $status = <$fh>;
499 :     chomp($status);
500 :     close($fh);
501 :     }
502 :     else
503 :     {
504 :     if ($!{ENOENT})
505 :     {
506 :     #
507 :     # No status file is the same as "X".
508 :     #
509 :    
510 :     $status = "X";
511 :     }
512 :     else
513 :     {
514 :     die "Cannot read $self->{dir}/job.queue_status: $!\n";
515 :     }
516 :     }
517 :    
518 :     $status = "X" if $status eq "";
519 :    
520 :    
521 :     $lock->close() if $lock;
522 :    
523 :     return $status;
524 :     }
525 :    
526 :     sub get_script_path
527 :     {
528 :     my($self) = @_;
529 :    
530 :     return "$self->{dir}/job.script";
531 :     }
532 :    
533 :     sub get_in_path
534 :     {
535 :     my($self) = @_;
536 :    
537 :     return "$self->{dir}/job.in";
538 :     }
539 :    
540 :     sub get_out_path
541 :     {
542 :     my($self) = @_;
543 :    
544 :     return "$self->{dir}/job.out";
545 :     }
546 :    
547 :     sub get_err_path
548 :     {
549 :     my($self) = @_;
550 :    
551 :     return "$self->{dir}/job.err";
552 :     }
553 :    
554 :    
555 :     1;

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3