1 |
package Sack::Node; |
package Sack::View; |
2 |
|
|
3 |
use warnings; |
use warnings; |
4 |
use strict; |
use strict; |
5 |
|
|
|
use IO::Socket::INET; |
|
|
use File::Slurp; |
|
|
use Carp qw(confess); |
|
6 |
use Data::Dump qw(dump); |
use Data::Dump qw(dump); |
|
use Storable; |
|
|
use Time::HiRes qw(time); |
|
7 |
|
|
8 |
use lib 'lib'; |
use lib '/srv/Sack/lib'; |
|
use base qw(Sack::Pid); |
|
9 |
use Sack::Color; |
use Sack::Color; |
10 |
use Sack; |
use Sack; |
11 |
|
|
12 |
sub new { |
our $coderef; |
13 |
my $class = shift; |
our $out; |
|
my $port = shift; |
|
|
my $self = bless { port => $port }, $class; |
|
|
|
|
|
$self->port_pid( $port ); |
|
|
|
|
|
my $sock = IO::Socket::INET->new( |
|
|
Listen => SOMAXCONN, |
|
|
LocalAddr => '127.0.0.1', |
|
|
LocalPort => $port, |
|
|
Proto => 'tcp', |
|
|
Reuse => 1, |
|
|
) or die "[$port] die $!"; |
|
|
|
|
|
my $client; |
|
|
|
|
|
while ( 1 ) { |
|
|
|
|
|
if ( ! $client ) { |
|
|
warn "[$port] accept $Sack::VERSION\n"; |
|
|
$client = $sock->accept(); |
|
|
warn "[$port] connect from ", $client->peerhost, $/; |
|
|
} |
|
14 |
|
|
15 |
my $data = Storable::fd_retrieve( $client ); |
sub out { $out }; |
16 |
|
|
17 |
if ( defined $data->{data} ) { |
sub code { |
18 |
warn "# [$port] <<<< data\n" if $self->{debug}; |
my ( $self, $code ) = @_; |
|
} else { |
|
|
warn "# [$port] <<<< ", dump( $data ), $/ if $self->{debug}; |
|
|
} |
|
19 |
|
|
20 |
my $result; |
warn "XX code $code"; |
21 |
|
|
22 |
if ( $data->{view} ) { |
undef $out; |
|
$result = $self->view( $data->{view} ); |
|
|
} elsif ( $data->{data} ) { |
|
|
$self->{data} = delete $data->{data}; |
|
|
$result = { data => 'loaded' }; |
|
|
} elsif ( $data->{exit} ) { |
|
|
warn "[$port] exit\n"; |
|
|
close $sock; |
|
|
exit; |
|
|
} elsif ( $data->{restart} ) { |
|
|
warn "[$port] restart"; |
|
|
close $sock; |
|
|
exec "$0 $port"; |
|
|
} elsif ( $data->{info} ) { |
|
|
$result = { |
|
|
version => $Sack::VERSION, |
|
|
size => $#{ $self->{data} } + 1, |
|
|
reports => $self->{reports}, |
|
|
}; |
|
|
} elsif ( my $sh = delete $data->{sh} ) { |
|
|
$result = { sh => scalar `$sh` }; |
|
|
} elsif ( defined $data->{debug} ) { |
|
|
$result = { debug => $self->{debug} = $data->{debug} }; |
|
|
} else { |
|
|
warn "[$port] UNKNOWN ", dump( $data ), $/; |
|
|
$result = { 'error' => 'unknown', data => $data }; |
|
|
} |
|
23 |
|
|
24 |
$result = { 'error' => 'result not reference', result => $result, data => $data } unless ref($result); |
$coderef = eval "sub { my \$rec = \$_[0]; $code }"; |
25 |
|
|
26 |
warn "# [$port] >>>>\n"; |
if ( $@ ) { |
27 |
Storable::store_fd( $result => $client ); |
warn "ABORT code: $@"; |
28 |
|
return; |
29 |
} |
} |
30 |
|
|
31 |
|
ref $coderef eq 'CODE'; |
32 |
} |
} |
33 |
|
|
34 |
sub view { |
sub on_shard { |
35 |
my ( $self, $code ) = @_; |
my ( $self, $data ) = @_; |
|
|
|
|
my $affected = 0; |
|
|
my $start_t = time; |
|
|
|
|
|
my $out; |
|
36 |
|
|
37 |
my $coderef = eval "sub { my \$rec = \$_[0]; $code }"; |
warn "XX data ",dump $data; |
|
if ( $@ ) { |
|
|
warn "ABORT code: $@"; |
|
|
return; |
|
|
} |
|
38 |
|
|
39 |
|
my $affected = 0; |
40 |
|
|
41 |
foreach my $pos ( 0 .. $#{ $self->{data} } ) { |
foreach my $pos ( 0 .. $#{ $data } ) { |
42 |
if ( ! defined $self->{data}->[$pos] ) { |
if ( ! defined $data->[$pos] ) { |
43 |
print STDERR "END @ $pos"; |
print STDERR "END @ $pos"; |
44 |
last; |
last; |
45 |
} |
} |
46 |
|
|
47 |
eval { $coderef->( $self->{data}->[$pos] ) }; |
eval { $coderef->( $data->[$pos] ) }; |
48 |
|
|
49 |
if ( $@ ) { |
if ( $@ ) { |
50 |
warn "ABORT $pos $@\n"; |
warn "ABORT $pos $@\n"; |
57 |
$pos % 1000 == 0 ? print STDERR "." : 0 ; |
$pos % 1000 == 0 ? print STDERR "." : 0 ; |
58 |
}; |
}; |
59 |
|
|
60 |
my $dt = time - $start_t; |
warn "## out ", dump( $out ); |
|
my $report = [ $self->{port}, $affected, $dt, $affected / $dt ]; |
|
|
warn sprintf "[%d] %d affected in %1.4fs %.2f/s\n", @$report; |
|
|
|
|
|
push @{ $self->{reports} }, "$affected in ${dt}s"; |
|
|
|
|
|
warn "[$self->{port}] out ", dump( $out ),$/ if $self->{debug}; |
|
|
|
|
|
return { |
|
|
out => $out, |
|
|
}; |
|
61 |
} |
} |
62 |
|
|
63 |
1; |
1; |