1 |
dpavlin |
92 |
package Sack::Node; |
2 |
|
|
|
3 |
|
|
use warnings; |
4 |
|
|
use strict; |
5 |
|
|
|
6 |
|
|
use IO::Socket::INET; |
7 |
|
|
use File::Slurp; |
8 |
|
|
use Carp qw(confess); |
9 |
|
|
use Data::Dump qw(dump); |
10 |
|
|
use Storable; |
11 |
dpavlin |
93 |
use Time::HiRes qw(time); |
12 |
dpavlin |
92 |
|
13 |
|
|
|
14 |
|
|
# Create a node for $port and serve one client on 127.0.0.1:$port.
#
#   Sack::Node->new( $port );
#
# Steps, in order:
#   1. if /tmp/sack.$port.pid exists, SIGKILL the process recorded in it,
#      then record our own pid there
#   2. listen on 127.0.0.1:$port and block until one client connects
#   3. loop: read one Storable structure from the client, act on it and
#      send one Storable structure back
#
# Requests are hash references, dispatched on the first true key:
#   view => $code    reply { view => $self->view( $code ) }
#   data => $data    keep $data in $self->{data}, reply { data => 'loaded' }
#   exit => 1        exit the process without replying
#   anything else    reply { error => $request }
#
# The request loop has no normal exit, so this constructor does not return.
# Dies if the socket cannot be created, if accept fails, or if a request
# cannot be read from the client.
sub new {
    my $class = shift;
    my $port  = shift;
    my $self  = bless { port => $port }, $class;

    my $pid_path = "/tmp/sack.$port.pid";
    if ( -e $pid_path ) {
        my $pid = read_file( $pid_path );
        $pid = '' unless defined $pid;

        # Only signal something that looks like a real pid: kill with 0 or
        # a negative number addresses whole process groups, and a stale
        # file holding our own pid must not make us kill ourselves.
        my ( $old_pid ) = $pid =~ m/\A\s*(\d+)\s*\z/;
        if ( $old_pid && $old_pid != $$ ) {
            # Parenthesised on purpose: `kill 9, $pid && warn ...` binds as
            # kill 9, ( $pid && warn(...) ) and sends the signal to pid 1.
            kill( 9, $old_pid ) and warn "[$port] kill old $old_pid\n";
        }
    }
    write_file( $pid_path, $$ );

    my $sock = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
        LocalAddr => '127.0.0.1',
        LocalPort => $port,
        Proto     => 'tcp',
        Reuse     => 1,
    ) or die "[$port] die $!";

    warn "[$port] accept\n";

    my $client = $sock->accept()
        or die "[$port] accept failed: $!";

    warn "[$port] connect from ", $client->peerhost, $/;

    while ( 1 ) {

        my $data = Storable::fd_retrieve( $client );

        # Storable documents undef as its I/O error result; presumably that
        # includes the client closing the connection -- TODO confirm.
        # Without this check undef is autovivified into {} below and the
        # loop keeps answering "UNKNOWN" to a peer that is gone.
        die "[$port] can't read request from client\n" unless defined $data;

        warn "[$port] <<<<\n";
        warn "[$port] data = ", dump( $data );

        my $result;

        if ( $data->{view} ) {
            # scalar: view() returns an empty list when the code does not
            # compile, which would leave this hash with a key and no value.
            $result = { view => scalar $self->view( $data->{view} ) };
        } elsif ( $data->{data} ) {
            $self->{data} = delete $data->{data};
            $result = { data => 'loaded' };
        } elsif ( $data->{exit} ) {
            warn "[$port] exit";
            exit;
        } else {
            warn "[$port] UNKNOWN ", dump( $data ), $/;
            $result = { 'error' => $data };
        }

        warn "[$port] >>>>\n";
        Storable::store_fd( $result => $client );
    }

}
66 |
|
|
|
67 |
dpavlin |
93 |
|
68 |
dpavlin |
92 |
our $rec; |
69 |
|
|
our $out; |
70 |
|
|
|
71 |
|
|
# Run a snippet of Perl code once for every record in $self->{data}.
#
#   my $out = $self->view( $code );
#
# $code is Perl source text. It is compiled once as the body of an
# anonymous sub and then called with no arguments for each record. The
# package variable $rec is set to the current record before each call;
# the package variable $out is reset to undef up front and returned at
# the end.
#
# Iteration stops at the first false record and at the first call that
# dies. Progress (the position every 10000 records, a dot every 1000) and
# a timing report are written to STDERR.
#
# Returns $out, or nothing (undef / empty list) if $code does not compile.
sub view {
    my ( $self, $code ) = @_;

    undef $out;

    my $affected = 0;
    my $start_t  = time;

    # SECURITY: this compiles and runs whatever source text the caller
    # passes in. Only hand it code from a trusted peer.
    my $coderef = eval "sub { $code }";
    if ( $@ ) {
        warn "ABORT code: $@";
        return;
    }

    # A view requested before any data was loaded sees an empty set.
    my $records = $self->{data} || [];
    my $total   = scalar @$records;

    foreach my $pos ( 0 .. $#$records ) {
        $rec = $records->[$pos];
        if ( ! $rec ) {
            print STDERR "END @ $pos";
            last;
        }

        eval { $coderef->() };
        if ( $@ ) {
            warn "ABORT $pos $@\n";
            last;
        } else {
            $affected++;
        }

        if ( $pos % 10000 == 0 ) {
            print STDERR $pos;
        } elsif ( $pos % 1000 == 0 ) {
            print STDERR ".";
        }
    }

    my $dt = time - $start_t;

    # Rate is records per second. Count the elements -- numifying the
    # array reference itself yields its address -- and avoid dividing by
    # zero when the clock did not advance.
    my $rate = $dt > 0 ? $total / $dt : 0;

    my $report = [ $self->{port}, $affected, $dt, $rate ];
    warn sprintf "[%d] %d affected in %1.4fs %.2f/s\n", @$report;

    warn "out ", dump( $out );

    return $out;
}
113 |
|
|
|
114 |
|
|
1; |