[Bio] / FigKernelPackages / Cluster.pm Repository:
ViewVC logotype

Annotation of /FigKernelPackages/Cluster.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.10 - (view) (download) (as text)

1 : olson 1.1 #
2 :     # Routines for managing SEED jobs on a cluster.
3 :     #
4 :    
package Cluster::DBJobMgr;
use strict;

use base qw(Class::Accessor);

# Generate get/set accessors for the per-instance fields.
__PACKAGE__->mk_accessors(qw(table_name fig db dbh dbms lock_mode));

# Values written to the status column of a job table.
use constant AVAIL  => 0;
use constant TAKEN  => 1;
use constant DONE   => 2;
use constant FAILED => 3;
18 :    
19 :     #
20 :     # A database-based job manager.
21 :     #
22 :     # We use a table in the database to maintain the work to be done and the work
23 :     # as completed.
24 :     #
25 :    
#
# Construct a job manager bound to the job table "pjs_<table_name>".
#
# Parameters:
#   $class      - class to bless the new object into.
#   $fig        - object providing db_handle(); the _dbh and _dbms entries
#                 of the returned handle are cached on the new object.
#   $table_name - suffix of the job table name; word characters only.
#
# Returns the new object. Dies if $table_name is undefined or contains
# anything other than word characters.
#
sub new
{
    my($class, $fig, $table_name) = @_;

    #
    # Ensure table_name is valid. The name is interpolated directly into
    # the SQL issued by the other methods, so anchor with \A and \z: a
    # plain ^...$ would also accept a name that ends in a newline.
    #

    if (!defined($table_name) or $table_name !~ /\A\w+\z/)
    {
        die "Cluster::DBJobMgr::new: Table name may only consist of alphanumeric characters, no spaces allowed.";
    }

    my $db = $fig->db_handle;
    my $dbh = $db->{_dbh};
    my $dbms = $db->{_dbms};

    #
    # On mysql the row chosen by get_work is locked with "for update";
    # for any other dbms no locking clause is appended.
    #
    my $lock_mode = (defined($dbms) and $dbms eq "mysql") ? "for update" : "";

    my $self = {
        table_name => "pjs_$table_name",
        fig => $fig,
        db => $db,
        dbh => $dbh,
        dbms => $dbms,
        lock_mode => $lock_mode,
    };

    return bless $self, $class;
}
61 :    
#
# Claim one available unit of work on behalf of $worker.
#
# Within a single transaction, select one row of the job table whose
# status is AVAIL, mark it TAKEN for $worker and stamp job_taken. Returns
# the first column of the claimed row (used here as the peg). If no row
# is available, or the database raises an error, a warning is emitted, the
# transaction is rolled back and nothing is returned.
#
sub get_work {
    my($self, $worker) = @_;

    my $dbh   = $self->dbh;
    my $table = $self->table_name;

    # Explicit transaction; DBI failures become exceptions for the eval.
    local $dbh->{AutoCommit} = 0;
    local $dbh->{RaiseError} = 1;

    my $claimed;

    eval {
        my $rows = $dbh->selectall_arrayref("SELECT * FROM $table
                                            WHERE status = ? LIMIT 1 " . $self->lock_mode,
                                            undef,
                                            AVAIL);
        die "No work found\n" if not $rows or @$rows == 0;

        # Only the leading column of the row is needed to claim it.
        my $peg = $rows->[0]->[0];

        $dbh->do("update $table set status = ?, worker = ?, job_taken = now() where peg = ?", undef,
                 TAKEN, $worker, $peg);

        $dbh->commit();
        $claimed = $peg;
    };

    return $claimed unless $@;

    warn "Error in get_work eval: $@\n";
    $dbh->rollback();
    return;
}
103 :    
#
# Record that a unit of work has completed.
#
# Sets the row whose peg column equals $work to status DONE, stores
# $output in the output column and stamps job_finished, all inside one
# transaction.
#
# Parameters:
#   $work   - value matched against the peg column.
#   $output - value stored in the output column.
#
# Returns 1 on success. On a database error the transaction is rolled
# back and the sub dies with "Invalid work_done request: <error>".
#
# This sub is defined exactly once: a second definition in the same
# package silently replaces the first and draws a "redefined" warning.
#
sub work_done
{
    my($self, $work, $output) = @_;

    my $dbh = $self->dbh;
    my $table = $self->table_name;

    local $dbh->{AutoCommit} = 0;
    local $dbh->{RaiseError} = 1;

    eval {
        $dbh->do("update $table set status = ?, job_finished = now(), output = ? where peg = ?", undef,
                 DONE, $output, $work);

        $dbh->commit();
    };

    #
    # Capture the error at once: $@ is a global, and the rollback below
    # may reset it before it is reported to the caller.
    #
    my $err = $@;

    if ($err)
    {
        warn "Error in work_done eval: $err\n";
        $dbh->rollback();
        die "Invalid work_done request: $err";
    }

    return 1;
}
163 :    
#
# Return a unit of work to the pool after an aborted attempt.
#
# Sets the row whose peg column equals $work back to status AVAIL and
# clears its output column, inside one transaction.
#
# NOTE(review): job_finished is stamped even though the work goes back to
# AVAIL, and the worker / job_taken columns are left untouched - confirm
# that this is intended.
#
# Returns 1 on success. On a database error the transaction is rolled
# back and the sub dies with "Invalid work_aborted request:<error>".
#
sub work_aborted
{
    my($self, $work) = @_;

    my $dbh = $self->dbh;
    my $table = $self->table_name;

    local $dbh->{AutoCommit} = 0;
    local $dbh->{RaiseError} = 1;

    eval {
        $dbh->do("update $table set status = ?, job_finished = now(), output = NULL where peg = ?", undef,
                 AVAIL, $work);

        $dbh->commit();
    };

    #
    # Capture the error at once: $@ is a global, and the rollback below
    # may reset it before it is reported to the caller.
    #
    my $err = $@;

    if ($err)
    {
        warn "Error in work_aborted eval: $err\n";
        $dbh->rollback();
        die "Invalid work_aborted request:$err";
    }

    return 1;
}
193 :    
package Cluster::Broker;

use Cwd 'abs_path';
use File::Basename;
use Time::HiRes 'time';
use FIG_Config;
use FIG;

# Namespace URI of the cluster service schema.
our $ns = "http://mcs.anl.gov/FL/Schemas/cluster_service";

# Values written to the status columns of the js_* work and exec tables.
use constant AVAIL  => 0;
use constant TAKEN  => 1;
use constant DONE   => 2;
use constant FAILED => 3;

use strict;
use Data::Dumper;
use File::Copy;

# Spool directory, located under the configured fig_disk directory.
our $cluster_spool = "$FIG_Config::fig_disk/ClusterSpool";
216 : olson 1.2
217 :     =head1 Job Broker
218 :    
219 :     Perl code for job broker functionality.
220 :    
221 :     A broker instance maintains a database of jobs. Each job has a description
222 :     of some sort (heh), including a field defining the type of job. The plan
223 :     at this point is not to generalize the job type management, but rather to
224 :     include a job type in the job table, and then to use a separate table to hold
225 :     the individual units of work for each of the job types (sim-computation, scopmap, etc).
226 :    
227 :     A complication that might occur is that we wish to be able to allocate a piece
228 :     of work with a single query "get me the next available piece of work and mark
229 :     that piece as being worked on". If jobs are kept in multiple tables, this becomes
230 :     difficult. We may be able to get around this by keeping a single work table
231 :     that refers, for each piece of work, to the work type and identifier for that
232 :     workpiece.
233 :    
234 :    
235 :     =cut
236 :    
#
# (Re)create every js_* table used by the broker.
#
# $db is the database wrapper object; it must provide drop_table,
# create_table, create_index and SQL, and expose the DBI handle as
# $db->{_dbh} and the driver name as $db->{_dbms}.
#
# Each table is dropped and then created again, so any existing
# contents are discarded. The status constants and the three work types
# ('sim', 'stage', 'stage_nr') are inserted as seed rows.
#
sub init_job_tables {
    my($db) = @_;

    local $db->{_dbh}->{AutoCommit} = 0;

    # Column type for auto-assigned integer keys; it differs per dbms.
    my $serial_type;
    if ($db->{_dbms} eq "mysql") {
        $serial_type = "int not null auto_increment";
    }
    elsif ($db->{_dbms} eq "Pg") {
        $serial_type = "serial";

        $db->SQL("SET CONSTRAINTS ALL DEFERRED");
    }

    # Drop a table, then create it afresh with the given field list.
    my $recreate = sub {
        my($tbl, $flds) = @_;
        $db->drop_table(tbl => $tbl);
        $db->create_table(tbl => $tbl, flds => $flds);
    };

    # Create a named index over the given fields of a table.
    my $add_index = sub {
        my($tbl, $idx, $flds) = @_;
        $db->create_index(tbl => $tbl, idx => $idx, flds => $flds);
    };

    #
    # Tables for constants.
    #

    $recreate->('js_work_type',
                qq(id $serial_type primary key,
                   name varchar(64)
                   )
                );

    $recreate->('js_work_status',
                qq(status int primary key,
                   name varchar(64)
                   )
                );

    for my $pair ([AVAIL, 'AVAIL'], [TAKEN, 'TAKEN'], [FAILED, 'FAILED'], [DONE, 'DONE']) {
        my($value, $label) = @$pair;
        $db->SQL("insert into js_work_status values (?, ?)", undef, $value, $label);
    }

    #
    # A work entry belongs to a job.
    # It can have zero or more exec records associated; each exec
    # record documents an attempt to run that piece of work on
    # a particular worker. If all goes well, there will be only
    # one, but jobs get killed.
    #

    $recreate->('js_job',
                qq(id $serial_type PRIMARY KEY
                   )
                );

    #
    # Table for generating work ids, so that cluster and noncluster
    # work items never share an id and can be told apart by id alone.
    #
    $recreate->('js_work_id',
                qq(id $serial_type PRIMARY KEY
                   )
                );

    $recreate->('js_cluster',
                qq(id $serial_type primary key,
                   name varchar(255),
                   info text
                   )
                );

    $recreate->('js_worker',
                qq(id $serial_type primary key,
                   cluster_id int REFERENCES js_cluster,
                   hostname varchar(255),
                   username varchar(32),
                   pid int,
                   exe varchar(255),
                   last_heartbeat timestamp
                   )
                );

    #
    # Noncluster work. Every row belongs to a job (job_id), has a type
    # and a status, and points at its currently active exec record.
    #

    $recreate->('js_work',
                qq(id int PRIMARY KEY REFERENCES js_work_id,
                   job_id int REFERENCES js_job,
                   work_type int REFERENCES js_work_type,
                   status int REFERENCES js_work_status,
                   active_exec_id int,
                   output text
                   )
                );
    $add_index->('js_work', 'js_work_idx_status', 'status');
    $add_index->('js_work', 'js_work_idx_status_jobid', ' job_id, status');

    #
    # Execution record for a piece of work.
    #

    $recreate->('js_exec',
                qq(id $serial_type primary key,
                   work_id int REFERENCES js_work,
                   worker_id int REFERENCES js_worker,
                   status int REFERENCES js_work_status,
                   job_taken timestamp,
                   job_finished timestamp
                   )
                );

    #
    # A piece of per-cluster work.
    #
    # The cluster a piece of work has to run on is not fully known when
    # the work is created, so an entry with must_execute_on_cluster =
    # NULL serves as the template for future clusters' work. For a
    # replicated entry, work_derived_from refers to the template it was
    # copied from.
    #

    $recreate->('js_cluster_work',
                qq(id int PRIMARY KEY REFERENCES js_work_id,
                   job_id int REFERENCES js_job,
                   work_type int REFERENCES js_work_type,
                   status int REFERENCES js_work_status,
                   active_exec_id int,
                   must_execute_on_cluster int REFERENCES js_cluster,
                   work_derived_from int REFERENCES js_cluster_work,
                   output text
                   )
                );
    $add_index->('js_cluster_work', 'js_cluster_work_idx_status', 'status');
    $add_index->('js_cluster_work', 'js_cluster_work_idx_exec_type',
                 'must_execute_on_cluster, work_type');

    #
    # Execution record for a piece of cluster work.
    #

    $recreate->('js_cluster_exec',
                qq(id $serial_type primary key,
                   cluster_id int REFERENCES js_cluster,
                   work_id int REFERENCES js_cluster_work,
                   worker_id int REFERENCES js_worker,
                   status int REFERENCES js_work_status,
                   job_taken timestamp,
                   job_finished timestamp
                   )
                );

    $recreate->('js_worker_can_execute',
                qq(worker_id int REFERENCES js_worker,
                   work_type int REFERENCES js_work_type
                   )
                );
    $add_index->('js_worker_can_execute', 'js_worker_can_execute_idx', 'worker_id, work_type');

    #
    # Setup for the sim jobtype.
    #
    # This records the semantics of the parameters of the job. It is
    # not really needed at runtime, but is useful for user-level
    # reporting and accounting, and for distributing job-specific
    # settings (the threshold here must be sent to the clients).
    #

    $recreate->('js_job_sim',
                qq(id int PRIMARY KEY REFERENCES js_job,
                   nr_path varchar(255),
                   input_path varchar(255),
                   output_path varchar(255),
                   chunk_size int,
                   thresh double precision
                   )
                );

    #
    # For each piece of sim work there is a js_work and a js_sim_work
    # record. The js_work record holds the job-independent info, the
    # js_sim_work record the job-dependent info (here, the input
    # sequence).
    #

    $recreate->('js_sim_work',
                qq(id int PRIMARY KEY REFERENCES js_work,
                   input_seq text
                   )
                );

    $db->SQL("insert into js_work_type (name) values ('sim')");

    #
    # File staging work.
    #

    $recreate->('js_stage_work',
                qq(id int PRIMARY KEY REFERENCES js_cluster_work,
                   path varchar(255)
                   )
                );

    $db->SQL("insert into js_work_type (name) values ('stage')");

    #
    # Blast NR staging work.
    #

    $recreate->('js_stage_nr_work',
                qq(id int PRIMARY KEY REFERENCES js_cluster_work,
                   path varchar(255)
                   )
                );

    $db->SQL("insert into js_work_type (name) values ('stage_nr')");
}
493 :    
494 :     #
495 :     # Constructor for a cluster scheduler instance.
496 :     #
497 :    
#
# Build a broker around the database handle of the given FIG object.
# The wrapper handle, its DBI handle (_dbh) and its driver name (_dbms)
# are cached on the new object alongside $fig itself.
#
sub new {
    my($class, $fig) = @_;

    my %fields = (
        fig  => $fig,
        db   => $fig->db_handle,
        dbh  => $fig->db_handle->{_dbh},
        dbms => $fig->db_handle->{_dbms},
    );

    return bless \%fields, $class;
}
513 :    
514 : olson 1.6 ###############################
515 :     #
516 :     # Worker registration.
517 :     #
518 :    
519 : olson 1.2 =head1 register_worker
520 :    
521 :     Register this worker with the task manager. It will return a worker
522 :     ID for use in future calls. The ID of the cluster the worker belongs to
523 :     is passed in as well; only the worker ID is returned.
524 :    
525 :     =cut
526 :    
#
# Insert a js_worker row for a worker process and record which work
# types it is able to execute.
#
# Parameters:
#   $host, $user, $pid, $exe - stored in the hostname, username, pid and
#                              exe columns of the new row.
#   $cluster_id              - id of the worker's cluster; digits only.
#   $work_types              - undef, or an array ref of js_work_type
#                              names the worker can execute.
#
# Returns the id of the new js_worker row, as reported by
# get_inserted_id(). Dies if $work_types is neither false nor an array
# ref, or if $cluster_id is not a string of digits.
#
sub register_worker
{
    my($self, $host, $user, $pid, $exe, $cluster_id, $work_types) = @_;

    #
    # Validate work-types; should be either undef or a list.
    #

    if ($work_types and ref($work_types) ne "ARRAY")
    {
        die "Invalid type for work_types list";
    }

    #
    # \A...\z rather than ^...$, so that a value with a trailing newline
    # is rejected as well.
    #

    unless (defined($cluster_id) and $cluster_id =~ /\A\d+\z/)
    {
        die "Invalid cluster id";
    }

    #
    # We shouldn't have to worry about any concurrency issues in this code.
    # Each worker individually registers and gets its own id.
    #

    my $sth = $self->{dbh}->prepare("insert into js_worker (cluster_id, hostname, username, pid, exe) values (?, ?, ?, ?, ?)");
    $sth->execute($cluster_id, $host, $user, $pid, $exe);

    my $id = $self->get_inserted_id('js_worker', $sth);

    #
    # Create the worker_can_execute entries from the work-types list.
    # An empty list is skipped: it would otherwise produce
    # "WHERE name in (  )", which is not valid SQL.
    #

    if ($work_types and @$work_types)
    {
        my $qs = join(", ", ("?") x @$work_types);

        $self->{db}->SQL(qq(INSERT INTO js_worker_can_execute
                            (SELECT ?, id
                             FROM js_work_type
                             WHERE name in ( $qs ))), undef, $id, @$work_types);
    }

    return $id;
}
568 :    
#
# Look up a cluster by name in js_cluster.
# Returns the id of the first matching row, or undef when there is none.
#
sub lookup_cluster {
    my($self, $name) = @_;

    my $rows = $self->{db}->SQL("select id from js_cluster where name = ?", undef, $name);

    # No result set, or an empty one: the cluster is unknown.
    return undef unless $rows and @$rows > 0;

    return $rows->[0]->[0];
}
584 :    
#
# Register a cluster by name and return its id.
#
# If lookup_cluster() already knows $name, that id is returned and
# nothing is written. Otherwise a js_cluster row is inserted with $name
# and $info, and cluster-work entries are created for the new cluster.
#
sub register_cluster
{
    my($self, $name, $info) = @_;

    my $id = $self->lookup_cluster($name);

    return $id if defined($id);

    my $sth = $self->{dbh}->prepare("insert into js_cluster (name, info) values (?, ?)");
    $sth->execute($name, $info);

    # Reuse $id; declaring it a second time would mask the variable above.
    $id = $self->get_inserted_id('js_cluster', $sth);

    #
    # We must ensure that there is a cluster-work entry for each piece
    # of work that is marked as cluster work.
    #
    # LOCKING: the lookup and the insert above are not performed under a
    # lock.
    #

    $self->create_cluster_work_entries_for_cluster($id);

    return $id;
}
608 :    
#
# Replicate every template cluster-work entry (those whose
# must_execute_on_cluster is NULL) for the cluster $cluster_id. Each
# copy gets a fresh id from get_work_id(), starts out AVAIL, and records
# the template it came from in work_derived_from.
#
sub create_cluster_work_entries_for_cluster {
    my($self, $cluster_id) = @_;

    my $templates = $self->{db}->SQL("select id, job_id, work_type from js_cluster_work where must_execute_on_cluster is null");

    my $insert = $self->{dbh}->prepare(q(INSERT INTO js_cluster_work (id, job_id, work_type, status, must_execute_on_cluster, work_derived_from)
                                         VALUES (?, ?, ?, ?, ?, ?)));

    foreach my $template (@$templates) {
        my($template_id, $job_id, $work_type) = @$template;

        my $new_id = $self->get_work_id();
        $insert->execute($new_id, $job_id, $work_type, AVAIL, $cluster_id, $template_id);
    }
}
630 :    
#
# Replicate the template cluster-work entries of job $job_id (those
# whose must_execute_on_cluster is NULL) once for every row of
# js_cluster. Each copy gets a fresh id from get_work_id(), starts out
# AVAIL, and records its template in work_derived_from.
#
sub create_cluster_work_entries_for_job {
    my($self, $job_id) = @_;

    my $clusters = $self->{db}->SQL("select id from js_cluster");

    my $templates = $self->{db}->SQL("select id, work_type from js_cluster_work where must_execute_on_cluster is null and job_id = ?",
                                     undef, $job_id);

    my $insert = $self->{dbh}->prepare(q(INSERT INTO js_cluster_work (id, job_id, work_type, status, must_execute_on_cluster, work_derived_from)
                                         VALUES (?, ?, ?, ?, ?, ?)));

    foreach my $cluster (@$clusters) {
        my $cluster_id = $cluster->[0];

        foreach my $template (@$templates) {
            my($template_id, $work_type) = @$template;

            my $new_id = $self->get_work_id();
            $insert->execute($new_id, $job_id, $work_type, AVAIL, $cluster_id, $template_id);
        }
    }
}
660 :    
661 :     ###############################
662 :     #
663 :     # Work allocation.
664 :     #
665 :     #
666 :    
667 :     =head1 get_work
668 :    
669 :     Retrieve the next piece of work.
670 :    
671 :     We first want to retrieve work that is part of a job that does not have any (remaining)
672 :     cluster work items. If there is no such work, attempt to retrieve a piece of
673 :     cluster work. If there is none, return a wait code.
674 :    
675 :     We begin the process by creating two lists. First, a list of jobs that have cluster work to be done
676 :     (either available or currently being worked upon) on MY_CLUSTER_ID (the ID of the cluster that
677 :     the worker in question belongs to):
678 :    
679 :     SELECT id, job_id
680 :     FROM js_cluster_work
681 :     WHERE must_execute_on_cluster = MY_CLUSTER_ID and status = AVAIL
682 :     ORDER BY job_id, id
683 :    
684 :     Second, a list of jobs that have noncluster work to do:
685 :    
686 :     SELECT id, job_id
687 :     FROM js_work
688 :     WHERE status = AVAIL
689 :     ORDER BY job_id, id
690 :    
691 :     We can now define our policy. The ORDER BY clauses, together with the
692 :     sequential allocation of job and work identifiers by the
693 :     auto-incrementing table keys, enforce a FIFO ordering on jobs and work
694 :     units. We choose work by picking the lowest numbered job between the
695 :     two lists.
696 :    
697 :     In other words, if we have a job with noncluster work available with a
698 :     lower jobid than another job with cluster-work available, we will
699 :     allocate first to the lower job.
700 :    
701 :     Similarly, if there is cluster work ready to be done for the cluster
702 :     the current worker is part of, we prefer doing that even to working on
703 :     a job that has work to be done that has a larger jobid.
704 :    
705 :     Note that the two queries above do not take into account work
706 :     that is currently in progress. We must account for the following case:
707 :    
708 :     If there is no cluster work available for a particular job, and there is
709 :     noncluster work available for that job, we must check that the cluster
710 :     work for that job is actually finished:
711 :    
712 :     SELECT count(*)
713 :     FROM js_cluster_work
714 :     WHERE must_execute_on_cluster = MY_CLUSTER_ID and status = 1 and
715 :     job_id = SOMEJOB
716 :    
717 :     We require that the count above be zero; otherwise we cannot allocate
718 :     work out of SOMEJOB. If it is the case that additional jobs are
719 :     available, it is then possible to allocate work out of them, following
720 :     the rules above.
721 :    
722 :     We can use grouping to determine this status in fewer queries:
723 :    
724 :     SELECT job_id, status, count(id)
725 :     FROM js_cluster_work
726 :     WHERE must_execute_on_cluster = MY_CLUSTER_ID
727 :     GROUP BY job_id, status
728 :     ORDER BY job_id
729 :    
730 :     This gives us the complete status of cluster work for my cluster id.
731 :    
732 :     =cut
733 :    
#
# Allocate the next piece of work to the worker $worker_id.
#
# The worker's heartbeat is refreshed and its cluster id looked up. With
# the work tables locked, the sub gathers (a) per-job counts of cluster
# work by status for the worker's cluster and (b) the jobs that have
# AVAIL noncluster work, both restricted to work types the worker can
# execute. Jobs are then considered in ascending numeric job id order,
# and the first job that qualifies decides the result:
#
#   - no cluster work at all, noncluster work available
#         -> assign_noncluster_work
#   - cluster work AVAIL, noncluster work available
#         -> assign_cluster_work
#   - no cluster work AVAIL or TAKEN, noncluster work available
#         -> assign_noncluster_work
#
# If no job qualifies, or the chosen assignment returns a false value,
# the result of assign_waitcode() is returned instead. Step timings are
# written with warn.
#
sub get_work
{
    my($self, $worker_id) = @_;

    # Flat list of (timestamp, tag) pairs, reported at the end.
    my @times;
    push(@times, time, '');

    my @tables = qw(js_cluster_exec
                    js_cluster_work
                    js_exec
                    js_work
                    js_worker
                    );

    $self->worker_alive($worker_id);

    #
    # A FigKernelPackages::DBrtns object
    #
    my $db = $self->{db};

    #
    # A DBI database handle.
    #
    my $dbh = $self->{dbh};

    push(@times, time, 'init');
    my $worker_info = $self->get_worker_info($worker_id);

    my $cluster_id = $worker_info->{cluster_id};

    #
    # Serialize completely for now.
    #

    push(@times, time, 'get info');

    local $dbh->{AutoCommit} = 0;
    $self->lock_tables(@tables);

    push(@times, time, 'lock tables');

    #
    # Determine the jobs that have cluster and noncluster work.
    #

    my $res = $db->SQL(qq(SELECT w.job_id, w.status, count(w.id)
                          FROM js_cluster_work w, js_worker_can_execute e
                          WHERE w.must_execute_on_cluster = ? AND
                                e.worker_id = ? AND
                                w.work_type = e.work_type
                          GROUP BY job_id, status),
                       undef, $cluster_id, $worker_id);
    push(@times, time, 'get cluster');

    #
    # %work is keyed by job_id. Each value is a pair:
    #
    #   [0] hash from status => count of cluster-work entries
    #   [1] nonzero if noncluster work is available
    #
    my %work;

    for my $ent (@{ $res || [] })
    {
        my($job_id, $status, $count) = @$ent;

        $work{$job_id}->[0]->{$status} = $count;
    }

    push(@times, time, 'crunch');
    my $noncluster_work = $dbh->selectcol_arrayref(qq(SELECT distinct(job_id)
                                                      FROM js_work w, js_worker_can_execute e
                                                      WHERE status = ? AND
                                                            e.worker_id = ? AND
                                                            w.work_type = e.work_type
                                                      ORDER BY job_id), undef, AVAIL, $worker_id);
    push(@times, time, 'get noncluster');

    for my $job_id (@{ $noncluster_work || [] })
    {
        $work{$job_id}->[1] = 1;
    }

    #
    # Walk %work looking for the first job that qualifies. Job ids are
    # integers, so they are sorted numerically: the default string sort
    # would place job 10 ahead of job 9 and break the FIFO ordering.
    #

    my $ret;

    push(@times, time, 'got work');
    for my $job_id (sort { $a <=> $b } keys %work)
    {
        my($cluster_hash, $noncluster_avail) = @{$work{$job_id}};

        #
        # Every assignment below requires noncluster work to be
        # available; cluster work is only handed out in that case too.
        #
        next unless $noncluster_avail;

        #
        # Test for "no cluster work" before looking inside the hash, so
        # that this case is not swallowed by the count comparisons.
        #
        if (!defined($cluster_hash))
        {
            warn "ASSIGN noncluster (no cluster work) for job $job_id\n";
            $ret = $self->assign_noncluster_work($job_id, $worker_id);
            last;
        }

        my $avail = $cluster_hash->{+AVAIL} || 0;
        my $taken = $cluster_hash->{+TAKEN} || 0;

        if ($avail > 0)
        {
            warn"ASSIGN cluster work for job $job_id\n";
            $ret = $self->assign_cluster_work($job_id, $cluster_id, $worker_id);
            last;
        }
        elsif ($taken == 0)
        {
            warn "ASSIGN noncluster for job $job_id\n";
            $ret = $self->assign_noncluster_work($job_id, $worker_id);
            last;
        }
    }
    push(@times, time, 'assigned');

    #
    # Report how long each step took. $last is advanced on every
    # iteration so that each line shows the time spent in that step
    # rather than the time since the start of the call.
    #
    my $last = shift(@times);
    shift(@times);
    while (@times)
    {
        my($t, $tag) = splice(@times, 0, 2);
        my $elap = 1000 * ($t - $last);
        warn sprintf "Elap $tag: %.2f ms\n", $elap;
        $last = $t;
    }

    if (!$ret)
    {
        warn "ASSIGN waitcode\n";
        $ret = $self->assign_waitcode();
    }

    $self->unlock_tables(@tables);
    $self->{dbh}->commit();

    return $ret;
}
879 :    
880 : olson 1.7 #
881 :     # Return a set of handles that should be used for uploading computation results.
882 :     #
883 :    
#
# Build the upload URLs a worker should use for its result files.
#
# Parameters:
#   $job_id, $work_id - placed in each URL's query string.
#   $worker_id        - used only to refresh the worker's heartbeat.
#   $filenames        - array ref of file names.
#
# Returns a hash ref mapping each file name to a URL: the FIG object's
# cgi_url() with "cluster_stage.cgi" appended to its path and work_id,
# job_id and filename as query parameters.
#
sub get_upload_handles
{
    my($self, $job_id, $work_id, $worker_id, $filenames) = @_;

    $self->worker_alive($worker_id);

    #
    # Call the constructor as a class method. The indirect form
    # "new URI(...)" is parsed heuristically, and this package defines
    # its own sub new, which makes that parse especially fragile.
    #
    my $stage_url = URI->new($self->{fig}->cgi_url());
    $stage_url->path_segments($stage_url->path_segments(), "cluster_stage.cgi");

    my $ret = {};
    for my $name (@$filenames)
    {
        $stage_url->query_form(work_id => $work_id,
                               job_id => $job_id,
                               filename => $name);

        $ret->{$name} = $stage_url->as_string();
    }
    return $ret;
}
903 :    
904 :     #
905 : olson 1.8 # Heartbeat.
906 :     #
907 :    
#
# Heartbeat: stamp the js_worker row of $worker_id with the current time.
#
sub worker_alive {
    my($self, $worker_id) = @_;

    my $db = $self->{db};

    $db->SQL("update js_worker set last_heartbeat = NOW() where id = ?",
             undef, $worker_id);
}
915 :    
916 :     #
917 : olson 1.7 # Mark this work done.
918 :     #
919 :    
920 :     sub work_done
921 :     {
922 :     my($self, $job_id, $work_id, $worker_id, $output) = @_;
923 :    
924 : olson 1.8 $self->worker_alive($worker_id);
925 : olson 1.7 #
926 :     # Find the exec entry for this work. We also determine here
927 :     # whether this is a cluster or noncluster piece of work.
928 :     #
929 :    
930 : olson 1.8 local $self->{dbh}->{AutoCommit} = 0;
931 :     $self->lock_tables(qw(js_work js_cluster_work js_exec js_cluster_exec));
932 : olson 1.7
933 :     my $ncw = $self->{db}->SQL(qq(SELECT status, active_exec_id
934 :     FROM js_work
935 :     WHERE id = ? and job_id = ?),
936 :     undef, $work_id, $job_id);
937 :    
938 :     if (@$ncw == 1)
939 :     {
940 :     my($status, $exec) = @{$ncw->[0]};
941 :    
942 :     warn "noncluster done: status=$status exec=$exec output=$output\n";
943 :    
944 : olson 1.8
945 :     $self->unlock_tables(qw(js_work js_cluster_work js_exec js_cluster_exec));
946 :     $self->{dbh}->commit();
947 :    
948 :    
949 :     return $self->work_done_noncluster(1, $job_id, $work_id, $worker_id, $output, $status, $exec);
950 : olson 1.7 }
951 :    
952 :     my $cw = $self->{db}->SQL(qq(SELECT status, active_exec_id, must_execute_on_cluster
953 :     FROM js_cluster_work
954 :     WHERE id = ? and job_id = ?),
955 :     undef, $work_id, $job_id);
956 :    
957 :     if (@$cw == 1)
958 :     {
959 :     my($status, $exec, $cluster) = @{$cw->[0]};
960 :    
961 :     warn "cluster done: status=$status exec=$exec cluster=$cluster\n";
962 :    
963 : olson 1.8 $self->unlock_tables(qw(js_work js_cluster_work js_exec js_cluster_exec));
964 :     $self->{dbh}->commit();
965 :     return $self->work_done_cluster(1, $job_id, $work_id, $worker_id, $output, $status, $exec, $cluster);
966 : olson 1.7 }
967 :    
968 : olson 1.8 $self->unlock_tables(qw(js_work js_cluster_work js_exec js_cluster_exec));
969 :     $self->{dbh}->commit();
970 :    
971 :     die "Could not find work entries";
972 :     }
973 :     #
974 :     # Mark this work as failed.
975 :     #
976 :    
sub work_failed
{
    my($self, $job_id, $work_id, $worker_id, $output) = @_;

    # Note that we have heard from this worker.
    $self->worker_alive($worker_id);

    my @tables = qw(js_work js_cluster_work js_exec js_cluster_exec);
    my $dbh = $self->{dbh};

    #
    # The lookups run with AutoCommit switched off (restored when this
    # sub exits) and with the work/exec tables locked.
    #
    local $dbh->{AutoCommit} = 0;
    $self->lock_tables(@tables);

    # Every exit path below releases the locks and commits first.
    my $release = sub {
        $self->unlock_tables(@tables);
        $dbh->commit();
    };

    #
    # A matching row in js_work means this is noncluster work.
    #
    my $plain_rows = $self->{db}->SQL(qq(SELECT status, active_exec_id
                                         FROM js_work
                                         WHERE id = ? and job_id = ?),
                                      undef, $work_id, $job_id);

    if (@$plain_rows == 1)
    {
        my($status, $exec) = @{$plain_rows->[0]};

        warn "noncluster failed: status=$status exec=$exec output=$output\n";

        $release->();
        return $self->work_done_noncluster(0, $job_id, $work_id, $worker_id, $output, $status, $exec);
    }

    #
    # Otherwise look for it among the cluster work.
    #
    my $cluster_rows = $self->{db}->SQL(qq(SELECT status, active_exec_id, must_execute_on_cluster
                                           FROM js_cluster_work
                                           WHERE id = ? and job_id = ?),
                                        undef, $work_id, $job_id);

    if (@$cluster_rows == 1)
    {
        my($status, $exec, $cluster) = @{$cluster_rows->[0]};

        warn "cluster failed: status=$status exec=$exec cluster=$cluster\n";

        $release->();
        return $self->work_done_cluster(0, $job_id, $work_id, $worker_id, $output, $status, $exec, $cluster);
    }

    # Neither table knows about this (work_id, job_id) pair.
    $release->();
    die "Could not find work entries";
}
1028 :    
1029 :     #
1030 :     # Noncluster work is complete. Mark the exec record, then mark the work record.
1031 :     #
1032 :    
sub work_done_noncluster
{
    my($self, $success, $job_id, $work_id, $worker_id, $output, $old_status, $exec) = @_;

    #
    # On success both records become DONE. On failure the exec record is
    # marked FAILED while the work record goes back to AVAIL.
    #
    my($exec_status, $work_status) = $success ? (DONE, DONE) : (FAILED, AVAIL);

    # Close out the execution record first ...
    $self->{db}->SQL(qq(UPDATE js_exec
                        SET status = ?, job_finished = NOW()
                        WHERE id = ?),
                     undef, $exec_status, $exec);

    # ... then the work record, storing the worker's output with it.
    $self->{db}->SQL(qq(UPDATE js_work
                        SET status = ?, output = ?
                        WHERE id = ?),
                     undef, $work_status, $output, $work_id);

    return 1;
}
1057 :    
1058 :    
#
# Cluster work is complete. Mark the exec record, then mark the work record.
#
# Arguments (positional, after $self):
#   $success     true if the work succeeded, false if it failed
#   $job_id      job the work belongs to (not used in the updates)
#   $work_id     id of the js_cluster_work row
#   $worker_id   reporting worker (not used in the updates)
#   $output      output reported by the worker (not stored here)
#   $old_status  status the work row had when it was looked up
#   $exec        id of the js_cluster_exec row for this execution
#   $cluster     cluster the work had to execute on
#
# The argument order matches work_done_noncluster and the way this
# routine is invoked: output first, then the old status.
#
# Returns 1, as work_done_noncluster does.
#
sub work_done_cluster
{
    my($self, $success, $job_id, $work_id, $worker_id, $output, $old_status, $exec, $cluster) = @_;

    my $db = $self->{db};

    $db->SQL(qq(UPDATE js_cluster_exec
                SET status = ?, job_finished = NOW()
                WHERE id = ?),
             undef,
             $success ? DONE : FAILED,
             $exec);

    # Failed work goes back to AVAIL rather than being marked FAILED.
    $db->SQL(qq(UPDATE js_cluster_work
                SET status = ?
                WHERE id = ?),
             undef,
             $success ? DONE : AVAIL,
             $work_id);

    return 1;
}
1080 : olson 1.6
1081 :     ###############################
1082 :     #
1083 :     # Job setup code.
1084 :     #
1085 :     #
1086 :    
1087 :    
1088 :     =head1 setup_sim_job
1089 :    
1090 :     Set up for a new similarity computation.
1091 :    
1092 :     We are given a NR filename, a fasta input filename, a chunk size,
1093 :     and an optional BLAST threshold.
1094 :    
1095 :     We create a new sim_job record for this job, and create a spool directory
1096 :     for it. The NR and fasta input are copied to the spool directory.
1097 :    
1098 :     The fasta is carved up into chunk_size blocks of sequences. js_work and js_sim_work
1099 :     records are created for each of the blocks.
1100 :    
1101 :     =cut
1102 :    
#
# Set up a new similarity job.
#
#   $nr          path to the NR file
#   $input       path to the fasta input file
#   $chunk_size  number of sequences per piece of work
#   $thresh      optional BLAST threshold; defaults to 1.0e-5
#
# Dies if the 'sim' work type is unknown, or if an input file cannot be
# resolved, staged into the spool directory, or opened.
#
# Returns whatever create_cluster_work_entries_for_job returns.
#
sub setup_sim_job
{
    my($self, $nr, $input, $chunk_size, $thresh) = @_;

    if (!defined($thresh))
    {
        $thresh = 1.0e-5;
    }

    &FIG::verify_dir($cluster_spool);

    #
    # Find the job type for sims.
    #

    my $res = $self->{db}->SQL("select id from js_work_type where name = 'sim'");
    if (not $res or @$res == 0)
    {
        die "Cannot determine job type for sim jobs.";
    }
    my $sim_job_type = $res->[0]->[0];

    #
    # Create the job record for this run. The syntax for inserting a row
    # made up entirely of default values differs between Pg and the rest.
    #

    my $job_insert = ($self->{dbms} eq "Pg")
        ? "insert into js_job default values "
        : "insert into js_job () values ()";

    my $job_sth = $self->{dbh}->prepare($job_insert);
    $job_sth->execute();
    my $id = $self->get_inserted_id('js_job', $job_sth);

    warn "Got new id $id\n";

    my $sim_sth = $self->{dbh}->prepare("insert into js_job_sim (id, chunk_size, thresh) values (?, ?, ?)");
    $sim_sth->execute($id, $chunk_size, $thresh);

    #
    # Create the spool directory.
    #

    my $spool = "$cluster_spool/sim_$id";

    &FIG::verify_dir($spool);

    my $nr_file = "$spool/nr";
    my $input_file = "$spool/fasta";
    my $output_dir = "$spool/out";
    &FIG::verify_dir($output_dir);

    my $nr_abs = abs_path($nr);
    defined($nr_abs) or die "Could not resolve nr file $nr: $!\n";
    $nr = $nr_abs;

    my $input_abs = abs_path($input);
    defined($input_abs) or die "Could not resolve input file $input: $!\n";
    $input = $input_abs;

    #
    # Stage a file into the spool directory. Files over 100000 bytes are
    # symlinked so we don't have to wait on a copy; smaller ones are copied.
    # A failure here is fatal: continuing would create a job whose input
    # does not exist.
    #

    my $stage_file = sub {
        my($src, $dest, $what) = @_;

        # -l also catches a dangling symlink left over from an earlier run.
        if (-f $dest or -l $dest)
        {
            unlink($dest) or die "Could not remove old $what file $dest: $!\n";
        }

        if ((-s $src) > 100000)
        {
            symlink($src, $dest) or die "Could not symlink $src to $dest: $!\n";
        }
        else
        {
            copy($src, $dest) or die "Could not copy $src to $dest: $!\n";
        }
    };

    $stage_file->($nr, $nr_file, "nr");
    $stage_file->($input, $input_file, "input");

    $self->{db}->SQL("update js_job_sim set nr_path = ?, input_path = ?, output_path = ? where id = ?",
                     undef, $nr_file, $input_file, $output_dir, $id);

    $self->add_nr_input_file($id, $nr_file);
    $self->add_input_file($id, $input_file);

    #
    # We know enough now to chunk up the job and create the work entries.
    #

    open(my $fasta_fh, "<", $input_file)
        or die "Could not open fasta input $input_file: $!\n";

    # Read one fasta record at a time: records are separated by "\n>".
    local $/ = "\n>";

    #
    # This must be an anonymous sub. A named sub nested in here would bind
    # $self, $id and $sim_job_type from the first call of setup_sim_job
    # only, and every later call would add work to that first job.
    #

    my $add_work_chunk = sub {
        my($chunk) = @_;

        my $chunk_txt = join("\n", @$chunk) . "\n";

        my $work_id = $self->get_work_id();

        my $work_sth = $self->{dbh}->prepare("insert into js_work (id, job_id, work_type, status) values (?, ?, ?, ?)");
        $work_sth->execute($work_id, $id, $sim_job_type, AVAIL);

        $self->{db}->SQL("insert into js_sim_work values (?, ?)", undef, $work_id, $chunk_txt);
    };

    my @cur_chunk;

    while (my $record = <$fasta_fh>)
    {
        chomp $record;

        #
        # Zorch the leading > we get on the first record.
        #

        $record =~ s/^>//;

        #
        # And add it back; the chomp removes it from all the other records.
        #

        push(@cur_chunk, ">$record");

        if (@cur_chunk == $chunk_size)
        {
            $add_work_chunk->(\@cur_chunk);
            @cur_chunk = ();
        }
    }

    # Whatever is left over forms a final, short chunk.
    if (@cur_chunk > 0)
    {
        $add_work_chunk->(\@cur_chunk);
    }
    close($fasta_fh);

    #
    # If we know of any clusters, we need to create the cluster-work entries
    # for this job.
    #

    return $self->create_cluster_work_entries_for_job($id, $sim_job_type);
}
1269 :    
1270 :    
1271 :    
1272 :     ###############################
1273 :     #
1274 :     # Utilities.
1275 :     #
1276 :     #
1277 :    
#
# Build the assignment that is handed out in place of real work:
# a work_name of "wait" with an empty job_specific hash.
#
sub assign_waitcode
{
    my($self) = @_;

    my %assignment = (
        work_name    => "wait",
        job_specific => {},
    );

    return \%assignment;
}
1287 :    
1288 :     =head1 assign_cluster_work
1289 :    
1290 :     Assign a piece of cluster work from job $job_id to worker $worker_id.
1291 :    
1292 :     =cut
1293 :    
sub assign_cluster_work
{
    my($self, $job_id, $cluster_id, $worker_id) = @_;

    #
    # Pick the lowest-numbered available piece of work for this job that
    # has to execute on the given cluster.
    #

    my $candidates = $self->{db}->SQL(qq(SELECT w.id, w.work_type, n.name, w.work_derived_from
                                         FROM js_cluster_work w, js_work_type n
                                         WHERE
                                         w.status = ? AND
                                         w.work_type = n.id AND
                                         w.job_id = ? AND
                                         w.must_execute_on_cluster = ?
                                         ORDER BY w.id
                                         LIMIT 1
                                         ), undef, AVAIL, $job_id, $cluster_id);

    die "assign_cluster_work: work lookup failed\n"
        unless $candidates && @$candidates > 0;

    my($work_id, $work_type, $work_name, $derived_from) = @{$candidates->[0]};

    #
    # Record the execution of this work by this worker ...
    #

    my $exec_sth = $self->{dbh}->prepare(qq(INSERT INTO js_cluster_exec (work_id, cluster_id, worker_id,
                                                                         status, job_taken)
                                            VALUES (?, ?, ?, ?, NOW())));
    $exec_sth->execute($work_id, $cluster_id, $worker_id, TAKEN);
    my $exec_id = $self->get_inserted_id('js_cluster_exec', $exec_sth);

    #
    # ... and mark the work record as taken by that execution.
    #

    $self->{db}->SQL(qq(UPDATE js_cluster_work
                        SET status = ?, active_exec_id = ?
                        WHERE id = ?), undef, TAKEN, $exec_id, $work_id);

    # The type-specific data hangs off the work this entry was derived from.
    return $self->construct_work_return($job_id, $worker_id, $work_id, $derived_from,
                                        $exec_id, $work_type, $work_name);
}
1335 :    
1336 :    
#
# Assign a piece of noncluster work from job $job_id to worker $worker_id.
#
# Picks the lowest-numbered available js_work row for the job, creates a
# js_exec record for the assignment, marks the work as taken, and returns
# the struct built by construct_work_return.
#
# Dies if no available work is found for the job.
#
sub assign_noncluster_work
{
    my($self, $job_id, $worker_id) = @_;

    my $res = $self->{db}->SQL(qq(SELECT w.id, w.work_type, n.name
                                  FROM js_work w, js_work_type n
                                  WHERE
                                  w.status = ? AND
                                  w.work_type = n.id AND
                                  w.job_id = ?
                                  ORDER BY w.id
                                  LIMIT 1
                                  ), undef, AVAIL, $job_id);

    if (not $res or @$res == 0)
    {
        # Name this routine, not assign_cluster_work, so the failure can be traced.
        die "assign_noncluster_work: work lookup failed\n";
    }
    my($work_id, $work_type, $work_name) = @{$res->[0]};

    #
    # Create an execution record for this assignment.
    #

    my $sth = $self->{dbh}->prepare(qq(INSERT INTO js_exec (work_id, worker_id, status, job_taken)
                                       VALUES (?, ?, ?, NOW())));
    $sth->execute($work_id, $worker_id, TAKEN);
    my $exec_id = $self->get_inserted_id('js_exec', $sth);

    #
    # Now update the work record.
    #

    $self->{db}->SQL(qq(UPDATE js_work
                        SET status = ?, active_exec_id = ?
                        WHERE id = ?), undef, TAKEN, $exec_id, $work_id);

    # For noncluster work the type-specific data is attached to the work itself,
    # so $work_id is passed as the actual work id too.
    return $self->construct_work_return($job_id, $worker_id, $work_id, $work_id,
                                        $exec_id, $work_type, $work_name);
}
1377 :    
1378 :     =head1
1379 :    
1380 :     Construct the work assignment to be returned from a get_work
1381 :     request. This routine is given all of the particulars for a piece
1382 :     of work, including the name of the worktype. We attempt to create
1383 :     the return by invoking $self->construct_work_for_TYPE.
1384 :    
1385 :     (Better design likely needed for this, but this is proof of principle code.)
1386 :    
1387 :     $actual_work_id is the work_id that has the type-specific work attached. This
1388 :     will be different than $work_id in the case of cluster work, where a base
1389 :     piece of work is snapshotted for each cluster; the type-specific work information
1390 :     remains attached to the base work.
1391 :    
1392 :     =cut
1393 :    
#
# Build the struct describing one piece of assigned work.
#
# The struct always carries job_id, worker_id, work_id, exec_id, work_name
# and a job_specific hash. The job_specific hash is filled in by the
# method construct_work_for_<work_name>, which receives it as its first
# argument. If that method dies, or $work_name is not a plain word, a
# warning is issued and the struct is returned anyway.
#
sub construct_work_return
{
    my($self, $job_id, $worker_id, $work_id, $actual_work_id, $exec_id, $work_type, $work_name) = @_;

    #
    # Construct the return struct. It has the following fields at all times;
    # per-work-type methods are allowed to add/modify as desired.
    #
    my $ret = {
        job_id => $job_id,
        worker_id => $worker_id,
        work_id => $work_id,
        exec_id => $exec_id,
        work_name => $work_name,
        job_specific => {},
    };

    my $ok = eval {
        # The name becomes part of a method name, so insist on a plain word.
        $work_name =~ /^\w+$/ or die "Invalid work_name $work_name";
        my $method = "construct_work_for_$work_name";
        $self->$method($ret->{job_specific}, $job_id, $worker_id, $work_id, $actual_work_id, $exec_id);

        #
        # Success means the method did not die. Its return value must not
        # be used for that: a handler may legitimately return something false.
        #
        1;
    };

    if (!$ok)
    {
        warn "construct_work_return: work-specific construction for $work_name failed: $@";
    }

    return $ret;
}
1424 :    
#
# Job-specific work return method for 'stage_nr' work.
#
# Fills $return_struct with:
#   file  basename of the path recorded in js_stage_nr_work for $actual_work_id
#   url   URL of cluster_stage.cgi under the FIG cgi URL, with work_id and
#         job_id as query parameters
#
# Dies if the path lookup does not return exactly one row. Returns 1.
#
sub construct_work_for_stage_nr
{
    my($self, $return_struct, $job_id, $worker_id, $work_id, $actual_work_id, $exec_id) = @_;

    #
    # For staging, we return the URL by which the client will retrieve the file.
    #

    my $stage_url = URI->new($self->{fig}->cgi_url());
    $stage_url->path_segments($stage_url->path_segments(), "cluster_stage.cgi");
    $stage_url->query_form(work_id => $actual_work_id,
                           job_id => $job_id);

    my $res = $self->{dbh}->selectcol_arrayref("select path from js_stage_nr_work where id = ?",
                                               undef, $actual_work_id);

    # Fail with a message that names the work id rather than letting basename complain.
    if (not $res or @$res != 1)
    {
        die "construct_work_for_stage_nr: query did not return expected results for actual_work_id=$actual_work_id";
    }

    $return_struct->{file} = basename($res->[0]);
    $return_struct->{url} = $stage_url->as_string();

    return 1;
}
1444 :    
#
# Job-specific work return method for 'stage' work.
#
# Fills $return_struct with:
#   file  basename of the path recorded in js_stage_work for $actual_work_id
#   url   URL of cluster_stage.cgi under the FIG cgi URL, with work_id and
#         job_id as query parameters
#
# Dies if the path lookup does not return exactly one row. Returns 1.
#
sub construct_work_for_stage
{
    my($self, $return_struct, $job_id, $worker_id, $work_id, $actual_work_id, $exec_id) = @_;

    #
    # For staging, we return the URL by which the client will retrieve the file.
    #

    my $stage_url = URI->new($self->{fig}->cgi_url());
    $stage_url->path_segments($stage_url->path_segments(), "cluster_stage.cgi");
    $stage_url->query_form(work_id => $actual_work_id,
                           job_id => $job_id);

    my $res = $self->{dbh}->selectcol_arrayref("select path from js_stage_work where id = ?",
                                               undef, $actual_work_id);

    # Fail with a message that names the work id rather than letting basename complain.
    if (not $res or @$res != 1)
    {
        die "construct_work_for_stage: query did not return expected results for actual_work_id=$actual_work_id";
    }

    $return_struct->{file} = basename($res->[0]);
    $return_struct->{url} = $stage_url->as_string();

    return 1;
}
1465 :    
1466 :     =head1 construct_work_for_sim
1467 :    
1468 :     Job-specific work return method.
1469 :    
1470 :     =cut
1471 :    
#
# Job-specific work return method for 'sim' work.
#
# Fills $return_struct with:
#   input_seq     the input_seq column of js_sim_work for $actual_work_id
#   blast_thresh  the thresh column of js_job_sim for $job_id
#
# Dies if either lookup does not return exactly one row. Returns 1 on
# success, so that a threshold of 0 is not taken for a failure.
#
sub construct_work_for_sim
{
    my($self, $return_struct, $job_id, $worker_id, $work_id, $actual_work_id, $exec_id) = @_;

    my $seq_rows = $self->{dbh}->selectcol_arrayref(qq(SELECT input_seq
                                                       FROM js_sim_work
                                                       WHERE id = ?), undef, $actual_work_id);
    if (not $seq_rows or @$seq_rows != 1)
    {
        die "construct_work_for_sim: query did not return expected results for actual_work_id=$actual_work_id";
    }

    $return_struct->{input_seq} = $seq_rows->[0];

    #
    # We also look up the BLAST threshold information from the job.
    #

    my $thresh_rows = $self->{dbh}->selectcol_arrayref(qq(SELECT thresh
                                                          FROM js_job_sim
                                                          WHERE id = ?), undef, $job_id);
    if (not $thresh_rows or @$thresh_rows != 1)
    {
        die "construct_work_for_sim: job query did not return expected results for job_id=$job_id";
    }

    $return_struct->{blast_thresh} = $thresh_rows->[0];

    return 1;
}
1500 :    
1501 :     =head1 open_staging_file
1502 :    
1503 :     Open a filehandle to the file we are staging to a client.
1504 :    
1505 :     We are given the job_id and work_id that define the file. If there are any
1506 :     problems, return undef (or die if there is an error message).
1507 :    
1508 :     =cut
1509 :    
#
# Open the file being staged to a client for ($job_id, $work_id).
#
# The work type name is looked up from js_cluster_work/js_work_type and
# selects the table js_<type>_work, from which the file's path is read.
#
# Returns the list (filehandle opened for reading, size of the file as
# reported by -s, basename of the file).
#
# Dies if either lookup does not return exactly one row, if the work type
# name is not a plain word, or if the file cannot be opened.
#
sub open_staging_file
{
    my($self, $job_id, $work_id) = @_;

    #
    # Determine what kind of staging this was.
    #

    my $type_rows = $self->{dbh}->selectcol_arrayref(qq(SELECT wt.name
                                                        FROM js_cluster_work w, js_work_type wt
                                                        WHERE wt.id = w.work_type AND
                                                        w.job_id = ? AND
                                                        w.id = ?),
                                                     undef, $job_id, $work_id);
    $type_rows and @$type_rows == 1 or
        die "open_staging_file: job query did not return expected results for work_id=$work_id";

    my $work_type = $type_rows->[0];

    # The type name becomes part of a table name below; only a plain word is safe there.
    $work_type =~ /^\w+$/ or
        die "open_staging_file: invalid work type '$work_type' for work_id=$work_id";

    my $path_rows = $self->{dbh}->selectcol_arrayref(qq(SELECT path
                                                        FROM js_${work_type}_work
                                                        WHERE id = ? ), undef, $work_id);
    if (not $path_rows or @$path_rows != 1)
    {
        die "open_staging_file: job query did not return expected results for work_id=$work_id";
    }

    my $file = $path_rows->[0];
    my $size = -s $file;

    # Three-argument open: the stored path is always taken as a plain filename.
    open(my $fh, "<", $file)
        or die "open_staging_file: could not open file: $!";

    return ($fh, $size, basename($file));
}
1553 :    
1554 : olson 1.7 =head1 open_output_file
1555 :    
1556 :     Open a filehandle to the file we are writing as output from a worker.
1557 :    
1558 :     We are given the job_id and work_id that define the file. If there are any
1559 :     problems, return undef (or die if there is an error message).
1560 :    
1561 :     =cut
1562 :    
#
# Open, for writing, the file that receives output from a worker.
#
# The file lives at <cluster spool>/job_<job_id>/work_<work_id>/<name>,
# where <name> is the basename of $filename; any directory part of
# $filename is discarded. Both directories are created via
# FIG::verify_dir.
#
# Returns the filehandle. Dies if the file cannot be opened.
#
sub open_output_file
{
    my($self, $job_id, $work_id, $filename) = @_;

    my $job_dir = "$cluster_spool/job_$job_id";
    &FIG::verify_dir($job_dir);
    my $work_dir = "$job_dir/work_$work_id";
    &FIG::verify_dir($work_dir);

    my $local_path = "$work_dir/" . basename($filename);

    # Three-argument open: nothing in the path can be read as part of the open mode.
    open(my $fh, ">", $local_path)
        or die "Cannot open $local_path: $!";

    return $fh;
}
1584 :    
1585 : olson 1.6 =head1 add_input_file
1586 :    
1587 :     Add a file to the set of files to be staged for input to a job. Returns the file id.
1588 :    
1589 :     =cut
1590 :    
#
# Add $path to the set of files to be staged for input to job $job_id.
#
# Creates a js_cluster_work entry of type 'stage' in state AVAIL and a
# js_stage_work row holding the path.
#
# Returns the id of the new work entry (the file id). Dies if the 'stage'
# work type is not known.
#
sub add_input_file
{
    my($self, $job_id, $path) = @_;

    my $stage_job_type;
    my $res = $self->{db}->SQL("select id from js_work_type where name = 'stage'");
    if ($res and @$res > 0)
    {
        $stage_job_type = $res->[0]->[0];
    }
    else
    {
        die "Cannot determine job type for stage jobs.";
    }

    my $work_id = $self->get_work_id();

    my $sth = $self->{dbh}->prepare("insert into js_cluster_work (id, job_id, work_type, status) values (?, ?, ?, ?)");

    $sth->execute($work_id, $job_id, $stage_job_type, AVAIL);

    $self->{db}->SQL("insert into js_stage_work values (?, ?)", undef, $work_id, $path);

    # Hand back the id, as documented, rather than the result of the insert.
    return $work_id;
}
1614 :    
#
# Add the NR file $path to the set of files to be staged for job $job_id.
#
# Creates a js_cluster_work entry of type 'stage_nr' in state AVAIL and a
# js_stage_nr_work row holding the path.
#
# Returns the id of the new work entry, as add_input_file does. Dies if
# the 'stage_nr' work type is not known.
#
sub add_nr_input_file
{
    my($self, $job_id, $path) = @_;

    my $stage_job_type;
    my $res = $self->{db}->SQL("select id from js_work_type where name = 'stage_nr'");
    if ($res and @$res > 0)
    {
        $stage_job_type = $res->[0]->[0];
    }
    else
    {
        die "Cannot determine job type for stage_nr jobs.";
    }

    my $work_id = $self->get_work_id();

    my $sth = $self->{dbh}->prepare("insert into js_cluster_work (id, job_id, work_type, status) values (?, ?, ?, ?)");

    $sth->execute($work_id, $job_id, $stage_job_type, AVAIL);

    $self->{db}->SQL("insert into js_stage_nr_work values (?, ?)", undef, $work_id, $path);

    return $work_id;
}
1638 :    
#
# Allocate a fresh work id by inserting an all-defaults row into
# js_work_id and returning the id that row was given.
#
sub get_work_id
{
    my($self) = @_;

    # Pg spells an all-defaults insert differently from the other dbms.
    my $insert = ($self->{dbms} eq "Pg")
        ? "insert into js_work_id default values "
        : "insert into js_work_id () values ()";

    my $sth = $self->{dbh}->prepare($insert);
    $sth->execute();

    return $self->get_inserted_id('js_work_id', $sth);
}
1655 :    
1656 :    
#
# Lock the given tables. Only Pg gets an explicit LOCK TABLE statement;
# for any other dbms this does nothing.
#
sub lock_tables
{
    my($self, @tables) = @_;

    my $table_list = join(", ", @tables);

    if ($self->{dbms} eq "Pg")
    {
        $self->{db}->SQL("LOCK TABLE " . $table_list);
    }
}
1669 :    
1670 :     sub unlock_tables
1671 :     {
1672 :     my($self, @tables) = @_;
1673 : olson 1.8
1674 :     if ($self->{dbms} eq "Pg")
1675 :     {
1676 :    
1677 :     }
1678 :     else
1679 :     {
1680 :     }
1681 : olson 1.6 }
1682 :    
#
# Return the id given to the row most recently inserted into $table
# through the statement handle $sth.
#
#   mysql  the handle's mysql_insertid attribute
#   Pg     the id of the row whose oid matches the statement's pg_oid_status
#
# No id is produced for any other dbms.
#
sub get_inserted_id
{
    my($self, $table, $sth) = @_;

    my $dbms = $self->{dbms};

    if ($dbms eq "mysql")
    {
        return $self->{dbh}->{mysql_insertid};
    }

    if ($dbms eq "Pg")
    {
        my $oid = $sth->{pg_oid_status};
        my $rows = $self->{db}->SQL("select id from $table where oid = ?", undef, $oid);
        return $rows->[0]->[0];
    }
}
1699 :    
#
# Return the js_worker row for worker $id as a hash reference keyed by
# column name, or undef if there is no such row.
#
sub get_worker_info
{
    my($self, $id) = @_;

    my $query = "select * from js_worker where id = ?";

    # Rows come back keyed by their 'id' column; pick out the one asked for.
    my $workers_by_id = $self->{dbh}->selectall_hashref($query, 'id', undef, $id);

    return $workers_by_id->{$id};
}
1708 :    
1709 :    
1710 :    
1711 : olson 1.1 1;

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3