[Bio] / FortyEight / SGE.pm Repository:
ViewVC logotype

Annotation of /FortyEight/SGE.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.2 - (view) (download) (as text)

1 : olson 1.1
2 :     package SGE;
3 :    
4 :     use XML::LibXML;
5 :     use strict;
6 :     use Data::Dumper;
7 : olson 1.2 use FIG_Config;
8 : olson 1.1
9 :     #
10 :     # Utilities for dealing with a SGE-enabled cluster.
11 :     #
12 :    
13 :     sub new
14 :     {
15 :     my($class) = @_;
16 :    
17 :     my $self = {
18 :     jobs => {},
19 :     };
20 :    
21 :     bless $self, $class;
22 :    
23 : olson 1.2 #
24 :     # Initialize our environment with the SGE stuff we need.
25 :     #
26 :     my $sge_root = $FIG_Config::sge_root || "/vol/sge";
27 :     my $sge_cell = $FIG_Config::sge_cell || "default";
28 :    
29 :     my %env = map { /^([^=]+)=(.*)/ } `. $sge_root/$sge_cell/common/settings.sh; set`;
30 :     for my $k (grep { /SGE/ } keys %env)
31 :     {
32 :     $ENV{$k} = $env{$k};
33 :     }
34 :     my $arch = `$sge_root/util/arch`;
35 :     chomp $arch;
36 :     $ENV{PATH} = "$sge_root/bin/$arch:$ENV{PATH}" if $arch;
37 :    
38 : olson 1.1 $self->read_qstat();
39 :    
40 :     return $self;
41 :     }
42 :    
43 :     sub read_qstat
44 :     {
45 :     my($self) = @_;
46 :     if (!open(Q, "qstat -f -s prsz -xml |"))
47 :     {
48 :     warn "Could not read queue status: $!\n";
49 :     return;
50 :     }
51 :    
52 :     my $parser = XML::LibXML->new();
53 :     my $doc = $parser->parse_fh(\*Q);
54 :    
55 :     close(Q);
56 :     if (!$doc)
57 :     {
58 :     die "Cannot parse qstat output\n";
59 :     }
60 :    
61 :     #
62 :     # Walk the joblists and populate $self->{jobs} with information about them.
63 :     #
64 :    
65 :     for my $node ($doc->findnodes('//job_list'))
66 :     {
67 :     my $job = SGE::Job->new($node);
68 :     $self->add_job($job);
69 :     }
70 :     # print Dumper($self->{jobs});
71 :     }
72 :    
73 :     sub add_job
74 :     {
75 :     my($self, $job) = @_;
76 :    
77 :     push @{$self->{jobs}->{$job->id}}, $job;
78 : olson 1.2
79 :     #
80 :     # Also push into job/task index. We need to expand tasks that show up as
81 :     # a-b:n,a-b etc
82 :     #
83 :    
84 :     my @tlist = split(/,/, $job->tasks);
85 :     for my $tent (@tlist)
86 :     {
87 :     if ($tent =~ /^\d+$/)
88 :     {
89 :     $self->{tasks}->{$job->id, $tent} = $job;
90 :     }
91 :     elsif ($tent =~ /^(\d+)-(\d+)$/)
92 :     {
93 :     map { $self->{tasks}->{$job->id, $_} = $job } $1..$2;
94 :     }
95 :     elsif ($tent =~ /^(\d+)-(\d+):(\d+)$/)
96 :     {
97 :     for (my $t = $1; $t <= $2; $t += $3)
98 :     {
99 :     $self->{tasks}->{$job->id, $t} = $job;
100 :     }
101 :     }
102 :     else
103 :     {
104 :     die "unknown task specifier '$tent'\n";
105 :     }
106 :    
107 :     }
108 : olson 1.1 }
109 :    
110 :     #
111 :     # A job is running if there are any instances that are still running.
112 :     #
113 :     # We return the list of running jobs; in a scalar context this acts correctly.
114 :     #
115 :    
116 :     sub job_running
117 :     {
118 :     my($self, $id) = @_;
119 :    
120 :     my $jobs = $self->{jobs}->{$id};
121 :     my @running = grep { $_->state eq 'running' } @$jobs;
122 :     return @running;
123 :     }
124 :    
125 :     sub job_queued
126 :     {
127 :     my($self, $id) = @_;
128 :    
129 :     my $jobs = $self->{jobs}->{$id};
130 :     my @running = grep { $_->state eq 'pending' } @$jobs;
131 :     return @running;
132 :     }
133 :    
134 : olson 1.2 sub find_task
135 :     {
136 :     my($self, $job, $task) = @_;
137 :    
138 :     return $self->{tasks}->{$job, $task};
139 :     }
140 :    
141 : olson 1.1 sub submit_job
142 :     {
143 :     my($self, $meta, $sge_args, $cmd) = @_;
144 :    
145 :     my $sge_cmd = "qsub $sge_args $cmd";
146 :    
147 :     $meta->add_log_entry($0, $sge_cmd) if $meta;
148 :    
149 :     if (!open(Q, "$sge_cmd 2>&1 |"))
150 :     {
151 :     die "Qsub failed: $!";
152 :     }
153 :     my $sge_job_id;
154 :     my $submit_output;
155 :     while (<Q>)
156 :     {
157 :     $submit_output .= $_;
158 :     print "Qsub: $_";
159 :     if (/Your\s+job\s+(\d+)/)
160 :     {
161 :     $sge_job_id = $1;
162 :     }
163 :     elsif (/Your\s+job-array\s+(\d+)/)
164 :     {
165 :     $sge_job_id = $1;
166 :     }
167 :     }
168 :     $meta->add_log_entry($0, ["qsub_output", $submit_output]) if $meta;
169 :     if (!close(Q))
170 :     {
171 :     die "Qsub close failed: $!";
172 :     }
173 :    
174 :     if (!$sge_job_id)
175 :     {
176 :     die "did not get job id from qsub";
177 :     }
178 :    
179 :     return $sge_job_id;
180 :     }
181 :    
182 :     package SGE::Job;
183 :    
184 :     use Data::Dumper;
185 :     use strict;
186 :     use base 'Class::Accessor';
187 :    
188 :     __PACKAGE__->mk_accessors(qw(id prio name owner start_time slots tasks state));
189 :    
190 :     sub new
191 :     {
192 :     my($class, $node) = @_;
193 :    
194 :    
195 :     my $self = {
196 :     node => $node,
197 :     };
198 :    
199 :     bless($self, $class);
200 :    
201 :     for my $pair ((['id', 'JB_job_number'],
202 :     [prio => 'JAT_prio'],
203 :     [name => 'JB_name'],
204 :     [owner => 'JB_owner'],
205 :     [start_time => 'JAT_start_time'],
206 :     [slots => 'slots'],
207 :     [tasks => 'tasks']))
208 :     {
209 :     my($name, $key) = @$pair;
210 :     $self->{$name} = $self->getAttr($key);
211 :     }
212 :     $self->state($node->getAttribute('state'));
213 :    
214 :     return $self;
215 :     }
216 :    
217 :     sub getAttr
218 :     {
219 :     my($self, $name) = @_;
220 :    
221 :     my $l = $self->{node}->getChildrenByTagName($name);
222 :    
223 :     if ($l)
224 :     {
225 :     return $l->item(0)->firstChild->nodeValue();
226 :     }
227 :     else
228 :     {
229 :     return undef;
230 :     }
231 :     }
232 :     1;

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3