/[Sack]/trunk/lib/Sack/Lorry.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/lib/Sack/Lorry.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 144 - (show annotations)
Wed Oct 7 20:55:14 2009 UTC (14 years, 7 months ago) by dpavlin
File size: 6759 byte(s)
split update_node out
1 package Sack::Lorry;
2
3 use warnings;
4 use strict;
5
6 use IO::Socket::INET;
7 use Data::Dump qw(dump);
8 use Storable;
9 use File::Slurp;
10 use Net::Ping;
11 use Time::HiRes qw(time sleep);
12
13 use lib 'lib';
14 use base qw(Sack::Pid);
15 use Sack;
16
17 our $pids;
18
19 $SIG{CHLD} = 'IGNORE';
20
21 sub new {
22 my $class = shift;
23 my $self = bless {@_}, $class;
24 $self->{sock} = {};
25 warn __PACKAGE__, " $Sack::VERSION\n";
26 return $self;
27 }
28
29 sub connected {
30 sort keys %{ $_[0]->{sock} }
31 }
32
33 sub connect_to {
34 my ( $self, $port, $retries ) = @_;
35
36 $retries ||= 30;
37 warn "# connect_to [$port] $retries times";
38
39 my $sock = $self->{sock}->{$port};
40
41 while ( ! $sock && $retries-- ) {
42
43 $sock = IO::Socket::INET->new(
44 PeerAddr => '127.0.0.1',
45 PeerPort => $port,
46 Proto => 'tcp',
47 );
48
49 if ( ! $sock ) {
50 print STDERR ".";
51 sleep 0.5;
52 } elsif ( $sock->connected ) {
53 $self->{sock}->{$port} = $sock;
54 warn "# connected to $port\n";
55
56 $self->send_to( $port, { info => 1 } );
57 warn "info ", dump( $self->get_from( $port ) ), $/;
58
59 } else {
60 close $sock;
61 }
62
63 }
64
65 if ( ! $retries ) {
66 warn "SKIP $port: $!";
67 return;
68 } else {
69 return $port;
70 }
71 }
72
73
74 sub start_node_port {
75 my ( $self, $host, $port ) = @_;
76
77 chomp $host;
78
79 my $p = Net::Ping->new;
80
81 if ( ! $p->ping( $host ) ) {
82 warn "can't ping [$host]\n";
83 return;
84 }
85
86 if ( $self->connect_to( $port, 1 ) ) {
87 warn "re-using existing $port";
88 }
89
90 my $ssh_config = '-F etc/lib.ssh';
91
92 my $pid_path = "/tmp/sack.$port.pid";
93 kill 9, read_file $pid_path if -e $pid_path;
94
95 if ( my $pid = fork ) {
96 # parent
97
98 $self->connect_to( $port ) || return;
99
100 $pids->{ $port } = $pid;
101 $self->{port_on_host}->{$port} = $host;
102
103 warn "start_node_port $host [$port] pid $pid\n";
104
105 return $port;
106
107 } elsif ( ! defined $pid ) {
108 warn "can't fork $host $port";
109 return;
110 } else {
111 # child
112
113 my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
114 ssh -f $ssh_config
115 -S /tmp/sock.$port.ssh
116 -L $port:127.0.0.1:$port
117 $host
118 | : '';
119
120 $cmd .= qq|
121 /srv/Sack/bin/node.pl $port
122 |;
123
124 $cmd =~ s{\s+}{ }sg;
125
126 $self->port_pid( $port, $$ );
127
128 warn "# exec: $cmd\n";
129 exec $cmd;
130 }
131 }
132
133 sub send_to {
134 my ( $self, $port, $data ) = @_;
135 # warn "send_to [$port]\n";
136 Storable::store_fd( $data => $self->{sock}->{$port} );
137 }
138
139 sub get_from {
140 my ( $self, $port ) = @_;
141 # warn "get_from [$port]\n";
142 my $data;
143 eval {
144 $data = Storable::fd_retrieve( $self->{sock}->{$port} );
145 };
146 warn "ERROR $@" if $@;
147 return $data;
148 }
149
150 sub send_to_all {
151 my ( $self, $data ) = @_;
152 $self->send_to( $_, $data ) foreach $self->connected;
153 }
154
155 sub get_from_all {
156 my ( $self ) = @_;
157 my $result;
158 $result->{$_} = $self->get_from( $_ ) foreach $self->connected;
159 return $result;
160 }
161
162 sub restart_nodes {
163 my ( $self ) = @_;
164 foreach my $port ( $self->connected ) {
165 warn "restart [$port]\n";
166 # $self->send_to( $port, { restart => 1 } );
167 # $self->connect_to( $port );
168 $self->send_to( $port, { exit => 1 } );
169 kill 9, $pids->{$port};
170 $self->start_node_port( $self->{port_on_host}->{$port}, $port );
171 }
172 }
173
174
175 our $out;
176
177 use Digest::MD5 qw(md5);
178 our $nr = 0;
179 our $md5_nr;
180 our $digest_fh;
181 our @digest_offset;
182
183 sub merge {
184 my ( $self, $new ) = @_;
185
186 my $t_merge = time();
187
188 my $tick = 0;
189
190 my $missing;
191
192 foreach my $k1 ( keys %$new ) {
193
194 foreach my $k2 ( keys %{ $new->{$k1} } ) {
195
196 my $n = delete $new->{$k1}->{$k2};
197
198 if ( $k1 =~ m{#} ) {
199 my $md5 = md5 $k2;
200 if ( defined $md5_nr->{$md5} ) {
201 $k2 = $md5_nr->{$md5};
202 } else {
203 open( $digest_fh, '>', '/tmp/sack.digest' ) unless $digest_fh;
204 $digest_offset[ $nr ] = tell( $digest_fh );
205 print $digest_fh "$k2\n";
206
207 $k2 = $md5_nr->{$md5} = $nr;
208 $nr++;
209 }
210 }
211
212 my $ref = ref $out->{$k1}->{$k2};
213
214 if ( ! defined $out->{$k1}->{$k2} ) {
215 $out->{$k1}->{$k2} = $n;
216 } elsif ( $k1 =~ m{\+} ) {
217 # warn "## agregate $k1 $k2";
218 $out->{$k1}->{$k2} += $n;
219 } elsif ( $ref eq 'ARRAY' ) {
220 if ( ref $n eq 'ARRAY' ) {
221 push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
222 } else {
223 push @{ $out->{$k1}->{$k2} }, $n;
224 }
225 } elsif ( $ref eq '' ) {
226 $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
227 } else {
228 die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
229 }
230
231 if ( $tick++ % 1000 == 0 ) {
232 print STDERR ".";
233 } elsif ( $tick % 10000 == 0 ) {
234 print STDERR $tick;
235 }
236 }
237 }
238
239 $t_merge = time - $t_merge;
240 warn sprintf "\nmerged %d in %.4fs\n", $tick, $t_merge;
241
242 return $tick;
243 }
244
245
246 sub view {
247 my ( $self, $view ) = @_;
248
249 my $t = time;
250 $out = {};
251
252 warn "run view $view ", -s $view, " bytes\n";
253
254 my $view_code = read_file($view);
255 $self->send_to_all({ view => $view_code });
256
257 my $total;
258
259 foreach my $port ( $self->connected ) {
260 my $result = $self->get_from( $port );
261 warn "# result ", dump $result if $self->{debug};
262 if ( my $out = delete $result->{out} ) {
263 warn "[$port] result ", dump($result), $/ if $result;
264 $total += $self->merge( $out );
265 } else {
266 warn "no out from $port in ",dump $result;
267 }
268 }
269
270 warn sprintf "view %d in %.4fs\n", $total, time - $t;
271
272 return $out;
273 }
274
275 sub update_node {
276 my $self = shift;
277 my $updated;
278 foreach my $port ( @_ ) {
279 my $host = $self->{port_on_host}->{$port} || die "no port $port in ",dump $self;
280 next if $host =~ m{(localhost|127\.)};
281 next if $updated->{$host}++;
282 warn "update $host $Sack::VERSION\n";
283 system("find /srv/Sack/ | cpio --create | ssh -F etc/lib.ssh $host cpio --extract --make-directories --unconditional") == 0 and $self->restart_nodes( $port );
284 }
285 }
286
287 sub command {
288 my ( $self, $cmd ) = @_;
289
290 duration "repl $cmd";
291
292 my $repl = 1;
293
294 if ( $cmd =~ m{^v} ) {
295 $out = $self->view( $self->{view} );
296 duration 'view';
297 } elsif ( $cmd =~ m{^d} ) {
298 warn dump $out;
299 duration 'dump';
300 } elsif ( $cmd =~ m{^x} ) {
301 $repl = 0;
302 } elsif ( $cmd =~ m{^r} ) {
303 $self->restart_nodes;
304 } elsif ( $cmd =~ m{^i} ) {
305 $self->send_to_all({ info => 1 });
306 my $info = $self->get_from_all;
307 warn "INFO view $self->{view} ", -s $self->{view}, " bytes\n";
308 foreach my $port ( $self->connected ) {
309 warn "[$port] $self->{port_on_host}->{$port} $pids->{$port} ", dump( $info->{$port} ), "\n";
310 if ( my $version = $info->{version} ) {
311 warn "# $version $Sack::VERSION\n";
312 $self->update_node( $port ) if $version ne $Sack::VERSION;
313 }
314 }
315 } elsif ( $cmd =~ m{^u} ) {
316 $self->update_node( $self->connected );
317 } elsif ( $cmd =~ m{^sh\s+(.+)} ) {
318 $self->send_to_all({ sh => $1 });
319 my $sh = $self->get_from_all;
320 foreach my $port ( $self->connected ) {
321 warn "[$port]# $1\n$sh->{$port}->{sh}";
322 }
323 } else {
324 warn "UNKNOWN $cmd\n" if $cmd;
325 }
326
327 return $repl;
328 }
329
330
331 sub DESTROY {
332 warn "pids ",dump( $pids );
333 foreach ( values %$pids ) {
334 warn "kill $_";
335 kill 1,$_ || kill 9, $_;
336 }
337 }
338
339 1;

  ViewVC Help
Powered by ViewVC 1.1.26