[Bio] / FigKernelPackages / Cluster.pm Repository:
ViewVC logotype

Annotation of /FigKernelPackages/Cluster.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.11 - (view) (download) (as text)

1 : olson 1.1 #
2 : olson 1.11 # Copyright (c) 2003-2006 University of Chicago and Fellowship
3 :     # for Interpretations of Genomes. All Rights Reserved.
4 :     #
5 :     # This file is part of the SEED Toolkit.
6 :     #
7 :     # The SEED Toolkit is free software. You can redistribute
8 :     # it and/or modify it under the terms of the SEED Toolkit
9 :     # Public License.
10 :     #
11 :     # You should have received a copy of the SEED Toolkit Public License
12 :     # along with this program; if not write to the University of Chicago
13 :     # at info@ci.uchicago.edu or the Fellowship for Interpretation of
14 :     # Genomes at veronika@thefig.info or download a copy from
15 :     # http://www.theseed.org/LICENSE.TXT.
16 :     #
17 :    
18 :     #
19 : olson 1.1 # Routines for managing SEED jobs on a cluster.
20 :     #
21 :    
22 :     package Cluster::DBJobMgr;
23 :     use strict;
24 :    
25 :     use base qw(Class::Accessor);
26 :    
27 :     __PACKAGE__->mk_accessors(qw(table_name fig db dbh dbms lock_mode));
28 :    
#
# Numeric codes kept in the status column of a job table.
#
use constant AVAIL  => 0;    # selected by get_work; restored by work_aborted
use constant TAKEN  => 1;    # written by get_work when a row is claimed
use constant DONE   => 2;    # written by work_done
use constant FAILED => 3;    # no method visible in this package writes this value
35 :    
36 :     #
37 :     # A database-based job manager.
38 :     #
39 :     # We use a table in the database to maintain the work to be done and the work
40 :     # as completed.
41 :     #
42 :    
#
# Construct a job manager bound to the table "pjs_<table_name>".
#
#   $class       class to bless the new object into
#   $fig         object providing db_handle(); the _dbh and _dbms entries of
#                the returned handle are cached on the new object
#   $table_name  suffix of the job table name; restricted to word characters
#                because the name is interpolated directly into SQL text
#
# Returns the new object. Dies if $table_name is undefined or contains
# anything other than word characters.
#
sub new
{
    my($class, $fig, $table_name) = @_;

    #
    # Ensure table_name is valid. The pattern is anchored with \A and \z
    # because ^...$ would also accept a name ending in a newline.
    #

    if (!defined($table_name) or $table_name !~ /\A\w+\z/)
    {
        die "Cluster::DBJobMgr::new: Table name may only consist of alphanumeric characters, no spaces allowed.";
    }

    my $db = $fig->db_handle;
    my $dbh = $db->{_dbh};
    my $dbms = $db->{_dbms};

    my $self = {
        table_name => "pjs_$table_name",
        fig => $fig,
        db => $db,
        dbh => $dbh,
        dbms => $dbms,
        # On MySQL, get_work appends this suffix to its SELECT statement.
        lock_mode => ((defined($dbms) and $dbms eq "mysql") ? "for update" : ""),
    };

    return bless $self, $class;
}
78 :    
#
# Claim one available row of the job table for $worker.
#
# Inside one transaction (AutoCommit off, RaiseError on): select a single
# row whose status is AVAIL, with the object's lock_mode appended to the
# query; mark that row TAKEN for $worker with job_taken = now(); commit.
#
# Returns the first column of the selected row, which is the value used as
# the peg key in the update. When no row is found or a statement fails,
# warns, rolls back and returns nothing.
#
sub get_work {
    my ($self, $worker) = @_;

    my $dbh   = $self->dbh;
    my $table = $self->table_name;

    local $dbh->{AutoCommit} = 0;
    local $dbh->{RaiseError} = 1;

    my $claimed;

    eval {
        my $rows = $dbh->selectall_arrayref("SELECT * FROM $table
                WHERE status = ? LIMIT 1 " . $self->lock_mode,
                                            undef,
                                            AVAIL);

        die "No work found\n" unless $rows and @$rows != 0;

        # Only the leading column of the row is used.
        my ($peg) = @{ $rows->[0] };

        $dbh->do("update $table set status = ?, worker = ?, job_taken = now() where peg = ?",
                 undef, TAKEN, $worker, $peg);

        $dbh->commit();
        $claimed = $peg;
    };

    return $claimed unless $@;

    warn "Error in get_work eval: $@\n";
    $dbh->rollback();
    return;
}
120 :    
#
# Record the completion of a piece of work.
#
#   $work    peg key of the row to update
#   $output  value stored in the row's output column
#
# In one transaction, sets the row's status to DONE, stamps job_finished
# with now() and stores $output. Returns 1 on success. On failure it warns,
# rolls back and dies with "Invalid work_done request: <error>".
#
# This sub used to be defined twice in a row with the same body; Perl kept
# only the later definition, so that one (and its die message) is the one
# retained here.
#
sub work_done
{
    my($self, $work, $output) = @_;

    my $dbh = $self->dbh;
    my $table = $self->table_name;

    local $dbh->{AutoCommit} = 0;
    local $dbh->{RaiseError} = 1;

    eval {
        $dbh->do("update $table set status = ?, job_finished = now(), output = ? where peg = ?", undef,
                 DONE, $output, $work);

        $dbh->commit();
    };

    # Copy $@ straight away: warn handlers and rollback may run code that
    # resets it before the die below reports it.
    my $err = $@;

    if ($err)
    {
        warn "Error in work_done eval: $err\n";
        $dbh->rollback();
        die "Invalid work_done request: $err";
    }

    return 1;
}
180 :    
#
# Return a piece of work to the pool.
#
#   $work  peg key of the row to update
#
# In one transaction, sets the row's status back to AVAIL, stamps
# job_finished with now() and clears the output column. Returns 1 on
# success. On failure it warns, rolls back and dies with
# "Invalid work_aborted request:<error>".
#
sub work_aborted
{
    my($self, $work) = @_;

    my $dbh = $self->dbh;
    my $table = $self->table_name;

    local $dbh->{AutoCommit} = 0;
    local $dbh->{RaiseError} = 1;

    eval {
        $dbh->do("update $table set status = ?, job_finished = now(), output = NULL where peg = ?", undef,
                 AVAIL, $work);

        $dbh->commit();
    };

    # Copy $@ straight away: warn handlers and rollback may run code that
    # resets it before the die below reports it.
    my $err = $@;

    if ($err)
    {
        warn "Error in work_aborted eval: $err\n";
        $dbh->rollback();
        die "Invalid work_aborted request:$err";
    }

    return 1;
}
210 :    
211 : olson 1.2 package Cluster::Broker;
212 :    
213 : olson 1.8 use Cwd 'abs_path';
214 : olson 1.6 use File::Basename;
215 :     use Time::HiRes 'time';
216 :     use FIG_Config;
217 :     use FIG;
218 :    
219 :     our $ns = "http://mcs.anl.gov/FL/Schemas/cluster_service";
220 :    
#
# Work status codes. init_job_tables stores each of these, together with
# its name, in the js_work_status table.
#
use constant AVAIL  => 0;
use constant TAKEN  => 1;
use constant DONE   => 2;
use constant FAILED => 3;
227 :    
228 :     use strict;
229 : olson 1.5 use Data::Dumper;
230 : olson 1.6 use File::Copy;
231 :    
232 :     our $cluster_spool = "$FIG_Config::fig_disk/ClusterSpool";
233 : olson 1.2
234 :     =head1 Job Broker
235 :    
236 :     Perl code for job broker functionality.
237 :    
A broker instance maintains a database of jobs. Each job has a description
of some sort (heh), including a field defining the type of job. The plan
at this point is not to generalize the job type management, but rather to
include a job type in the job table and then use a separate table to hold
the individual pieces of work for each of the job types (sim-computation, scopmap, etc).
243 :    
244 :     A complication that might occur is that we wish to be able to allocate a piece
245 :     of work with a single query "get me the next available piece of work and mark
246 :     that piece as being worked on". If jobs are kept in multiple tables, this becomes
247 :     difficult. We may be able to get around this by keeping a single work table
248 :     that refers, for each piece of work, to the work type and identifier for that
249 :     workpiece.
250 :    
251 :    
252 :     =cut
253 :    
#
# Drop and re-create every js_* table used by the broker, and load the
# constant rows (work statuses and the 'sim', 'stage' and 'stage_nr' work
# types).
#
#   $db  database wrapper exposing SQL, drop_table, create_table and
#        create_index, plus the _dbh and _dbms entries
#
# AutoCommit is switched off on the underlying handle for the duration of
# the call. Tables are created in the order listed below, so a table
# always exists before another one declares a REFERENCES to it.
#
sub init_job_tables {
    my ($db) = @_;

    local $db->{_dbh}->{AutoCommit} = 0;

    # Column type for auto-generated keys; it stays undefined for any dbms
    # other than the two handled here.
    my $serial_type;
    if ($db->{_dbms} eq "mysql") {
        $serial_type = "int not null auto_increment";
    }
    elsif ($db->{_dbms} eq "Pg") {
        $serial_type = "serial";

        $db->SQL("SET CONSTRAINTS ALL DEFERRED");
    }

    # Drop a table and create it again with the given column list.
    my $recreate = sub {
        my ($tbl, $flds) = @_;
        $db->drop_table(tbl => $tbl);
        $db->create_table(tbl => $tbl, flds => $flds);
    };

    # Create one index on a table.
    my $index = sub {
        my ($tbl, $idx, $flds) = @_;
        $db->create_index(tbl => $tbl, idx => $idx, flds => $flds);
    };

    #
    # Tables for constants.
    #

    $recreate->('js_work_type',
                qq(id $serial_type primary key,
                   name varchar(64)
                   )
                );

    $recreate->('js_work_status',
                qq(status int primary key,
                   name varchar(64)
                   )
                );

    for my $row ([AVAIL, 'AVAIL'], [TAKEN, 'TAKEN'], [FAILED, 'FAILED'], [DONE, 'DONE']) {
        $db->SQL("insert into js_work_status values (?, ?)", undef, @$row);
    }

    #
    # Jobs. A work entry belongs to a job and can have zero or more exec
    # records, each documenting one attempt to run that work on a
    # particular worker.
    #

    $recreate->('js_job',
                qq(id $serial_type PRIMARY KEY
                   )
                );

    #
    # Shared id generator: js_work and js_cluster_work both take their
    # ids from this table, so the two kinds of work never share an id.
    #

    $recreate->('js_work_id',
                qq(id $serial_type PRIMARY KEY
                   )
                );

    $recreate->('js_cluster',
                qq(id $serial_type primary key,
                   name varchar(255),
                   info text
                   )
                );

    $recreate->('js_worker',
                qq(id $serial_type primary key,
                   cluster_id int REFERENCES js_cluster,
                   hostname varchar(255),
                   username varchar(32),
                   pid int,
                   exe varchar(255),
                   last_heartbeat timestamp
                   )
                );

    #
    # Noncluster work items.
    #

    $recreate->('js_work',
                qq(id int PRIMARY KEY REFERENCES js_work_id,
                   job_id int REFERENCES js_job,
                   work_type int REFERENCES js_work_type,
                   status int REFERENCES js_work_status,
                   active_exec_id int,
                   output text
                   )
                );
    $index->('js_work', 'js_work_idx_status', 'status');
    $index->('js_work', 'js_work_idx_status_jobid', ' job_id, status');

    #
    # Execution record for a piece of work.
    #

    $recreate->('js_exec',
                qq(id $serial_type primary key,
                   work_id int REFERENCES js_work,
                   worker_id int REFERENCES js_worker,
                   status int REFERENCES js_work_status,
                   job_taken timestamp,
                   job_finished timestamp
                   )
                );

    #
    # Per-cluster work items. A row whose must_execute_on_cluster is NULL
    # is a template; the create_cluster_work_entries_* methods copy it
    # once per cluster and point work_derived_from at the template row.
    #

    $recreate->('js_cluster_work',
                qq(id int PRIMARY KEY REFERENCES js_work_id,
                   job_id int REFERENCES js_job,
                   work_type int REFERENCES js_work_type,
                   status int REFERENCES js_work_status,
                   active_exec_id int,
                   must_execute_on_cluster int REFERENCES js_cluster,
                   work_derived_from int REFERENCES js_cluster_work,
                   output text
                   )
                );
    $index->('js_cluster_work', 'js_cluster_work_idx_status', 'status');
    $index->('js_cluster_work', 'js_cluster_work_idx_exec_type', 'must_execute_on_cluster, work_type');

    #
    # Execution record for a piece of cluster work.
    #

    $recreate->('js_cluster_exec',
                qq(id $serial_type primary key,
                   cluster_id int REFERENCES js_cluster,
                   work_id int REFERENCES js_cluster_work,
                   worker_id int REFERENCES js_worker,
                   status int REFERENCES js_work_status,
                   job_taken timestamp,
                   job_finished timestamp
                   )
                );

    #
    # Which work types each worker is able to execute.
    #

    $recreate->('js_worker_can_execute',
                qq(worker_id int REFERENCES js_worker,
                   work_type int REFERENCES js_work_type
                   )
                );
    $index->('js_worker_can_execute', 'js_worker_can_execute_idx', 'worker_id, work_type');

    #
    # Setup for the sim jobtype: the per-job parameters, then the
    # per-work-item input that accompanies each js_work row.
    #

    $recreate->('js_job_sim',
                qq(id int PRIMARY KEY REFERENCES js_job,
                   nr_path varchar(255),
                   input_path varchar(255),
                   output_path varchar(255),
                   chunk_size int,
                   thresh double precision
                   )
                );

    $recreate->('js_sim_work',
                qq(id int PRIMARY KEY REFERENCES js_work,
                   input_seq text
                   )
                );

    $db->SQL("insert into js_work_type (name) values ('sim')");

    #
    # File staging work.
    #

    $recreate->('js_stage_work',
                qq(id int PRIMARY KEY REFERENCES js_cluster_work,
                   path varchar(255)
                   )
                );

    $db->SQL("insert into js_work_type (name) values ('stage')");

    #
    # Blast NR staging work.
    #

    $recreate->('js_stage_nr_work',
                qq(id int PRIMARY KEY REFERENCES js_cluster_work,
                   path varchar(255)
                   )
                );

    $db->SQL("insert into js_work_type (name) values ('stage_nr')");

}
510 :    
511 :     #
512 :     # Constructor for a cluster scheduler instance.
513 :     #
514 :    
#
# Constructor for a cluster scheduler instance.
#
#   $class  class to bless the new object into
#   $fig    object providing db_handle()
#
# The database wrapper is fetched once and its _dbh and _dbms entries are
# cached next to it, so the three values always describe the same handle.
#
sub new
{
    my($class, $fig) = @_;

    my $db = $fig->db_handle;

    my $self = {
        fig => $fig,
        db => $db,
        dbh => $db->{_dbh},
        dbms => $db->{_dbms},
    };

    return bless $self, $class;
}
530 :    
531 : olson 1.6 ###############################
532 :     #
533 :     # Worker registration.
534 :     #
535 :    
536 : olson 1.2 =head1 register_worker
537 :    
Register this worker with the task manager. It will return a worker
ID for use in future calls. The caller passes the numeric ID of the
cluster the worker belongs to, and may pass a list of the work-type
names the worker is able to execute.
541 :    
542 :     =cut
543 :    
#
# Insert a js_worker row for a worker process and return its id.
#
#   $host, $user, $pid, $exe  stored in the hostname, username, pid and
#                             exe columns
#   $cluster_id               id of the worker's cluster; digits only
#   $work_types               undef, or an array ref of work-type names
#                             the worker can execute
#
# Dies if $work_types is true but not an array ref, or if $cluster_id is
# not a string of digits.
#
sub register_worker
{
    my($self, $host, $user, $pid, $exe, $cluster_id, $work_types) = @_;

    #
    # Validate work-types; should be either undef or a list.
    #

    if ($work_types and ref($work_types) ne "ARRAY")
    {
        die "Invalid type for work_types list";
    }

    #
    # We shouldn't have to worry about any concurrency issues in this code.
    # Each worker individually registers and gets its own id.
    #
    # The pattern is anchored with \A and \z so that an id followed by a
    # newline is rejected.
    #

    (defined($cluster_id) and $cluster_id =~ /\A\d+\z/) or die "Invalid cluster id";

    my $sth = $self->{dbh}->prepare("insert into js_worker (cluster_id, hostname, username, pid, exe) values (?, ?, ?, ?, ?)");
    $sth->execute($cluster_id, $host, $user, $pid, $exe);

    my $id = $self->get_inserted_id('js_worker', $sth);

    #
    # Create the worker_can_execute entries from the work-types list.
    # An empty list is skipped: it would produce "name in (  )", which is
    # not valid SQL.
    #

    if ($work_types and @$work_types)
    {
        my $qs = join(", ", ("?") x @$work_types);

        $self->{db}->SQL(qq(INSERT INTO js_worker_can_execute
                            (SELECT ?, id
                             FROM js_work_type
                             WHERE name in ( $qs ))), undef, $id, @$work_types);
    }

    return $id;
}
585 :    
#
# Look up a cluster by name in js_cluster.
#
# Returns the id column of the first matching row, or undef when the
# query yields no rows.
#
sub lookup_cluster {
    my ($self, $name) = @_;

    my $rows = $self->{db}->SQL("select id from js_cluster where name = ?", undef, $name);

    return undef unless $rows and @$rows > 0;

    return $rows->[0]->[0];
}
601 :    
#
# Return the id of the cluster called $name, creating it if necessary.
#
#   $name  cluster name, looked up through lookup_cluster
#   $info  stored in the info column when a new row is inserted
#
# An existing cluster's id is returned without touching the database
# further. Otherwise a js_cluster row is inserted and per-cluster work
# entries are created for it before the new id is returned.
#
sub register_cluster
{
    my($self, $name, $info) = @_;

    my $id = $self->lookup_cluster($name);

    return $id if defined($id);

    my $sth = $self->{dbh}->prepare("insert into js_cluster (name, info) values (?, ?)");
    $sth->execute($name, $info);

    # Reuse $id rather than declaring a second variable of the same name
    # in this scope.
    $id = $self->get_inserted_id('js_cluster', $sth);

    #
    # We must ensure that there is a cluster-work entry for each piece
    # of work that is marked as cluster work.
    #
    # LOCKING
    #

    $self->create_cluster_work_entries_for_cluster($id);

    return $id;
}
625 :    
#
# Give a cluster its own copy of every cluster-work template.
#
# A template is a js_cluster_work row whose must_execute_on_cluster is
# NULL. For each one, a new row is inserted with a fresh id from
# get_work_id, the template's job and work type, status AVAIL,
# must_execute_on_cluster set to $cluster_id and work_derived_from set to
# the template's id.
#
sub create_cluster_work_entries_for_cluster {
    my ($self, $cluster_id) = @_;

    my $templates = $self->{db}->SQL("select id, job_id, work_type from js_cluster_work where must_execute_on_cluster is null");

    my $insert = $self->{dbh}->prepare(q(INSERT INTO js_cluster_work (id, job_id, work_type, status, must_execute_on_cluster, work_derived_from)
                                         VALUES (?, ?, ?, ?, ?, ?)));

    foreach my $template (@$templates) {
        my ($template_id, $job_id, $work_type) = @$template;

        my $new_id = $self->get_work_id();
        $insert->execute($new_id, $job_id, $work_type, AVAIL, $cluster_id, $template_id);
    }
}
647 :    
#
# Give every known cluster its own copy of a job's cluster-work templates.
#
# The templates are the js_cluster_work rows of $job_id whose
# must_execute_on_cluster is NULL. For each cluster in js_cluster, and
# within it for each template, a new row is inserted with a fresh id from
# get_work_id, status AVAIL, the cluster's id and a work_derived_from
# pointing at the template.
#
sub create_cluster_work_entries_for_job {
    my ($self, $job_id) = @_;

    my $db = $self->{db};

    my $clusters = $db->SQL("select id from js_cluster");

    my $templates = $db->SQL("select id, work_type from js_cluster_work where must_execute_on_cluster is null and job_id = ?",
                             undef, $job_id);

    my $insert = $self->{dbh}->prepare(q(INSERT INTO js_cluster_work (id, job_id, work_type, status, must_execute_on_cluster, work_derived_from)
                                         VALUES (?, ?, ?, ?, ?, ?)));

    foreach my $cluster (@$clusters) {
        my ($cluster_id) = @$cluster;

        foreach my $template (@$templates) {
            my ($template_id, $work_type) = @$template;

            my $new_id = $self->get_work_id();
            $insert->execute($new_id, $job_id, $work_type, AVAIL, $cluster_id, $template_id);
        }
    }
}
677 :    
678 :     ###############################
679 :     #
680 :     # Work allocation.
681 :     #
682 :     #
683 :    
684 :     =head1 get_work
685 :    
686 :     Retrieve the next piece of work.
687 :    
688 :     We first want to retrieve work that is part of a job that does not have any (remaining)
689 :     cluster work items. If there is no such work, attempt to retrieve a piece of
690 :     cluster work. If there is none, return a wait code.
691 :    
692 :     We begin the process by creating two lists. First, a list of jobs that have cluster work to be done
693 :     (either available or currently being worked upon) on MY_CLUSTER_ID (the ID of the cluster that
694 :     the worker in question belongs to):
695 :    
696 :     SELECT id, job_id
697 :     FROM js_cluster_work
698 :     WHERE must_execute_on_cluster = MY_CLUSTER_ID and status = AVAIL
699 :     ORDER BY job_id, id
700 :    
701 :     Second, a list of jobs that have noncluster work to do:
702 :    
703 :     SELECT id, job_id
704 :     FROM js_work
705 :     WHERE status = AVAIL
706 :     ORDER BY job_id, id
707 :    
708 :     We can now define our policy. The ORDER BY clauses, together with the
709 :     sequential allocation of job and work identifiers by the
710 :     auto-incrementing table keys, enforce a FIFO ordering on jobs and work
711 :     units. We choose work by picking the lowest numbered job between the
712 :     two lists.
713 :    
714 :     In other words, if we have a job with noncluster work available with a
715 :     lower jobid than another job with cluster-work available, we will
716 :     allocate first to the lower job.
717 :    
718 :     Similarly, if there is cluster work ready to be done for the cluster
719 :     the current worker is part of, we prefer doing that even to working on
720 :     a job that has work to be done that has a larger jobid.
721 :    
Note that the two queries above do not take into account work
that is currently in progress. We must account for the following case:
724 :    
725 :     If there is no cluster work available for a particular job, and there is
726 :     noncluster work available for that job, we must check that the cluster
727 :     work for that job is actually finished:
728 :    
729 :     SELECT count(*)
730 :     FROM js_cluster_work
731 :     WHERE must_execute_on_cluster = MY_CLUSTER_ID and status = 1 and
732 :     job_id = SOMEJOB
733 :    
734 :     We require that the count above be zero; otherwise we cannot allocate
735 :     work out of SOMEJOB. If it is the case that additional jobs are
736 :     available, it is then possible to allocate work out of them, following
737 :     the rules above.
738 :    
739 :     We can use grouping to determine this status in fewer queries:
740 :    
741 :     SELECT job_id, status, count(id)
742 :     FROM js_cluster_work
743 :     WHERE must_execute_on_cluster = MY_CLUSTER_ID
744 :     GROUP BY job_id, status
    ORDER BY job_id
746 :    
747 :     This gives us the complete status of cluster work for my cluster id.
748 :    
749 :     =cut
750 :    
#
# Pick the next piece of work for a worker.
#
#   $worker_id  id of the requesting worker; its heartbeat is refreshed
#               and its cluster id is read through get_worker_info
#
# Jobs are examined in ascending numeric job-id order. For the first job
# that has noncluster work available to this worker:
#   - if cluster work is AVAIL for the worker's cluster, cluster work is
#     assigned;
#   - otherwise, if no cluster work is AVAIL or TAKEN (or the job has no
#     cluster work at all), noncluster work is assigned.
# A job that still has cluster work TAKEN but none AVAIL is passed over.
# When nothing can be assigned, the result of assign_waitcode is returned.
#
# The listed tables are locked for the duration of the selection, and the
# timing of each phase is reported through warn.
#
# NOTE(review): if any call between lock_tables and unlock_tables dies,
# the tables are not unlocked here - confirm how callers recover.
#
sub get_work
{
    my($self, $worker_id) = @_;
    my(@times);
    push(@times, time, '');
    my @tables = qw(js_cluster_exec
                    js_cluster_work
                    js_exec
                    js_work
                    js_worker
                    );

    $self->worker_alive($worker_id);

    #
    # A FigKernelPackages::DBrtns object
    #
    my $db = $self->{db};

    #
    # A DBI database handle.
    #
    my $dbh = $self->{dbh};

    push(@times, time, 'init');
    my $worker_info = $self->get_worker_info($worker_id);

    my $cluster_id = $worker_info->{cluster_id};

    #
    # Serialize completely for now.
    #

    push(@times, time, 'get info');

    local $dbh->{AutoCommit} = 0;
    $self->lock_tables(@tables);

    push(@times, time, 'lock tables');

    #
    # Determine the jobs that have cluster and noncluster work.
    #

    my $res = $db->SQL(qq(SELECT w.job_id, w.status, count(w.id)
                          FROM js_cluster_work w, js_worker_can_execute e
                          WHERE w.must_execute_on_cluster = ? AND
                          e.worker_id = ? AND
                          w.work_type = e.work_type
                          GROUP BY job_id, status),
                       undef, $cluster_id, $worker_id);
    push(@times, time, 'get cluster');

    #
    # Work is a hash with key of job_id. Each value is a
    # pair
    #
    #   hash from status => count of cluster-work entries
    #   nonzero if noncluster work is available
    #
    my %work;

    for my $ent (@$res)
    {
        my($job_id, $status, $count) = @$ent;

        $work{$job_id}->[0]->{$status} = $count;
    }


    push(@times, time, 'crunch');
    my $noncluster_work = $dbh->selectcol_arrayref(qq(SELECT distinct(job_id)
                                                      FROM js_work w, js_worker_can_execute e
                                                      WHERE status = ? AND
                                                      e.worker_id = ? AND
                                                      w.work_type = e.work_type
                                                      ORDER BY job_id), undef, AVAIL, $worker_id);
    push(@times, time, 'get noncluster');
    for my $job_id (@$noncluster_work)
    {
        $work{$job_id}->[1] = 1;
    }

    # print "Got work ", Dumper(\%work);

    #
    # We can now walk %work looking for entries where there is either cluster work
    # available, or cluster work is finished and noncluster work is available, or
    # there is no cluster work and noncluster work is available.
    #
    # If none of these conditions is met for any job, we return a waitcode.
    #

    my $ret;

    push(@times, time, 'got work');

    #
    # Job ids are numbers, so sort them numerically; a plain sort would
    # compare them as strings and visit job 10 before job 9.
    #
    for my $job_id (sort { $a <=> $b } keys %work)
    {
        my($cluster_hash, $noncluster_avail) = @{$work{$job_id}};

        #
        # Every assignment below requires noncluster work to be available.
        #
        next unless $noncluster_avail;

        #
        # A job with no cluster-work rows at all is tested first; otherwise
        # the zero-count test further down would always match it first.
        #
        if (!defined($cluster_hash))
        {
            warn "ASSIGN noncluster (no cluster work) for job $job_id\n";
            $ret = $self->assign_noncluster_work($job_id, $worker_id);
            last;
        }

        # A status with no rows is absent from the hash; count it as zero.
        my $cluster_avail = $cluster_hash->{+AVAIL} || 0;
        my $cluster_taken = $cluster_hash->{+TAKEN} || 0;

        if ($cluster_avail > 0)
        {
            warn"ASSIGN cluster work for job $job_id\n";
            $ret = $self->assign_cluster_work($job_id, $cluster_id, $worker_id);
            last;
        }
        elsif ($cluster_avail == 0 and $cluster_taken == 0)
        {
            warn "ASSIGN noncluster for job $job_id\n";
            $ret = $self->assign_noncluster_work($job_id, $worker_id);
            last;
        }
    }
    push(@times, time, 'assigned');

    #
    # Report timings. $last stays at the first timestamp, so each figure
    # is the time elapsed since the start of the call.
    #
    my $last = shift(@times);
    shift(@times);
    while (@times)
    {
        my($t, $tag) = splice(@times, 0, 2);
        my $elap = 1000 * ($t - $last);
        warn sprintf "Elap $tag: %.2f ms\n", $elap;
    }


    if (!$ret)
    {
        warn "ASSIGN waitcode\n";
        $ret = $self->assign_waitcode();
    }

    $self->unlock_tables(@tables);
    $self->{dbh}->commit();

    return $ret;
}
896 :    
897 : olson 1.7 #
898 :     # Return a set of handles that should be used for uploading computation results.
899 :     #
900 :    
#
# Build the upload URLs for a set of result files.
#
#   $job_id, $work_id  placed in each URL's query string
#   $worker_id         worker whose heartbeat is refreshed
#   $filenames         array ref of file names
#
# Returns a hash ref mapping each file name to a URL formed from the fig
# object's cgi_url with "cluster_stage.cgi" appended as a path segment and
# work_id, job_id and filename as query parameters.
#
sub get_upload_handles
{
    my($self, $job_id, $work_id, $worker_id, $filenames) = @_;

    $self->worker_alive($worker_id);

    #
    # Call the constructor as a class method. This package defines its own
    # sub new, so the indirect form "new URI(...)" depends on how Perl
    # happens to resolve the bareword at compile time.
    #
    require URI;
    my $stage_url = URI->new($self->{fig}->cgi_url());
    $stage_url->path_segments($stage_url->path_segments(), "cluster_stage.cgi");

    my $ret = {};
    for my $name (@$filenames)
    {
        # The same URI object is reused; only its query string changes.
        $stage_url->query_form(work_id => $work_id,
                               job_id => $job_id,
                               filename => $name);

        $ret->{$name} = $stage_url->as_string();
    }
    return $ret;
}
920 :    
921 :     #
922 : olson 1.8 # Heartbeat.
923 :     #
924 :    
#
# Heartbeat: set last_heartbeat to NOW() on the js_worker row whose id is
# $worker_id. Returns whatever the SQL call returns.
#
sub worker_alive {
    my ($self, $worker_id) = @_;

    my $db = $self->{db};

    return $db->SQL("update js_worker set last_heartbeat = NOW() where id = ?",
                    undef, $worker_id);
}
932 :    
933 :     #
934 : olson 1.7 # Mark this work done.
935 :     #
936 :    
#
# Mark a piece of work as done.
#
#   $job_id, $work_id  identify the work entry
#   $worker_id         worker whose heartbeat is refreshed
#   $output            passed on to the completion handler
#
# With the work and exec tables locked, the entry is looked up first in
# js_work and then in js_cluster_work. The tables are unlocked and the
# transaction committed before the call is handed on to
# work_done_noncluster or work_done_cluster, with 1 as the leading
# argument and the row's current status and active_exec_id (plus
# must_execute_on_cluster for cluster work) appended. Dies after
# unlocking when exactly one matching row is found in neither table.
#
sub work_done {
    my ($self, $job_id, $work_id, $worker_id, $output) = @_;

    $self->worker_alive($worker_id);

    my @locked = qw(js_work js_cluster_work js_exec js_cluster_exec);
    my $dbh    = $self->{dbh};

    local $dbh->{AutoCommit} = 0;
    $self->lock_tables(@locked);

    # Release the locks and commit; runs exactly once on every path below.
    my $release = sub {
        $self->unlock_tables(@locked);
        $dbh->commit();
    };

    #
    # Noncluster work?
    #
    my $noncluster_rows = $self->{db}->SQL(qq(SELECT status, active_exec_id
                                              FROM js_work
                                              WHERE id = ? and job_id = ?),
                                           undef, $work_id, $job_id);

    if (@$noncluster_rows == 1) {
        my ($status, $exec) = @{ $noncluster_rows->[0] };

        warn "noncluster done: status=$status exec=$exec output=$output\n";

        $release->();

        return $self->work_done_noncluster(1, $job_id, $work_id, $worker_id, $output, $status, $exec);
    }

    #
    # Cluster work?
    #
    my $cluster_rows = $self->{db}->SQL(qq(SELECT status, active_exec_id, must_execute_on_cluster
                                           FROM js_cluster_work
                                           WHERE id = ? and job_id = ?),
                                        undef, $work_id, $job_id);

    if (@$cluster_rows == 1) {
        my ($status, $exec, $cluster) = @{ $cluster_rows->[0] };

        warn "cluster done: status=$status exec=$exec cluster=$cluster\n";

        $release->();

        return $self->work_done_cluster(1, $job_id, $work_id, $worker_id, $output, $status, $exec, $cluster);
    }

    $release->();

    die "Could not find work entries";
}
990 :     #
991 :     # Mark this work as failed.
992 :     #
993 :    
#
# work_failed($job_id, $work_id, $worker_id, $output)
#
# A worker reports that a piece of work failed. The lookup mirrors
# work_done: js_work is consulted first, then js_cluster_work, and the
# matching completion routine is invoked with a false success flag.
# Dies if neither table has the work.
#
sub work_failed
{
    my($self, $job_id, $work_id, $worker_id, $output) = @_;

    # Reporting a result doubles as a heartbeat.
    $self->worker_alive($worker_id);

    my @tables = qw(js_work js_cluster_work js_exec js_cluster_exec);
    my $db = $self->{db};

    #
    # Run the lookup inside a transaction; "local" puts AutoCommit back
    # when this routine exits, however it exits.
    #
    local $self->{dbh}->{AutoCommit} = 0;
    $self->lock_tables(@tables);

    #
    # Noncluster work?
    #
    my $plain_rows = $db->SQL(qq(SELECT status, active_exec_id
                                 FROM js_work
                                 WHERE id = ? and job_id = ?),
                              undef, $work_id, $job_id);

    if (@$plain_rows == 1)
    {
        my($status, $exec) = @{$plain_rows->[0]};

        warn "noncluster failed: status=$status exec=$exec output=$output\n";

        $self->unlock_tables(@tables);
        $self->{dbh}->commit();

        return $self->work_done_noncluster(0, $job_id, $work_id, $worker_id, $output, $status, $exec);
    }

    #
    # Cluster work?
    #
    my $cluster_rows = $db->SQL(qq(SELECT status, active_exec_id, must_execute_on_cluster
                                   FROM js_cluster_work
                                   WHERE id = ? and job_id = ?),
                                undef, $work_id, $job_id);

    if (@$cluster_rows == 1)
    {
        my($status, $exec, $cluster) = @{$cluster_rows->[0]};

        warn "cluster failed: status=$status exec=$exec cluster=$cluster\n";

        $self->unlock_tables(@tables);
        $self->{dbh}->commit();

        return $self->work_done_cluster(0, $job_id, $work_id, $worker_id, $output, $status, $exec, $cluster);
    }

    #
    # Neither table knows about this work.
    #
    $self->unlock_tables(@tables);
    $self->{dbh}->commit();

    die "Could not find work entries";
}
1045 :    
1046 :     #
1047 :     # Noncluster work is complete. Mark the exec record, then mark the work record.
1048 :     #
1049 :    
#
# work_done_noncluster($success, $job_id, $work_id, $worker_id, $output, $old_status, $exec)
#
# Close out a noncluster execution. The js_exec row $exec is marked DONE or
# FAILED and given a finish time; the js_work row $work_id receives the
# worker's output and is marked DONE on success or put back to AVAIL on
# failure. $job_id, $worker_id and $old_status are accepted but not used
# here. Always returns 1.
#
sub work_done_noncluster
{
    my($self, $success, $job_id, $work_id, $worker_id, $output, $old_status, $exec) = @_;

    my $exec_status = $success ? DONE : FAILED;
    my $work_status = $success ? DONE : AVAIL;

    my $db = $self->{db};

    $db->SQL(qq(UPDATE js_exec
                SET status = ?, job_finished = NOW()
                WHERE id = ?),
             undef, $exec_status, $exec);

    $db->SQL(qq(UPDATE js_work
                SET status = ?, output = ?
                WHERE id = ?),
             undef, $work_status, $output, $work_id);

    return 1;
}
1074 :    
1075 :    
#
# work_done_cluster($success, $job_id, $work_id, $worker_id, $output, $old_status, $exec, $cluster)
#
# Close out a cluster execution. The js_cluster_exec row $exec is marked
# DONE or FAILED and given a finish time; the js_cluster_work row $work_id
# is marked DONE on success or put back to AVAIL on failure.
#
# The argument order matches what work_done and work_failed actually pass
# (output before the old status), and matches work_done_noncluster. The
# previous declaration had $output and $old_status transposed. $job_id,
# $worker_id, $output, $old_status and $cluster are accepted but not used
# here.
#
# Returns the result of the final UPDATE, as before.
#
sub work_done_cluster
{
    my($self, $success, $job_id, $work_id, $worker_id, $output, $old_status, $exec, $cluster) = @_;

    my $db = $self->{db};

    $db->SQL(qq(UPDATE js_cluster_exec
                SET status = ?, job_finished = NOW()
                WHERE id = ?),
             undef,
             $success ? DONE : FAILED,
             $exec);

    return $db->SQL(qq(UPDATE js_cluster_work
                       SET status = ?
                       WHERE id = ?),
                    undef,
                    $success ? DONE : AVAIL,
                    $work_id);
}
1097 : olson 1.6
1098 :     ###############################
1099 :     #
1100 :     # Job setup code.
1101 :     #
1102 :     #
1103 :    
1104 :    
1105 :     =head1 setup_sim_job
1106 :    
1107 :     Set up for a new similarity computation.
1108 :    
1109 :     We are given a NR filename, a fasta input filename, a chunk size,
1110 :     and an optional BLAST threshold.
1111 :    
1112 :     We create a new sim_job record for this job, and create a spool directory
1113 :     for it. The NR and fasta input are copied to the spool directory.
1114 :    
1115 :     The fasta is carved up into chunk_size blocks of sequences. js_work and js_sim_work
1116 :     records are created for each of the blocks.
1117 :    
1118 :     =cut
1119 :    
#
# setup_sim_job($nr, $input, $chunk_size, $thresh)
#
# Create a similarity job: a js_job / js_job_sim record, a spool directory
# holding the NR and fasta input (symlinked when larger than 100000 bytes,
# copied otherwise), staging work for both files, and one js_work /
# js_sim_work record per block of $chunk_size fasta sequences. $thresh
# defaults to 1.0e-5.
#
# Dies if the input paths cannot be resolved, if a file cannot be staged
# into the spool directory, or if the staged fasta cannot be opened.
#
# Returns the result of create_cluster_work_entries_for_job.
#
sub setup_sim_job
{
    my($self, $nr, $input, $chunk_size, $thresh) = @_;

    $thresh = 1.0e-5 unless defined($thresh);

    #
    # Resolve the inputs before touching the database so that a bad path
    # does not leave a half-built job record behind.
    #

    my $nr_src = abs_path($nr);
    defined($nr_src) or die "setup_sim_job: cannot resolve NR file $nr\n";

    my $input_src = abs_path($input);
    defined($input_src) or die "setup_sim_job: cannot resolve input file $input\n";

    &FIG::verify_dir($cluster_spool);

    my $db  = $self->{db};
    my $dbh = $self->{dbh};

    #
    # Find the job type for sims.
    #

    my $res = $db->SQL("select id from js_work_type where name = 'sim'");
    ($res and @$res > 0) or die "Cannot determine job type for sim jobs.";
    my $sim_job_type = $res->[0]->[0];

    #
    # Create the job record for this run. Pg and MySQL disagree on how to
    # spell "insert a row of defaults".
    #

    my $job_sth = $dbh->prepare($self->{dbms} eq "Pg"
                                ? "insert into js_job default values "
                                : "insert into js_job () values ()");
    $job_sth->execute();
    my $id = $self->get_inserted_id('js_job', $job_sth);

    warn "Got new id $id\n";

    my $sim_sth = $dbh->prepare("insert into js_job_sim (id, chunk_size, thresh) values (?, ?, ?)");
    $sim_sth->execute($id, $chunk_size, $thresh);

    #
    # Create the spool directory.
    #

    my $spool = "$cluster_spool/sim_$id";

    &FIG::verify_dir($spool);

    my $nr_file    = "$spool/nr";
    my $input_file = "$spool/fasta";
    my $output_dir = "$spool/out";
    &FIG::verify_dir($output_dir);

    #
    # Place a file in the spool. Large files are symlinked so we don't have
    # to wait on a copy. A leftover entry is removed first; -l is tested as
    # well as -e so that a dangling symlink from an earlier run is caught.
    #

    my $stage_file = sub {
        my($what, $src, $dest) = @_;

        if (-e $dest or -l $dest)
        {
            unlink($dest) or die "Could not remove old $what file $dest: $!\n";
        }

        if ((-s $src) > 100000)
        {
            symlink($src, $dest) or die "Could not symlink $src to $dest: $!\n";
        }
        else
        {
            copy($src, $dest) or die "Could not copy $src to $dest: $!\n";
        }
    };

    $stage_file->("nr", $nr_src, $nr_file);
    $stage_file->("input", $input_src, $input_file);

    $db->SQL("update js_job_sim set nr_path = ?, input_path = ?, output_path = ? where id = ?",
             undef, $nr_file, $input_file, $output_dir, $id);

    $self->add_nr_input_file($id, $nr_file);
    $self->add_input_file($id, $input_file);

    #
    # We know enough now to chunk up the job and create the work entries.
    #

    open(my $fasta_fh, "<", $input_file)
        or die "Could not open fasta input $input_file: $!\n";

    my $work_sth = $dbh->prepare("insert into js_work (id, job_id, work_type, status) values (?, ?, ?, ?)");

    #
    # This must be an anonymous sub. A named sub nested here would capture
    # $self, $id and $sim_job_type from the first call of setup_sim_job only
    # and keep using those stale values on every later call.
    #

    my $add_work_chunk = sub {
        my($chunk) = @_;

        my $chunk_txt = join("\n", @$chunk) . "\n";

        my $work_id = $self->get_work_id();

        $work_sth->execute($work_id, $id, $sim_job_type, AVAIL);

        $db->SQL("insert into js_sim_work values (?, ?)", undef, $work_id, $chunk_txt);
    };

    my @cur_chunk;

    {
        #
        # Read one fasta record at a time. The separator override is
        # confined to this block so it does not leak into later calls.
        #

        local $/ = "\n>";

        while (my $record = <$fasta_fh>)
        {
            chomp $record;

            #
            # Zorch the leading > we get on the first record, then add it
            # back uniformly; the chomp removed it from the others.
            #

            $record =~ s/^>//;
            push(@cur_chunk, ">$record");

            if (@cur_chunk == $chunk_size)
            {
                $add_work_chunk->(\@cur_chunk);
                @cur_chunk = ();
            }
        }
    }

    if (@cur_chunk > 0)
    {
        $add_work_chunk->(\@cur_chunk);
    }
    close($fasta_fh);

    #
    # If we know of any clusters, we need to create the cluster-work entries
    # for this job.
    #

    return $self->create_cluster_work_entries_for_job($id, $sim_job_type);
}
1286 :    
1287 :    
1288 :    
1289 :     ###############################
1290 :     #
1291 :     # Utilities.
1292 :     #
1293 :     #
1294 :    
#
# assign_waitcode()
#
# Build the work assignment that tells a worker to wait: a fresh hash
# reference whose work_name is "wait" and whose job_specific part is empty.
#
sub assign_waitcode
{
    my($self) = @_;

    my %waitcode = (
        work_name    => "wait",
        job_specific => {},
    );

    return \%waitcode;
}
1304 :    
1305 :     =head1 assign_cluster_work
1306 :    
1307 :     Assign a piece of cluster work from job $job_id to worker $worker_id.
1308 :    
1309 :     =cut
1310 :    
#
# assign_cluster_work($job_id, $cluster_id, $worker_id)
#
# Pick the lowest-numbered AVAIL js_cluster_work row of the job that must
# execute on the given cluster, record a TAKEN js_cluster_exec row for the
# worker, mark the work TAKEN with that exec as its active one, and return
# the assignment built by construct_work_return. Dies if no such work is
# found.
#
sub assign_cluster_work
{
    my($self, $job_id, $cluster_id, $worker_id) = @_;

    my $db = $self->{db};

    my $candidates = $db->SQL(qq(SELECT w.id, w.work_type, n.name, w.work_derived_from
                                 FROM js_cluster_work w, js_work_type n
                                 WHERE
                                     w.status = ? AND
                                     w.work_type = n.id AND
                                     w.job_id = ? AND
                                     w.must_execute_on_cluster = ?
                                 ORDER BY w.id
                                 LIMIT 1
                                ), undef, AVAIL, $job_id, $cluster_id);

    unless ($candidates and @$candidates > 0)
    {
        die "assign_cluster_work: work lookup failed\n";
    }

    my($work_id, $work_type, $work_name, $derived_from) = @{$candidates->[0]};

    #
    # Create an execution record for this assignment.
    #

    my $exec_sth = $self->{dbh}->prepare(qq(INSERT INTO js_cluster_exec (work_id, cluster_id, worker_id,
                                                                         status, job_taken)
                                            VALUES (?, ?, ?, ?, NOW())));
    $exec_sth->execute($work_id, $cluster_id, $worker_id, TAKEN);
    my $exec_id = $self->get_inserted_id('js_cluster_exec', $exec_sth);

    #
    # Now update the work record to point at it.
    #

    $db->SQL(qq(UPDATE js_cluster_work
                SET status = ?, active_exec_id = ?
                WHERE id = ?), undef, TAKEN, $exec_id, $work_id);

    #
    # The type-specific payload hangs off the work this entry was derived from.
    #

    return $self->construct_work_return($job_id, $worker_id, $work_id, $derived_from,
                                        $exec_id, $work_type, $work_name);
}
1352 :    
1353 :    
#
# assign_noncluster_work($job_id, $worker_id)
#
# Pick the lowest-numbered AVAIL js_work row of the job, record a TAKEN
# js_exec row for the worker, mark the work TAKEN with that exec as its
# active one, and return the assignment built by construct_work_return.
# Dies if no available work is found.
#
sub assign_noncluster_work
{
    my($self, $job_id, $worker_id) = @_;

    my $res = $self->{db}->SQL(qq(SELECT w.id, w.work_type, n.name
                                  FROM js_work w, js_work_type n
                                  WHERE
                                      w.status = ? AND
                                      w.work_type = n.id AND
                                      w.job_id = ?
                                  ORDER BY w.id
                                  LIMIT 1
                                 ), undef, AVAIL, $job_id);

    if (not $res or @$res == 0)
    {
        # The message used to name assign_cluster_work, which sent anyone
        # reading the log to the wrong routine.
        die "assign_noncluster_work: work lookup failed\n";
    }
    my($work_id, $work_type, $work_name) = @{$res->[0]};

    #
    # Create an execution record for this assignment.
    #

    my $sth = $self->{dbh}->prepare(qq(INSERT INTO js_exec (work_id, worker_id, status, job_taken)
                                       VALUES (?, ?, ?, NOW())));
    $sth->execute($work_id, $worker_id, TAKEN);
    my $exec_id = $self->get_inserted_id('js_exec', $sth);

    #
    # Now update the work record.
    #

    $self->{db}->SQL(qq(UPDATE js_work
                        SET status = ?, active_exec_id = ?
                        WHERE id = ?), undef, TAKEN, $exec_id, $work_id);

    #
    # Noncluster work carries its own type-specific data, so the work id
    # is passed as the "actual" work id too.
    #

    return $self->construct_work_return($job_id, $worker_id, $work_id, $work_id,
                                        $exec_id, $work_type, $work_name);
}
1394 :    
1395 :     =head1 construct_work_return
1396 :    
1397 :     Construct the work assignment to be returned from a get_work
1398 :     request. This routine is given all of the particulars for a piece
1399 :     of work, including the name of the worktype. We attempt to create
1400 :     the return by invoking $self->construct_work_for_TYPE.
1401 :    
1402 :     (Better design likely needed for this, but this is proof of principle code.)
1403 :    
1404 :     $actual_work_id is the work_id that has the type-specific work attached. This
1405 :     will be different than $work_id in the case of cluster work, where a base
1406 :     piece of work is snapshotted for each cluster; the type-specific work information
1407 :     remains attached to the base work.
1408 :    
1409 :     =cut
1410 :    
#
# construct_work_return($job_id, $worker_id, $work_id, $actual_work_id,
#                       $exec_id, $work_type, $work_name)
#
# Build the assignment hash handed back to a worker and let the
# type-specific method construct_work_for_<work_name> fill in its
# job_specific part. A failure in the type-specific method is reported
# with warn; the (possibly partially filled) assignment is still returned.
#
sub construct_work_return
{
    my($self, $job_id, $worker_id, $work_id, $actual_work_id, $exec_id, $work_type, $work_name) = @_;

    #
    # Construct the return struct. It has the following fields at all times;
    # per-work-type methods are allowed to add/modify as desired.
    #
    my $ret = {
        job_id       => $job_id,
        worker_id    => $worker_id,
        work_id      => $work_id,
        exec_id      => $exec_id,
        work_name    => $work_name,
        job_specific => {},
    };

    #
    # The eval block ends in an explicit 1 so that success is signalled by
    # the block completing, not by whatever the type-specific method
    # happens to return. A handler whose last expression is false
    # (0, "", undef) used to be reported as a failure with an empty message.
    #
    my $ok = eval {
        # \A and \z rather than ^ and $, so a trailing newline is rejected
        # here instead of producing a confusing method-lookup error.
        (defined($work_name) and $work_name =~ /\A\w+\z/)
            or die "Invalid work_name $work_name";
        my $method = "construct_work_for_$work_name";
        $self->$method($ret->{job_specific}, $job_id, $worker_id, $work_id, $actual_work_id, $exec_id);
        1;
    };

    if (!$ok)
    {
        warn "construct_work_return: work-specific construction for $work_name failed: $@";
    }

    return $ret;
}
1441 :    
#
# construct_work_for_stage_nr($return_struct, $job_id, $worker_id, $work_id,
#                             $actual_work_id, $exec_id)
#
# Type-specific part of an NR staging assignment. Stores in $return_struct
# the base name of the path recorded in js_stage_nr_work for
# $actual_work_id ("file") and the cluster_stage.cgi URL from which the
# client is to fetch it ("url").
#
sub construct_work_for_stage_nr
{
    my($self, $return_struct, $job_id, $worker_id, $work_id, $actual_work_id, $exec_id) = @_;

    #
    # What is being staged?
    #

    my $paths = $self->{dbh}->selectcol_arrayref("select path from js_stage_nr_work where id = ?",
                                                 undef, $actual_work_id);

    #
    # Where does the client fetch it? Append the staging script to the
    # CGI base URL and identify the work in the query string.
    #

    my $fetch_url = URI->new($self->{fig}->cgi_url());
    my @segments = $fetch_url->path_segments();
    $fetch_url->path_segments(@segments, "cluster_stage.cgi");
    $fetch_url->query_form(work_id => $actual_work_id,
                           job_id  => $job_id);

    $return_struct->{file} = basename($paths->[0]);
    $return_struct->{url} = $fetch_url->as_string();
}
1461 :    
#
# construct_work_for_stage($return_struct, $job_id, $worker_id, $work_id,
#                          $actual_work_id, $exec_id)
#
# Type-specific part of a staging assignment. Stores in $return_struct the
# base name of the path recorded in js_stage_work for $actual_work_id
# ("file") and the cluster_stage.cgi URL from which the client is to fetch
# it ("url").
#
sub construct_work_for_stage
{
    my($self, $return_struct, $job_id, $worker_id, $work_id, $actual_work_id, $exec_id) = @_;

    #
    # What is being staged?
    #

    my $paths = $self->{dbh}->selectcol_arrayref("select path from js_stage_work where id = ?",
                                                 undef, $actual_work_id);

    #
    # Where does the client fetch it? Append the staging script to the
    # CGI base URL and identify the work in the query string.
    #

    my $fetch_url = URI->new($self->{fig}->cgi_url());
    my @segments = $fetch_url->path_segments();
    $fetch_url->path_segments(@segments, "cluster_stage.cgi");
    $fetch_url->query_form(work_id => $actual_work_id,
                           job_id  => $job_id);

    $return_struct->{file} = basename($paths->[0]);
    $return_struct->{url} = $fetch_url->as_string();
}
1482 :    
1483 :     =head1 construct_work_for_sim
1484 :    
1485 :     Job-specific work return method.
1486 :    
1487 :     =cut
1488 :    
#
# construct_work_for_sim($return_struct, $job_id, $worker_id, $work_id,
#                        $actual_work_id, $exec_id)
#
# Type-specific part of a similarity assignment. Stores in $return_struct
# the input sequences recorded in js_sim_work for $actual_work_id
# ("input_seq") and the BLAST threshold recorded in js_job_sim for the job
# ("blast_thresh"). Dies unless each lookup yields exactly one row.
#
sub construct_work_for_sim
{
    my($self, $return_struct, $job_id, $worker_id, $work_id, $actual_work_id, $exec_id) = @_;

    my $dbh = $self->{dbh};

    #
    # The sequences for this piece of work.
    #

    my $seqs = $dbh->selectcol_arrayref(qq(SELECT input_seq
                                           FROM js_sim_work
                                           WHERE id = ?), undef, $actual_work_id);
    unless ($seqs and @$seqs == 1)
    {
        die "construct_work_for_sim: query did not return expected results for actual_work_id=$actual_work_id";
    }

    $return_struct->{input_seq} = $seqs->[0];

    #
    # The BLAST threshold is a property of the job as a whole.
    #

    my $thresholds = $dbh->selectcol_arrayref(qq(SELECT thresh
                                                 FROM js_job_sim
                                                 WHERE id = ?), undef, $job_id);
    unless ($thresholds and @$thresholds == 1)
    {
        die "construct_work_for_sim: job query did not return expected results for job_id=$job_id";
    }

    $return_struct->{blast_thresh} = $thresholds->[0];
}
1517 :    
1518 :     =head1 open_staging_file
1519 :    
1520 :     Open a filehandle to the file we are staging to a client.
1521 :    
1522 :     We are given the job_id and work_id that define the file. If there are any
1523 :     problems, return undef (or die if there is an error message).
1524 :    
1525 :     =cut
1526 :    
#
# open_staging_file($job_id, $work_id)
#
# Open, for reading, the file that the given piece of staging work sends
# to a client. The work type name is looked up via js_cluster_work and
# js_work_type, and the path is then read from the matching
# js_<type>_work table.
#
# Returns the list (filehandle, size in bytes, base name of the file).
# Dies if either lookup does not yield exactly one row, if the work type
# name is not a plain word, or if the file cannot be opened.
#
sub open_staging_file
{
    my($self, $job_id, $work_id) = @_;

    my $dbh = $self->{dbh};

    #
    # Determine what kind of staging this was.
    #

    my $types = $dbh->selectcol_arrayref(qq(SELECT wt.name
                                            FROM js_cluster_work w, js_work_type wt
                                            WHERE wt.id = w.work_type AND
                                                  w.job_id = ? AND
                                                  w.id = ?),
                                         undef, $job_id, $work_id);
    $types and @$types == 1 or
        die "open_staging_file: job query did not return expected results for work_id=$work_id";

    my $work_type = $types->[0];

    #
    # The type name becomes part of a table name, which cannot be passed
    # as a bind value, so insist that it is a plain identifier before
    # putting it into the statement.
    #

    (defined($work_type) and $work_type =~ /\A\w+\z/) or
        die "open_staging_file: invalid work type for work_id=$work_id";

    my $paths = $dbh->selectcol_arrayref(qq(SELECT path
                                            FROM js_${work_type}_work
                                            WHERE id = ? ), undef, $work_id);
    if (not $paths or @$paths != 1)
    {
        die "open_staging_file: job query did not return expected results for work_id=$work_id";
    }

    my $file = $paths->[0];
    my $size = -s $file;

    #
    # Three-argument open: the path is data, so it must not be able to
    # alter the open mode, and leading or trailing blanks must be kept.
    #

    open(my $fh, "<", $file)
        or die "open_staging_file: could not open file: $!";

    return ($fh, $size, basename($file));
}
1570 :    
1571 : olson 1.7 =head1 open_output_file
1572 :    
1573 :     Open a filehandle to the file we are writing as output from a worker.
1574 :    
1575 :     We are given the job_id and work_id that define the file. If there are any
1576 :     problems, return undef (or die if there is an error message).
1577 :    
1578 :     =cut
1579 :    
#
# open_output_file($job_id, $work_id, $filename)
#
# Open, for writing, the file that receives output sent back by a worker.
# The file lives at <cluster spool>/job_<job_id>/work_<work_id>/<base name
# of $filename>; the directories are created as needed.
#
# Returns the filehandle. Dies if either id is not a plain word or if the
# file cannot be opened.
#
sub open_output_file
{
    my($self, $job_id, $work_id, $filename) = @_;

    #
    # Both ids become directory names under the spool, so refuse anything
    # that could step outside it (path separators, "..", and the like).
    #

    for my $id ($job_id, $work_id)
    {
        unless (defined($id) and $id =~ /\A\w+\z/)
        {
            die "open_output_file: invalid job or work id";
        }
    }

    my $job_dir = "$cluster_spool/job_$job_id";
    &FIG::verify_dir($job_dir);
    my $work_dir = "$job_dir/work_$work_id";
    &FIG::verify_dir($work_dir);

    # Only the base name of the supplied filename is used.
    my $local_path = "$work_dir/" . basename($filename);

    #
    # Three-argument open, so that nothing in the name can be taken as
    # part of the open mode.
    #

    open(my $fh, ">", $local_path)
        or die "Cannot open $local_path: $!";

    return $fh;
}
1601 :    
1602 : olson 1.6 =head1 add_input_file
1603 :    
1604 :     Add a file to the set of files to be staged for input to a job. Returns the file id.
1605 :    
1606 :     =cut
1607 :    
#
# add_input_file($job_id, $path)
#
# Register $path as a file to be staged to clusters for the job: an AVAIL
# js_cluster_work row of type 'stage' is created together with its
# js_stage_work row holding the path.
#
# Returns the id of the new work entry, as the POD for this routine
# promises; it used to return the result of the final INSERT instead.
# Dies if the 'stage' work type is not defined.
#
sub add_input_file
{
    my($self, $job_id, $path) = @_;

    my $stage_job_type;
    my $res = $self->{db}->SQL("select id from js_work_type where name = 'stage'");
    if ($res and @$res > 0)
    {
        $stage_job_type = $res->[0]->[0];
    }
    else
    {
        # This message used to say "sim jobs", copied from setup_sim_job.
        die "Cannot determine job type for stage jobs.";
    }

    my $work_id = $self->get_work_id();

    my $sth = $self->{dbh}->prepare("insert into js_cluster_work (id, job_id, work_type, status) values (?, ?, ?, ?)");

    $sth->execute($work_id, $job_id, $stage_job_type, AVAIL);

    $self->{db}->SQL("insert into js_stage_work values (?, ?)", undef, $work_id, $path);

    return $work_id;
}
1631 :    
#
# add_nr_input_file($job_id, $path)
#
# Register $path as the NR file to be staged to clusters for the job: an
# AVAIL js_cluster_work row of type 'stage_nr' is created together with
# its js_stage_nr_work row holding the path.
#
# Returns the result of the final INSERT. Dies if the 'stage_nr' work type
# is not defined.
#
sub add_nr_input_file
{
    my($self, $job_id, $path) = @_;

    my $stage_job_type;
    my $res = $self->{db}->SQL("select id from js_work_type where name = 'stage_nr'");
    if ($res and @$res > 0)
    {
        $stage_job_type = $res->[0]->[0];
    }
    else
    {
        # This message used to say "sim jobs", copied from setup_sim_job.
        die "Cannot determine job type for stage_nr jobs.";
    }

    my $work_id = $self->get_work_id();

    my $sth = $self->{dbh}->prepare("insert into js_cluster_work (id, job_id, work_type, status) values (?, ?, ?, ?)");

    $sth->execute($work_id, $job_id, $stage_job_type, AVAIL);

    return $self->{db}->SQL("insert into js_stage_nr_work values (?, ?)", undef, $work_id, $path);
}
1655 :    
#
# get_work_id()
#
# Allocate a new work id by inserting a row of defaults into js_work_id
# and returning the id the database generated for it.
#
sub get_work_id
{
    my($self) = @_;

    # Pg and MySQL spell "insert a row of defaults" differently.
    my $insert_sql = ($self->{dbms} eq "Pg")
        ? "insert into js_work_id default values "
        : "insert into js_work_id () values ()";

    my $insert_sth = $self->{dbh}->prepare($insert_sql);
    $insert_sth->execute();

    return $self->get_inserted_id('js_work_id', $insert_sth);
}
1672 :    
1673 :    
#
# lock_tables(@tables)
#
# Lock the named tables. On Pg a single LOCK TABLE statement naming all of
# them is issued; on any other DBMS nothing is done.
#
sub lock_tables
{
    my($self, @tables) = @_;

    my $is_pg = ($self->{dbms} eq "Pg");

    return $self->{db}->SQL("LOCK TABLE " . join(", ", @tables)) if $is_pg;
}
1686 :    
#
# unlock_tables(@tables)
#
# Counterpart of lock_tables. No statement is issued for any DBMS; the
# routine exists so that callers can bracket their locked sections.
#
sub unlock_tables
{
    my($self, @tables) = @_;

    if ($self->{dbms} eq "Pg")
    {
        # Nothing to issue here.
    }

    # Nothing to issue for other DBMSs either.
}
1699 :    
#
# get_inserted_id($table, $sth)
#
# Return the id generated by the insert that was just executed on $sth.
# On mysql the id is read from the database handle's mysql_insertid; on Pg
# the statement handle's pg_oid_status is used to select the id of the new
# row from $table.
#
sub get_inserted_id
{
    my($self, $table, $sth) = @_;

    my $dbms = $self->{dbms};

    if ($dbms eq "mysql")
    {
        return $self->{dbh}->{mysql_insertid};
    }
    elsif ($dbms eq "Pg")
    {
        # Find the new row by its oid and read the id back from it.
        my $new_oid = $sth->{pg_oid_status};
        my $rows = $self->{db}->SQL("select id from $table where oid = ?", undef, $new_oid);
        return $rows->[0]->[0];
    }
}
1716 :    
#
# get_worker_info($id)
#
# Return the js_worker row for the given worker id as a hash reference
# keyed by column name, or undef if the lookup has no entry for that id.
#
sub get_worker_info
{
    my($self, $id) = @_;

    # selectall_hashref keys the result by the 'id' column.
    my $workers_by_id = $self->{dbh}->selectall_hashref("select * from js_worker where id = ?",
                                                        'id', undef, $id);

    return $workers_by_id->{$id};
}
1725 :    
1726 :    
1727 :    
1728 : olson 1.1 1;

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3