1 |
package Sack::Node; |
2 |
|
3 |
use warnings; |
4 |
use strict; |
5 |
|
6 |
use IO::Socket::INET; |
7 |
use File::Slurp; |
8 |
use Carp qw(confess); |
9 |
use Data::Dump qw(dump); |
10 |
use Storable; |
11 |
use Time::HiRes qw(time); |
12 |
|
13 |
use lib 'lib'; |
14 |
use base qw(Sack::Pid); |
15 |
use Sack::Color; |
16 |
use Sack; |
17 |
|
18 |
# Sack::Node->new( $port )
#
# Creates the node object, binds a listening TCP socket on 127.0.0.1:$port
# and then serves requests forever -- this constructor never returns.
#
# Only one client is served at a time.  Each request is a single
# Storable frame holding a hash; the first key that matches decides what
# happens:
#
#   view    => $code   run $self->view( $code ) over the loaded data
#   data    => $aref   replace the data held by this node
#   exit    => 1       close the listening socket and exit the process
#   restart => 1       close the listening socket and re-exec $0 $port
#   info    => 1       report version, number of records and view reports
#   sh      => $cmd    run $cmd through the shell and return its output
#   debug   => $bool   switch debug logging on or off
#
# Every request that does not terminate the process is answered with one
# Storable frame containing a hash reference.
#
# Dies if the listening socket cannot be created, if accept fails for a
# reason other than an interrupted system call, or if the restart exec
# fails.
sub new {
    my $class = shift;
    my $port  = shift;
    my $self  = bless { port => $port }, $class;

    # port_pid is not defined in this file -- presumably inherited from
    # Sack::Pid; verify against that module.
    $self->port_pid( $port );

    my $sock = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
        LocalAddr => '127.0.0.1',
        LocalPort => $port,
        Proto     => 'tcp',
        Reuse     => 1,
    ) or die "[$port] die $!";

    my $client;

    while ( 1 ) {

        if ( ! $client ) {
            warn "[$port] accept $Sack::VERSION\n";
            $client = $sock->accept();
            if ( ! $client ) {
                # an interrupted accept is harmless, anything else would
                # otherwise surface as a method call on undef below
                next if $!{EINTR};
                die "[$port] accept failed: $!";
            }
            warn "[$port] connect from ", $client->peerhost, $/;
        }

        # fd_retrieve returns undef or dies when the peer hangs up or sends
        # a broken frame; drop this client and wait for the next one instead
        # of taking the whole node down.
        my $data = eval { Storable::fd_retrieve( $client ) };
        if ( ! defined $data ) {
            my $why = $@ || "$!" || 'connection closed';
            chomp $why;
            warn "[$port] disconnect: $why\n";
            close $client;
            undef $client;
            next;
        }

        # Storable can deliver any kind of reference; only hashes are
        # valid requests and only hashes may be indexed by key below.
        my $is_request = UNIVERSAL::isa( $data, 'HASH' );

        if ( $self->{debug} ) {
            if ( $is_request && defined $data->{data} ) {
                # payload can be huge, do not dump it
                warn "# [$port] <<<< data\n";
            } else {
                warn "# [$port] <<<< ", dump( $data ), $/;
            }
        }

        my $result;

        if ( ! $is_request ) {
            warn "[$port] UNKNOWN ", dump( $data ), $/;
            $result = { 'error' => 'unknown', data => $data };
        } elsif ( $data->{view} ) {
            $result = $self->view( $data->{view} );
        } elsif ( $data->{data} ) {
            $self->{data} = delete $data->{data};
            $result = { data => 'loaded' };
        } elsif ( $data->{exit} ) {
            warn "[$port] exit\n";
            close $sock;
            exit;
        } elsif ( $data->{restart} ) {
            warn "[$port] restart";
            close $sock;
            # list form: no shell involved, so a script path containing
            # spaces or metacharacters is passed through untouched
            exec( $0, $port )
                or die "[$port] restart failed, can't exec $0: $!";
        } elsif ( $data->{info} ) {
            $result = {
                version => $Sack::VERSION,
                size    => scalar @{ $self->{data} || [] },
                reports => $self->{reports},
            };
        } elsif ( my $sh = delete $data->{sh} ) {
            # SECURITY: runs an arbitrary client-supplied command through
            # the shell.  Acceptable only because the socket is bound to
            # 127.0.0.1; never expose this port to untrusted peers.
            $result = { sh => scalar `$sh` };
        } elsif ( defined $data->{debug} ) {
            $result = { debug => $self->{debug} = $data->{debug} };
        } else {
            warn "[$port] UNKNOWN ", dump( $data ), $/;
            $result = { 'error' => 'unknown', data => $data };
        }

        # store_fd needs a reference; view() returns nothing when the view
        # code does not compile, which ends up here
        $result = { 'error' => 'result not reference', result => $result, data => $data } unless ref($result);

        warn "# [$port] >>>>\n";

        # A peer that went away must not kill the node: ignore SIGPIPE for
        # the duration of the write and trap anything Storable throws.
        my $sent = do {
            local $SIG{PIPE} = 'IGNORE';
            eval { Storable::store_fd( $result => $client ) };
        };
        if ( ! $sent ) {
            my $why = $@ || "$!" || 'unknown error';
            chomp $why;
            warn "[$port] reply failed, dropping client: $why\n";
            close $client;
            undef $client;
        }
    }

}
88 |
|
89 |
# $self->view( $code )
#
# Compiles $code (Perl source text) into
#
#   sub { my $rec = $_[0]; <code> }
#
# and calls it once for every element of $self->{data}, in order.  The
# code is compiled inside this sub, so it can see the lexicals declared
# before the eval -- most importantly $out, which is how a view hands its
# result back.
#
# Processing stops at the first undefined record or at the first record
# for which the code dies.  Progress is printed to STDERR, and a
# "<count> in <seconds>s" line is appended to $self->{reports}.
#
# Returns { out => $out }, or nothing when $code does not compile.
sub view {
    my ( $self, $code ) = @_;

    my $affected = 0;
    my $start_t  = time;

    # filled in by the view code itself, see above
    my $out;

    # SECURITY: string eval of client-supplied code -- this is the whole
    # point of a view, but it means every peer able to reach the node can
    # run arbitrary Perl in this process.
    my $coderef = eval "sub { my \$rec = \$_[0]; $code }";
    if ( $@ || ref $coderef ne 'CODE' ) {
        warn "ABORT code: $@";
        return;
    }

    foreach my $pos ( 0 .. $#{ $self->{data} } ) {
        if ( ! defined $self->{data}->[$pos] ) {
            print STDERR "END @ $pos";
            last;
        }

        # Pass the array element itself (not a copy) so $_[0] stays an
        # alias of the stored record.  Test the eval result rather than
        # $@ alone, which can be clobbered before it is inspected.
        if ( ! eval { $coderef->( $self->{data}->[$pos] ); 1 } ) {
            warn "ABORT $pos $@\n";
            last;
        }
        $affected++;

        # progress: position every 10000 records, a dot every 1000
        if ( $pos % 10000 == 0 ) {
            print STDERR $pos;
        } elsif ( $pos % 1000 == 0 ) {
            print STDERR ".";
        }
    }

    my $dt = time - $start_t;

    # a coarse clock or an empty data set can yield zero elapsed time;
    # dividing by it would be fatal
    my $rate = $dt > 0 ? $affected / $dt : 0;

    my $report = [ $self->{port}, $affected, $dt, $rate ];
    warn sprintf "[%d] %d affected in %1.4fs %.2f/s\n", @$report;

    push @{ $self->{reports} }, "$affected in ${dt}s";

    warn "[$self->{port}] out ", dump( $out ),$/ if $self->{debug};

    return {
        out => $out,
    };
}
135 |
|
136 |
1; |