/[Sack]/trunk/lib/Sack/View.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/lib/Sack/View.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

trunk/lib/Sack/Node.pm revision 162 by dpavlin, Fri Oct 30 14:53:22 2009 UTC trunk/lib/Sack/View.pm revision 193 by dpavlin, Sun Nov 8 14:47:48 2009 UTC
# Line 1  Line 1 
1  package Sack::Node;  package Sack::View;
2    
3  use warnings;  use warnings;
4  use strict;  use strict;
5    
 use IO::Socket::INET;  
 use File::Slurp;  
 use Carp qw(confess);  
6  use Data::Dump qw(dump);  use Data::Dump qw(dump);
 use Storable;  
 use Time::HiRes qw(time);  
7    
8  use lib 'lib';  use lib '/srv/Sack/lib';
 use base qw(Sack::Pid);  
9  use Sack::Color;  use Sack::Color;
10  use Sack;  use Sack;
11    
12  sub new {  our $coderef;
13          my $class = shift;  our $out;
         my $port = shift;  
         my $self  = bless { port => $port }, $class;  
   
         $self->port_pid( $port );  
   
         my $sock = IO::Socket::INET->new(  
                 Listen    => SOMAXCONN,  
                 LocalAddr => '127.0.0.1',  
                 LocalPort => $port,  
                 Proto     => 'tcp',  
                 Reuse     => 1,  
         ) or die "[$port] die $!";  
   
         my $client;  
   
         while ( 1 ) {  
   
                 if ( ! $client ) {  
                         warn "[$port] accept $Sack::VERSION\n";  
                         $client = $sock->accept();  
                         warn "[$port] connect from ", $client->peerhost, $/;  
                 }  
14    
15                  my $data = Storable::fd_retrieve( $client );  sub out { $out };
16    
17                  if ( defined $data->{data} ) {  sub code {
18                          warn "# [$port] <<<< data\n" if $self->{debug};          my ( $self, $code ) = @_;
                 } else {  
                         warn "# [$port] <<<< ", dump( $data ), $/ if $self->{debug};  
                 }  
19    
20                  my $result;  warn "XX code $code";
21    
22                  if ( $data->{view} ) {          undef $out;
                         $result = $self->view( $data->{view} );  
                 } elsif ( $data->{data} ) {  
                         $self->{data} = delete $data->{data};  
                         $result = { data => 'loaded' };  
                 } elsif ( $data->{exit} ) {  
                         warn "[$port] exit\n";  
                         close $sock;  
                         exit;  
                 } elsif ( $data->{restart} ) {  
                         warn "[$port] restart";  
                         close $sock;  
                         exec "$0 $port";  
                 } elsif ( $data->{info} ) {  
                         $result = {  
                                 version => $Sack::VERSION,  
                                 size => $#{ $self->{data} } + 1,  
                                 reports => $self->{reports},  
                         };  
                 } elsif ( my $sh = delete $data->{sh} ) {  
                         $result = { sh => scalar `$sh` };  
                 } elsif ( defined $data->{debug} ) {  
                         $result = { debug => $self->{debug} = $data->{debug} };  
                 } else {  
                         warn "[$port] UNKNOWN ", dump( $data ), $/;  
                         $result = { 'error' => 'unknown', data => $data };  
                 }  
23    
24                  $result = { 'error' => 'result not reference', result => $result, data => $data } unless ref($result);          $coderef = eval "sub { my \$rec = \$_[0]; $code }";
25    
26                  warn "# [$port] >>>>\n";          if ( $@ ) {
27                  Storable::store_fd( $result => $client );                  warn "ABORT code: $@";
28                    return;
29          }          }
30    
31            ref $coderef eq 'CODE';
32  }  }
33    
34  sub view {  sub on_shard {
35          my ( $self, $code ) = @_;          my ( $self, $data ) = @_;
   
         my $affected = 0;  
         my $start_t = time;  
   
         my $out;  
36    
37          my $coderef = eval "sub { my \$rec = \$_[0]; $code }";  warn "XX data ",dump $data;
         if ( $@ ) {  
                 warn "ABORT code: $@";  
                 return;  
         }  
38    
39            my $affected = 0;
40    
41          foreach my $pos ( 0 .. $#{ $self->{data} } ) {          foreach my $pos ( 0 .. $#{ $data } ) {
42                  if ( ! defined $self->{data}->[$pos] ) {                  if ( ! defined $data->[$pos] ) {
43                          print STDERR "END @ $pos";                          print STDERR "END @ $pos";
44                          last;                          last;
45                  }                  }
46    
47                  eval { $coderef->( $self->{data}->[$pos] ) };                  eval { $coderef->( $data->[$pos] ) };
48    
49                  if ( $@ ) {                  if ( $@ ) {
50                          warn "ABORT $pos $@\n";                          warn "ABORT $pos $@\n";
# Line 120  sub view { Line 57  sub view {
57                  $pos % 1000  == 0 ? print STDERR "."  : 0 ;                  $pos % 1000  == 0 ? print STDERR "."  : 0 ;
58          };          };
59    
60          my $dt = time - $start_t;          warn "## out ", dump( $out );
         my $report = [ $self->{port}, $affected, $dt, $affected / $dt ];  
         warn sprintf "[%d] %d affected in %1.4fs %.2f/s\n", @$report;  
   
         push @{ $self->{reports} }, "$affected in ${dt}s";  
   
         warn "[$self->{port}] out ", dump( $out ),$/ if $self->{debug};  
   
         return {  
                 out => $out,  
         };  
61  }  }
62    
63  1;  1;

Legend:
Removed from v.162  
changed lines
  Added in v.193

  ViewVC Help
Powered by ViewVC 1.1.26