1 |
package Sack::Lorry; |
2 |
|
3 |
use warnings; |
4 |
use strict; |
5 |
|
6 |
use IO::Socket::INET; |
7 |
use Data::Dump qw(dump); |
8 |
use Storable; |
9 |
use File::Slurp; |
10 |
|
11 |
our $pids; |
12 |
our $ports; |
13 |
|
14 |
$SIG{CHLD} = 'IGNORE'; |
15 |
|
16 |
my $port = 4000; |
17 |
|
18 |
sub new { |
19 |
my $class = shift; |
20 |
my $self = bless {@_}, $class; |
21 |
return $self; |
22 |
} |
23 |
|
24 |
sub start_node { |
25 |
my ( $self, $host ) = @_; |
26 |
|
27 |
system "rsync -rav /srv/Sack/ $host:/srv/Sack/"; |
28 |
|
29 |
if ( my $pid = fork ) { |
30 |
# parent |
31 |
$pids->{ "$host:$port" } = $pid; |
32 |
$ports->{ $port } = $host; |
33 |
|
34 |
my $sock; |
35 |
|
36 |
print STDERR "waiting for $port"; |
37 |
|
38 |
while ( ! $sock ) { |
39 |
|
40 |
$sock = IO::Socket::INET->new( |
41 |
PeerAddr => '127.0.0.1', |
42 |
PeerPort => $port, |
43 |
Proto => 'tcp', |
44 |
); |
45 |
|
46 |
if ( ! $sock ) { |
47 |
print STDERR "."; |
48 |
sleep 1; |
49 |
} |
50 |
|
51 |
} |
52 |
|
53 |
$self->{sock}->{$port} = $sock; |
54 |
|
55 |
warn "\nconnected to $port\n"; |
56 |
|
57 |
$self->{connected}->{$port} = $host; |
58 |
|
59 |
return $port++; |
60 |
|
61 |
} elsif ( ! defined $pid ) { |
62 |
warn "can't fork $host $port"; |
63 |
return; |
64 |
} else { |
65 |
# child |
66 |
my $cmd = $host !~ m{^(localhost|127\.)}i ? qq| |
67 |
ssh |
68 |
-S /tmp/sock.$port.ssh |
69 |
-L $port:127.0.0.1:$port |
70 |
$host |
71 |
| : ''; |
72 |
|
73 |
$cmd .= qq| |
74 |
/srv/Sack/bin/node.pl $port |
75 |
|; |
76 |
|
77 |
$cmd =~ s{\s+}{ }sg; |
78 |
|
79 |
warn "exec: $cmd\n"; |
80 |
exec $cmd; |
81 |
} |
82 |
} |
83 |
|
84 |
sub send_to { |
85 |
my ( $self, $port, $data ) = @_; |
86 |
warn "send_to [$port]\n"; |
87 |
Storable::store_fd( $data => $self->{sock}->{$port} ); |
88 |
} |
89 |
|
90 |
sub get_from { |
91 |
my ( $self, $port ) = @_; |
92 |
warn "get_from [$port]\n"; |
93 |
Storable::fd_retrieve( $self->{sock}->{$port} ); |
94 |
} |
95 |
|
96 |
sub send_to_all { |
97 |
my ( $self, $data ) = @_; |
98 |
$self->send_to( $_, $data ) foreach keys %{ $self->{connected} }; |
99 |
} |
100 |
|
101 |
sub get_from_all { |
102 |
my ( $self ) = @_; |
103 |
my $result; |
104 |
$result->{$_} = $self->get_from( $_ ) foreach keys %{ $self->{connected} }; |
105 |
return $result; |
106 |
} |
107 |
|
108 |
|
109 |
our $out; |
110 |
|
111 |
sub merge { |
112 |
my ( $self, $new ) = @_; |
113 |
|
114 |
my $t_merge = time(); |
115 |
|
116 |
my $tick = 0; |
117 |
|
118 |
my $missing; |
119 |
|
120 |
foreach my $k1 ( keys %$new ) { |
121 |
|
122 |
foreach my $k2 ( keys %{ $new->{$k1} } ) { |
123 |
|
124 |
my $n = delete $new->{$k1}->{$k2}; |
125 |
|
126 |
my $ref = ref $out->{$k1}->{$k2}; |
127 |
|
128 |
if ( ! defined $out->{$k1}->{$k2} ) { |
129 |
$out->{$k1}->{$k2} = $n; |
130 |
} elsif ( $k1 =~ m{\+} ) { |
131 |
# warn "## agregate $k1 $k2"; |
132 |
$out->{$k1}->{$k2} += $n; |
133 |
} elsif ( $ref eq 'ARRAY' ) { |
134 |
if ( ref $n eq 'ARRAY' ) { |
135 |
push @{ $out->{$k1}->{$k2} }, $_ foreach @$n; |
136 |
} else { |
137 |
push @{ $out->{$k1}->{$k2} }, $n; |
138 |
} |
139 |
} elsif ( $ref eq '' ) { |
140 |
$out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ]; |
141 |
} else { |
142 |
die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2}); |
143 |
} |
144 |
|
145 |
if ( $tick++ % 1000 == 0 ) { |
146 |
print STDERR "."; |
147 |
} elsif ( $tick % 10000 == 0 ) { |
148 |
print STDERR $tick; |
149 |
} |
150 |
} |
151 |
} |
152 |
|
153 |
$t_merge = time - $t_merge; |
154 |
warn sprintf "\nmerged %d in %.4fs %.2f/s\n", $tick, $t_merge, $t_merge / $tick ; |
155 |
|
156 |
} |
157 |
|
158 |
|
159 |
sub view { |
160 |
my ( $self, $view ) = @_; |
161 |
|
162 |
$out = {}; |
163 |
|
164 |
warn "run view $view ", -s $view, " bytes\n"; |
165 |
|
166 |
my $view_code = read_file($view); |
167 |
$self->send_to_all({ view => $view_code }); |
168 |
|
169 |
foreach my $port ( keys %{ $self->{connected} } ) { |
170 |
warn "get_from $port\n"; |
171 |
my $result = $self->get_from( $port ); |
172 |
warn "# result ", dump $result if $self->{debug}; |
173 |
if ( $result->{view} ) { |
174 |
$self->merge( $result->{view} ); |
175 |
} else { |
176 |
warn "no view from $port\n"; |
177 |
} |
178 |
} |
179 |
|
180 |
return $out; |
181 |
} |
182 |
|
183 |
|
184 |
sub DESTROY { |
185 |
warn "pids ",dump( $pids ); |
186 |
foreach ( values %$pids ) { |
187 |
warn "kill $_"; |
188 |
kill 1,$_ || kill 9, $_; |
189 |
} |
190 |
} |
191 |
|
192 |
1; |