[Bio] / FigKernelPackages / ServerThing.pm Repository:
ViewVC logotype

Diff of /FigKernelPackages/ServerThing.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.78, Mon May 2 22:10:31 2011 UTC revision 1.79, Tue May 3 17:07:53 2011 UTC
# Line 1  Line 1 
1  #!/usr/bin/perl -w  #!/usr/bin/perl -w
2    
3    
4  package ServerThing;  package ServerThing;
5      use Data::Dumper;      use Data::Dumper;
6      use strict;      use strict;
# Line 8  Line 9 
9      use JSON::Any;      use JSON::Any;
10      use ERDB;      use ERDB;
11      use TestUtils;      use TestUtils;
12      use Time::HiRes;      use Time::HiRes 'gettimeofday';
13      use File::Temp;      use File::Temp;
14      use ErrorMessage;      use ErrorMessage;
15      use CGI;      use CGI;
16    
17        use bytes;
18    
19      no warnings qw(once);      no warnings qw(once);
20    
21      # Maximum number of requests to run per invocation.      # Maximum number of requests to run per invocation.
22      use constant MAX_REQUESTS => 50;      use constant MAX_REQUESTS => 50;
23    
24    #
25    # Carefully import Log4perl.
26    #
27    BEGIN {
28        eval {
29            require Log::Log4perl;
30            Log::Log4perl->import('get_logger');
31            require Try::Tiny;
32            Try::Tiny->import;
33        };
34    };
35    
36  =head1 General Server Helper  =head1 General Server Helper
37    
38  This package provides a method-- I<RunServer>-- that can be called from a CGI  This package provides a method-- I<RunServer>-- that can be called from a CGI
# Line 57  Line 73 
73          } else {          } else {
74              # No error, so now we can process the request. First, get the method list.              # No error, so now we can process the request. First, get the method list.
75              my $methods = $serverThing->methods();              my $methods = $serverThing->methods();
76    
77                my $raw_methods = [];
78                if ($serverThing->can("raw_methods"))
79                {
80                    $raw_methods = $serverThing->raw_methods();
81                }
82              # Store it in the object so we can use it to validate methods.              # Store it in the object so we can use it to validate methods.
83              my %methodHash = map { $_ => 1 } @$methods;              my %methodHash = map { $_ => 1 } @$methods;
84              $serverThing->{methods} = \%methodHash;              $serverThing->{methods} = \%methodHash;
85                $serverThing->{raw_methods} => { map { $_ => 1 } @$raw_methods };
86              my $cgi;              my $cgi;
87              if (! defined $key) {              if (! defined $key) {
88                  # No tracing key, so presume we're a web service. Check for Fast CGI.                  # No tracing key, so presume we're a web service. Check for Fast CGI.
# Line 200  Line 223 
223      require UUID;      require UUID;
224      require CGI::Fast;      require CGI::Fast;
225    
226        my $logger = get_logger("FCGI::RunRabbitMQClientAsync");
227    
228      my $loop = IO::Async::Loop->new();      my $loop = IO::Async::Loop->new();
229    
230      my $conn = Net::RabbitMQ->new();      my $conn = Net::RabbitMQ->new();
# Line 213  Line 238 
238      my $exchange_name = "svr.$serverName";      my $exchange_name = "svr.$serverName";
239    
240      my $queue_name = $conn->queue_declare($channel,'', { durable => 0, exclusive => 1, auto_delete => 1 });      my $queue_name = $conn->queue_declare($channel,'', { durable => 0, exclusive => 1, auto_delete => 1 });
241      print "Created $queue_name $conf->{fcgi_port}\n";      $logger->info("Created $queue_name fcgi_port=$conf->{fcgi_port}");
242    
243      $conn->consume($channel, $queue_name, { no_ack => 1 });      $conn->consume($channel, $queue_name, { no_ack => 1 });
244    
245      my $waiting = {};      my $waiting = {};
246      my $global = { messages => 0 };      my $global = { messages => 0,
247                       queue_size => 0,
248                       };
249      my $rabbit_fh = IO::Handle->new();      my $rabbit_fh = IO::Handle->new();
250      $rabbit_fh->fdopen($rabbit_fd, "r");      $rabbit_fh->fdopen($rabbit_fd, "r");
251    
252      my $timer = IO::Async::Timer::Periodic->new(interval => 10,      my $timer = IO::Async::Timer::Periodic->new(interval => 60,
253                                                  on_tick => sub {                                                  on_tick => sub {
254                                                      my $last = $global->{last_time};                                                      my $last = $global->{last_time};
255                                                      my $now = time;                                                      my $now = gettimeofday;
256                                                      if (defined($last))                                                      if (defined($last))
257                                                      {                                                      {
258                                                          my $int = $now - $last;                                                          my $int = $now - $last;
259                                                          my $rate = $global->{messages} / $int;                                                          my $rate = $global->{messages} / $int;
260                                                          print "$rate\n";                                                          $logger->debug("$rate $global->{queue_size}");
261                                                            for my $ent (values %$waiting)
262                                                            {
263                                                                my $dur = $now - $ent->{start_time};
264                                                                my $ip = $ent->{request}->param("REMOTE_ADDR");
265                                                                $logger->debug(join("\t", '', $dur, $ip, $ent->{function}));
266                                                            }
267                                                      }                                                      }
268                                                      $global->{last_time} = $now;                                                      $global->{last_time} = $now;
269                                                      $global->{messages} = 0;                                                      $global->{messages} = 0;
# Line 241  Line 274 
274                                                   on_read_ready => sub {                                                   on_read_ready => sub {
275                                                       AsyncRabbitCheck($loop, $channel, $conn,                                                       AsyncRabbitCheck($loop, $channel, $conn,
276                                                                        $exchange_name, $queue_name,                                                                        $exchange_name, $queue_name,
277                                                                        $waiting);                                                                        $waiting, $global);
278                                                   });                                                   });
279      $loop->add($rabbit_listener);      $loop->add($rabbit_listener);
280    
# Line 249  Line 282 
282                                              my($fcgi, $req) = @_;                                              my($fcgi, $req) = @_;
283                                              $global->{messages}++;                                              $global->{messages}++;
284                                              AsyncFcgiReq($loop, $fcgi, $req, $channel, $conn, $queue_name,                                              AsyncFcgiReq($loop, $fcgi, $req, $channel, $conn, $queue_name,
285                                                           $exchange_name, $waiting);                                                           $exchange_name, $waiting, $global);
286                                          });                                          });
287      $loop->add( $fcgi );      $loop->add( $fcgi );
288    
289      $fcgi->listen(service  => $conf->{fcgi_port},      $fcgi->listen(service  => $conf->{fcgi_port},
290                    socktype => 'stream',                    socktype => 'stream',
291                    host => '0.0.0.0',                    host => '0.0.0.0',
292                     on_resolve_error => sub { print STDERR "Cannot resolve - $_[0]\n"; },                     on_resolve_error => sub { $logger->error("Cannot resolve - $_[0]"); },
293                     on_listen_error  => sub { print STDERR "Cannot listen\n"; },                     on_listen_error  => sub { $logger->error("Cannot listen"); },
294                    );                    );
295    
296      $loop->loop_forever();      $loop->loop_forever();
# Line 265  Line 298 
298    
299  sub AsyncRabbitCheck  sub AsyncRabbitCheck
300  {  {
301      my($loop, $channel, $conn, $exchange_name, $queue_name, $waiting) = @_;      my($loop, $channel, $conn, $exchange_name, $queue_name, $waiting, $global) = @_;
302  #    print "Rabbit check!\n";  
303      my $msg = $conn->recv();      my $msg = $conn->recv();
304    
305        my $logger = get_logger("FCGI::QueueRead");
306        $logger->debug("AsyncRabbitCheck start");
307    
308      my $corr= $msg->{props}->{correlation_id};      my $corr= $msg->{props}->{correlation_id};
309    
310      my $slot = delete $waiting->{$corr};      my $slot = delete $waiting->{$corr};
311      if ($slot)      if ($slot)
312      {      {
 #       print "Finish request\n";  
313          my $req = $slot->{request};          my $req = $slot->{request};
314            my $start = $slot->{start_time};
315    
316          eval {          #
317              $req->print_stdout("Status: 200 OK\r\n" .          # Unpack body.
318            #
319            my($code, $msg, $body) = unpack("nN/aN/a", $msg->{body});
320    
321            my $now = gettimeofday;
322            my $elap = $now - $start;
323            $logger->info(sprintf("Evaluation of method $slot->{function} complete code=$code time=%.6f ip=%s corr=$corr",
324                                  $elap, $req->{params}->{REMOTE_ADDR}));
325    
326            try {
327                $req->print_stdout("Status: $code $msg\r\n" .
328                                 "Content-type: application/octet-stream\r\n" .                                 "Content-type: application/octet-stream\r\n" .
329                                 "\r\n");                                 "\r\n");
330              $req->print_stdout($msg->{body});              $req->print_stdout($body);
331              $req->finish();              $req->finish();
332          };              $global->{queue_size}--;
333            }
334            catch {
335                $logger->error("Error caught while returning response: $_");
336            }
337      }      }
338      else      else
339      {      {
340          print "No match for $corr\n";          $logger->error("No matching request found for correlation_id=$corr");
341      }      }
342  }  }
343    
344  sub AsyncFcgiReq  sub AsyncFcgiReq
345  {  {
346      my($loop, $fcgi, $req, $channel, $conn, $queue_name, $exchange_name, $waiting) = @_;      my($loop, $fcgi, $req, $channel, $conn, $queue_name, $exchange_name, $waiting, $global) = @_;
347    
348      # print STDERR "Working...\n";      my $logger = get_logger("FCGI::Handler");
349    
350      my $params = $req->params;      my $params = $req->params;
351      my $cgi = CGI->new();      my $cgi = CGI->new();
# Line 309  Line 359 
359      UUID::generate($uuid);      UUID::generate($uuid);
360      UUID::unparse($uuid, $uuid_str);      UUID::unparse($uuid, $uuid_str);
361    
362        $logger->debug("Request received for $function correlation_id=$uuid_str");
363    
364      my $encoding = $cgi->param('encoding') || 'yaml';      my $encoding = $cgi->param('encoding') || 'yaml';
365      my $type = $encoding eq 'yaml' ? 'application/yaml' : 'application/json';      my $type = $encoding eq 'yaml' ? 'application/yaml' : 'application/json';
366    
367      # print STDERR "publish request to $exchange_name rpc.$function\n";      my $now = gettimeofday;
368      $conn->publish($channel, "rpc.$function", $cgi->param('args'),  
369        my $s = YAML::Dump($params);
370    #    if ($s =~ /CONTENT/ )
371    #    {
372    #       $s = $s . ('-'  x (944 - length($s)));
373    #    }
374    #    print "pack length " . length($s) . "\n";
375        my $packed_data = pack("N/aN/a", $s, $in);
376    
377        $conn->publish($channel, "rpc.$function",
378                       $packed_data,
379                 { exchange => $exchange_name },                 { exchange => $exchange_name },
380                     {                     {
381                         content_type => $type,                         content_type => $type,
# Line 321  Line 383 
383                         reply_to => $queue_name,                         reply_to => $queue_name,
384                     });                     });
385    
386      # print STDERR "await resp\n";      $global->{queue_size}++;
387    
388      $waiting->{$uuid_str} = { request => $req };      $waiting->{$uuid_str} = { request => $req,
389                                  start_time => $now,
390                                  function => $function,
391                                  };
392  }  }
393    
394  =head2 RunRabbitMQServer  =head2 RunRabbitMQServer
# Line 338  Line 403 
403      # Get the parameters.      # Get the parameters.
404      my ($serverName, $conf) = @_;      my ($serverName, $conf) = @_;
405    
406        my $logger = get_logger("Server");
407        $logger->info("RunRabbitMQServer startup");
408    
409      eval {      eval {
410          my $output = $serverName;          my $output = $serverName;
411          $output =~ s/::/\//;          $output =~ s/::/\//;
# Line 345  Line 413 
413      };      };
414    
415      if ($@) {      if ($@) {
416          die "Could not load server module $serverName";          $logger->logdie("Could not load server module $serverName");
417      }      }
418      # Having successfully loaded the server code, we create the object.      # Having successfully loaded the server code, we create the object.
419      my $serverThing = $serverName->new();      my $serverThing = $serverName->new();
420    
421        my $methodsL = $serverThing->methods;
422        my $raw_methodsL = $serverThing->can("raw_methods") ? $serverThing->raw_methods : [];
423        my %methods = (methods => 1 );
424        my %raw_methods;
425        $methods{$_} = 1 for @$methodsL;
426        $raw_methods{$_} = 1 for @$raw_methodsL;
427    
428      require Net::RabbitMQ;      require Net::RabbitMQ;
429      require UUID;      require UUID;
430      require CGI::Fast;      require CGI::Fast;
# Line 368  Line 443 
443    
444      my $queue_name = "q.$exchange_name";      my $queue_name = "q.$exchange_name";
445    
 #    $conn->queue_unbind($channel, $queue_name, $exchange_name, 'rpc.*');  
 #    $conn->queue_unbind($channel, $queue_name, $exchange_name, 'rpc.#');  
 #    $conn->exchange_delete($channel, $exchange_name);  
   
446      $conn->exchange_declare($channel, $exchange_name, { exchange_type => "topic", durable => 1,      $conn->exchange_declare($channel, $exchange_name, { exchange_type => "topic", durable => 1,
447                                                              auto_delete => 0 });                                                              auto_delete => 0 });
448    
# Line 382  Line 453 
453      $conn->consume($channel, $queue_name, { no_ack => 0 } );      $conn->consume($channel, $queue_name, { no_ack => 0 } );
454      while (1)      while (1)
455      {      {
456          #print STDERR "Await message\n";          $logger->debug("Await message");
457    
458          my $msg = $conn->recv();          my $msg = $conn->recv();
459            $conn->ack($channel, $msg->{delivery_tag});
460    
461          my $key = $msg->{routing_key};          my $key = $msg->{routing_key};
462    
463          my $args = [];          my $args = [];
464    
465          my $method;          if ($key !~ /^rpc.(.*)/)
         my $encoding;  
         my $reply_to;  
         my $corr;  
         if ($key =~ /^rpc.(.*)/)  
466          {          {
467              $method = $1;              $logger->error("invalid message key '$key'");
468    #           $conn->ack($channel, $msg->{delivery_tag});
469                next;
470            }
471            my $method = $1;
472    
473              my $props = $msg->{props};              my $props = $msg->{props};
474              $encoding = $props->{content_type};          my $encoding = $props->{content_type};
475              $corr = $props->{correlation_id};          my $corr = $props->{correlation_id};
476              $reply_to = $props->{reply_to};          my $reply_to = $props->{reply_to};
477    
478            my $raw_body = $msg->{body};
479    
480            my($param_json, $body);
481            my $param;
482    
483            try {
484                ($param_json, $body) = unpack("N/aN/a", $raw_body);
485            } catch {
486                $logger->error("Error unpacking body: $!");
487                next;
488            };
489    
490              my $body = $msg->{body};          try {
491                $param = YAML::Load($param_json);
492            } catch {
493    
494                $logger->error("Error parsing JSON for method $method: $_");
495                $param = {};
496            };
497    
498            my $cgi = CGI->new();
499            $cgi->parse_params($body);
500    
501            my @res = ();
502            my $err;
503            my $enc_res = '';
504            my $start = gettimeofday;
505    
506            try {
507                if ($raw_methods{$method})
508                {
509                    $logger->debug("Raw evaluation of method $method");
510                    @res = $serverThing->$method($cgi);
511                }
512                elsif ($methods{$method})
513                {
514                    $logger->debug("Normal evaluation of method $method");
515                    my $arg_raw = $cgi->param('args');
516    
             eval {  
517                  if ($encoding eq 'application/json')                  if ($encoding eq 'application/json')
518                  {                  {
519                      $args = JSON::Any->jsonToObj($body);                      $args = JSON::Any->jsonToObj($arg_raw);
520                  }                  }
521                  elsif ($encoding eq 'application/yaml')                  elsif ($encoding eq 'application/yaml')
522                  {                  {
523                      $args = YAML::Load($body);                      $args = YAML::Load($arg_raw);
524                  }                  }
525                  else                  else
526                  {                  {
527                      warn "Invalid encoding $encoding";                      $logger->logwarn("Invalid encoding $encoding");
528                      $args = [];                      $args = [];
529                  }                  }
530              };                  @res = eval { $serverThing->$method($args) };
531          }          }
532          else          else
533          {          {
534              die "invalid method key\n";                  $logger->error("No method defined for $method");
535                    die new ServerReturn(500, "Undefined method", "No method defined for $method");
536          }          }
537                my $end = gettimeofday;
538                my $elap = $end - $start;
539                $logger->info(sprintf("Evaluation of method $method complete time=%.6f corr=$corr", $elap));
540    
         #print "$method $corr\n";  
         print "$$ $method\n";  
   
         my $res = eval { $serverThing->$method($args) };  
541    
         my $enc_res = '';  
         eval {  
542              if ($encoding eq 'application/json')              if ($encoding eq 'application/json')
543              {              {
544                  $enc_res = JSON::Any->objToJson($res);                  $enc_res = JSON::Any->objToJson(@res);
545              }              }
546              elsif ($encoding eq 'application/yaml')              elsif ($encoding eq 'application/yaml')
547              {              {
548                  $enc_res = YAML::Dump($res);                  $enc_res = YAML::Dump(@res);
549                }
550            }
551            catch
552            {
553                $logger->error("Error encountered in method evaluation: $_");
554                if (ref($_))
555                {
556                    $err = $_;
557                }
558                else
559                {
560                    $err = new ServerReturn(500, "Evaluation error", $_);
561              }              }
562          };          };
563            # print Dumper($encoding, $enc_res);
564    
565          $conn->ack($channel, $msg->{delivery_tag});          #
566          $conn->publish($channel, $reply_to, $enc_res, { exchange => '' }, { correlation_id => $corr });          # The returned message consists of a response code, response message,
567            # and the body of the response. These map currently to the HTTP return code,
568            # the short message, and the body of the reply. The FCGI management code that
569            # receives these responses does not touch the data in the body.
570            #
571    
572            my $ret;
573    
574            if ($err)
575            {
576                $ret = $err;
577            }
578            else
579            {
580                $ret = ServerReturn->new(200, "OK", $enc_res);
581            }
582    
583            $conn->publish($channel, $reply_to, $ret->package_response(), { exchange => '' }, { correlation_id => $corr });
584    
585      }      }
586  }  }
# Line 1536  Line 1672 
1672                      ChangeDB($serverThing, $dbName);                      ChangeDB($serverThing, $dbName);
1673                  }                  }
1674                  # Run the request.                  # Run the request.
1675                    if ($serverThing->{raw_methods}->{$function})
1676                    {
1677                        $document = eval { $serverThing->$function($cgi) };
1678                    }
1679                    else
1680                    {
1681                  $document = eval { $serverThing->$function($args) };                  $document = eval { $serverThing->$function($args) };
1682                    }
1683                  # If we have an error, create an error document.                  # If we have an error, create an error document.
1684                  if ($@) {                  if ($@) {
1685                      SendError($@, "Error detected by service.");                      SendError($@, "Error detected by service.");
# Line 1811  Line 1954 
1954      if ($message =~ /DBServer Error:\s+/) {      if ($message =~ /DBServer Error:\s+/) {
1955          $realStatus = "503 $status";          $realStatus = "503 $status";
1956      } else {      } else {
1957          $realStatus = "512 $status";          $realStatus = "500 $status";
1958      }      }
1959      # Print the header and the status message.      # Print the header and the status message.
1960      print CGI::header(-type => 'text/plain',      print CGI::header(-type => 'text/plain',
# Line 1847  Line 1990 
1990      close $oh;      close $oh;
1991  }  }
1992    
1993    package ServerReturn;
1994    
1995    =head1 ServerReturn
1996    
1997    ServerReturn is a little class used to encapsulate  responses to be
1998    sent back toclients. It holds an code code (to be pushed into
1999    a HTTP response response), a short message, and long details.
2000    
2001    =cut
2002    
2003    use strict;
2004    use base 'Class::Accessor';
2005    
2006    __PACKAGE__->mk_accessors(qw(code msg body));
2007    
2008    sub new
2009    {
2010        my($class, $code, $msg, $body) = @_;
2011        my $self = {
2012            code => $code,
2013            msg => $msg,
2014            body => $body,
2015        };
2016        return bless $self, $class;
2017    }
2018    
2019    sub package_response
2020    {
2021        my($self) = @_;
2022        return pack("nN/aN/a", @{$self}{qw(code msg body)});
2023    }
2024    
2025  1;  1;

Legend:
Removed from v.1.78  
changed lines
  Added in v.1.79

MCS Webmaster
ViewVC Help
Powered by ViewVC 1.0.3