[Bio] / FortyEight / imp_check_tl_sim_status.pl Repository:
ViewVC logotype

Annotation of /FortyEight/imp_check_tl_sim_status.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.1 - (view) (download) (as text)

1 : olson 1.1 #
2 :     # Check the status of the timelogic sims computation.
3 :     #
4 :     # We maintain a hash in the sims job dir that is a helper for looking
5 :     # up the various pieces of data. These include
6 :     #
7 :     # 'jobid',taskid: timelogic job identifier
8 :     # 'status',taskid: current job status (unknown, queued, running, done, error).
9 :     #
10 :     # Job status is marked as done when the TL has finished, and we have performed
11 :     # the first stage of postprocessing and installed the postprocessed data
12 :     # into the job directory.
13 :     #
14 :     # This assumes we are running ON the timelogic machine. In the MCS installation
15 :     # this is acheived by submission of this job to the timelogic queue.
16 :     #
17 :     # For each job that we determine has completed since the last time we ran, we
18 :     # perform the per-sim-file processing that includes
19 :     #
20 :     # reformat_timelogic_sims Flip format to SEED style
21 :     # reduce_sims peg.syn hits Reduce to <hits> sims per id.
22 :     # reformat_sims NR.btree Add length fields, get rid of "garbage" sims
23 :     #
24 :     # We write the processed sim files into sims.job/sims.proc/proc.taskid
25 :     #
26 :     # We also copy the raw sim files into sims.job/sims.raw/raw.taskid
27 :     #
28 :    
29 :     use strict;
30 :     use FIG;
31 :     use FIG_Config;
32 :     use File::Basename;
33 :     use ImportJob;
34 :     use GenomeMeta;
35 :     use JobStage;
36 :     use DB_File;
37 :    
38 :     @ARGV == 1 or die "Usage: $0 job-dir\n";
39 :    
40 :     my $max_hits = 300;
41 :    
42 :     my $jobdir = shift;
43 :    
44 :     -d $jobdir or die "$0: job dir $jobdir does not exist\n";
45 :    
46 :     my $stage = new JobStage('ImportJob', 'check_tl_sim_status', $jobdir);
47 :    
48 :     $stage or die "$0: Could not create job object";
49 :    
50 :     $stage->log("Running on " . $stage->hostname);
51 :    
52 :     $stage->set_status("running");
53 :     $stage->set_running("yes");
54 :    
55 :     $stage->set_qualified_metadata("host", $stage->hostname);
56 :    
57 :     my $simdir = "$jobdir/sims.job";
58 :    
59 :     my $rawdir = "$simdir/sims.raw";
60 :     &FIG::verify_dir($rawdir);
61 :    
62 :     my $procdir = "$simdir/sims.proc";
63 :     &FIG::verify_dir($procdir);
64 :    
65 :     my $status_file = "$simdir/status.btree";
66 :     my %status;
67 :     my $status_tie;
68 :     if (-f $status_file)
69 :     {
70 :     $status_tie = tie %status, 'DB_File', $status_file, O_RDWR, 0, $DB_BTREE;
71 :     $status_tie or $stage->fatal("Cannot open status file $status_file: $!");
72 :     }
73 :     else
74 :     {
75 :     $status_tie = tie %status, 'DB_File', $status_file, O_RDWR | O_CREAT, 0777, $DB_BTREE;
76 :     $status_tie or $stage->fatal("Cannot create status file $status_file: $!");
77 :     }
78 :    
79 :     #
80 :     # Walk the task list, checking for tasks that
81 :     # a. have no timelogic identifier
82 :     # b. have a job status of unknown, queued, or busy.
83 :     #
84 :    
85 :     #
86 :     # Scan the search dir. we need this sooner or later.
87 :     #
88 :     # construct $search_status{id} => [(queued|running|done), ctl-filename]]
89 :     #
90 :     # ID here is the timelogic job identifier.
91 :     #
92 :    
93 :     my(%search_status, @ctl_files, %ids_of_type );
94 :     opendir(D, "/decypher/search") or $stage->fatal("cannot opendir /decypher/search: $!");
95 :    
96 :     while (my $f = readdir(D))
97 :     {
98 :     my $path = "/decypher/search/$f";
99 :     next unless -f $path;
100 :     next unless $f =~ /^(.*)\.(dne|bsy|tl.*)$/;
101 :     my $tlid = $1;
102 :     my $suffix = $2;
103 :    
104 :     my $status;
105 :     if ($suffix eq 'dne')
106 :     {
107 :     $status = "done";
108 :     }
109 :     elsif ($suffix eq 'bsy')
110 :     {
111 :     $status = "running";
112 :     }
113 :     else
114 :     {
115 :     $status = "queued";
116 :     }
117 :     push(@{$ids_of_type{$status}}, $tlid);
118 :    
119 :     $search_status{$tlid} = [$status, $f];
120 :     }
121 :     closedir(D);
122 :    
123 :     $status_tie->sync();
124 :    
125 :     open(TASK, "<$simdir/task.list") or $stage->fatal("Cannot open task list $simdir/task.list: $!");
126 :    
127 :     my $tasks_done_count = 0;
128 :     my $tasks_error_count = 0;
129 :     my $tasks_count= 0;
130 :    
131 :     while (<TASK>)
132 :     {
133 :     chomp;
134 :     my($id, $in, $nr, $args, $out, $err) = split(/\t/);
135 :    
136 :     $tasks_count++;
137 :    
138 :     #
139 :     # See if this task is already done.
140 :     #
141 :    
142 :     my $task_status = $status{'status', $id};
143 :     if ($task_status eq 'done')
144 :     {
145 :     $tasks_done_count++;
146 :     next;
147 :     }
148 :     elsif ($task_status eq 'error')
149 :     {
150 :     print "Task $id is in error\n";
151 :     $tasks_error_count++;
152 :     next;
153 :     }
154 :    
155 :     my $tl_id = $status{'jobid', $id};
156 :     if (!defined($tl_id))
157 :     {
158 :     $tl_id = find_tl_id($id, $in, \%status);
159 :     $status{'jobid', $id} = $tl_id;
160 :     $status_tie->sync();
161 :     }
162 :    
163 :     my $search_rec = $search_status{$tl_id};
164 :     if (!$search_rec)
165 :     {
166 :     warn "Could not find search status for task $id $tl_id\n";
167 :     next;
168 :     }
169 :    
170 :     my($search_status, $ctl_file) = @$search_rec;
171 :     print "$id $tl_id Search status=$search_status\n";
172 :    
173 :     #
174 :     # If this task is now done, do postprocessing
175 :     #
176 :     if ($search_status eq 'done')
177 :     {
178 :     eval {
179 :     postprocess_task($id, $tl_id, $out);
180 :     };
181 :     if ($@)
182 :     {
183 :     print "postproc for $id gets error: '$@'\n";
184 :     $stage->warning("Postprocess failed for id=$id tl_id=$tl_id: $@");
185 :     $tasks_error_count++;
186 :     $status{'status', $id} = 'error';
187 :     }
188 :     else
189 :     {
190 :     $tasks_done_count++;
191 :     $status{'status', $id} = 'done';
192 :     }
193 :     $status_tie->sync();
194 :     }
195 :     }
196 :    
197 :     if (($tasks_done_count + $tasks_error_count) >= $tasks_count)
198 :     {
199 :     if ($tasks_error_count > 0)
200 :     {
201 :     $stage->fatal("$tasks_error_count tasks returned with fatal errors");
202 :     }
203 :     else
204 :     {
205 :     $stage->log("completed");
206 :     $stage->set_running("no");
207 :     $stage->set_status("complete");
208 :     }
209 :     }
210 :    
211 :     sub postprocess_task
212 :     {
213 :     my($id, $tl_id, $raw_out) = @_;
214 :    
215 :     my $tl_file = "/decypher/output/$tl_id.out";
216 :     open(TL, "<$tl_file") or die "Cannot open TL sims file $tl_file: $!";
217 :    
218 :     my $pegsyn = "$jobdir/peg.synonyms.reduce_sims_index.btree";
219 :     if (! -f $pegsyn)
220 :     {
221 :     $pegsyn = "$jobdir/peg.synonyms";
222 :     }
223 :    
224 :     my $stage1 = "$FIG_Config::bin/reformat_timelogic_sims";
225 :     my $stage2 = "$FIG_Config::bin/reduce_sims $pegsyn $max_hits";
226 :     my $stage3 = "$FIG_Config::bin/reformat_sims $jobdir/nr-len.btree";
227 :    
228 :     my $proc_out = "$procdir/proc.$id";
229 :    
230 :     print "Run pipeline $stage1 | $stage2 | $stage3 > $proc_out (raw out to $raw_out)\n";
231 :    
232 :     open(PROC, "| $stage1 | $stage2 | $stage3 > $proc_out") or die "Cannot open pipeline: $! ($stage1 | $stage2 | $stage3 > $proc_out)";
233 :    
234 :     open(RAW, ">$raw_out") or die "Cannot open raw output $raw_out: $!";
235 :    
236 :     my $buf;
237 :     while (read(TL, $buf, 4096))
238 :     {
239 :     print PROC $buf;
240 :     print RAW $buf;
241 :     }
242 :     close(TL);
243 :     close(PROC);
244 :     close(RAW);
245 :     }
246 :    
247 :    
248 :     #
249 :     # Determine the timelogic identifier for this file.
250 :     # We do this by scanning the files in /decypher/search (caching the
251 :     # output in $status->{query_file, $id} = file).
252 :     #
253 :     sub find_tl_id
254 :     {
255 :     my($id, $file, $status) = @_;
256 :    
257 :     my $base = basename($file);
258 :    
259 :     my $tl_id = $status->{'query_id', $base};
260 :     if (!defined($tl_id))
261 :     {
262 :     warn "$tl_id not found\n";
263 :     for my $tlid (keys %search_status)
264 :     {
265 :     my($fstatus, $file) = @{$search_status{$tlid}};
266 :     my $path = "/decypher/search/$file";
267 :    
268 :     warn "Search $path\n";
269 :     open(F, "<$path") or $stage->fatal("cannot open $path: $!");
270 :    
271 :     while (<F>)
272 :     {
273 :     chomp;
274 :     if (/Query file on client:\s*(.*)\s*$/)
275 :     {
276 :     $status->{'query_id', basename($1)} = $tlid;
277 :     last;
278 :     }
279 :     }
280 :     close(F);
281 :     }
282 :     $status_tie->sync();
283 :     $tl_id = $status->{'query_id', $base};
284 :     }
285 :    
286 :     return $tl_id;
287 :     }

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3