/[Sack]/trunk/lib/Sack/Lorry.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk/lib/Sack/Lorry.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 144 - (hide annotations)
Wed Oct 7 20:55:14 2009 UTC (14 years, 7 months ago) by dpavlin
File size: 6759 byte(s)
split update_node out
1 dpavlin 92 package Sack::Lorry;
2    
3     use warnings;
4     use strict;
5    
6     use IO::Socket::INET;
7     use Data::Dump qw(dump);
8     use Storable;
9 dpavlin 97 use File::Slurp;
10 dpavlin 107 use Net::Ping;
11 dpavlin 127 use Time::HiRes qw(time sleep);
12 dpavlin 92
13 dpavlin 127 use lib 'lib';
14     use base qw(Sack::Pid);
15 dpavlin 137 use Sack;
16 dpavlin 127
17 dpavlin 92 our $pids;
18    
19     $SIG{CHLD} = 'IGNORE';
20    
21     sub new {
22     my $class = shift;
23     my $self = bless {@_}, $class;
24 dpavlin 119 $self->{sock} = {};
25 dpavlin 143 warn __PACKAGE__, " $Sack::VERSION\n";
26 dpavlin 92 return $self;
27     }
28    
29 dpavlin 119 sub connected {
30     sort keys %{ $_[0]->{sock} }
31     }
32    
33     sub connect_to {
34 dpavlin 138 my ( $self, $port, $retries ) = @_;
35 dpavlin 119
36 dpavlin 138 $retries ||= 30;
37     warn "# connect_to [$port] $retries times";
38 dpavlin 119
39 dpavlin 138 my $sock = $self->{sock}->{$port};
40 dpavlin 119
41 dpavlin 127 while ( ! $sock && $retries-- ) {
42    
43 dpavlin 119 $sock = IO::Socket::INET->new(
44     PeerAddr => '127.0.0.1',
45     PeerPort => $port,
46     Proto => 'tcp',
47     );
48    
49     if ( ! $sock ) {
50     print STDERR ".";
51 dpavlin 127 sleep 0.5;
52 dpavlin 138 } elsif ( $sock->connected ) {
53     $self->{sock}->{$port} = $sock;
54     warn "# connected to $port\n";
55    
56     $self->send_to( $port, { info => 1 } );
57     warn "info ", dump( $self->get_from( $port ) ), $/;
58    
59 dpavlin 127 } else {
60 dpavlin 138 close $sock;
61 dpavlin 119 }
62    
63     }
64    
65 dpavlin 127 if ( ! $retries ) {
66     warn "SKIP $port: $!";
67     return;
68 dpavlin 138 } else {
69     return $port;
70 dpavlin 127 }
71 dpavlin 119 }
72    
73    
74 dpavlin 127 sub start_node_port {
75     my ( $self, $host, $port ) = @_;
76 dpavlin 92
77 dpavlin 107 chomp $host;
78 dpavlin 96
79 dpavlin 107 my $p = Net::Ping->new;
80    
81     if ( ! $p->ping( $host ) ) {
82     warn "can't ping [$host]\n";
83     return;
84     }
85    
86 dpavlin 138 if ( $self->connect_to( $port, 1 ) ) {
87     warn "re-using existing $port";
88     }
89    
90 dpavlin 107 my $ssh_config = '-F etc/lib.ssh';
91    
92 dpavlin 127 my $pid_path = "/tmp/sack.$port.pid";
93     kill 9, read_file $pid_path if -e $pid_path;
94    
95 dpavlin 92 if ( my $pid = fork ) {
96     # parent
97    
98 dpavlin 127 $self->connect_to( $port ) || return;
99 dpavlin 92
100 dpavlin 127 $pids->{ $port } = $pid;
101 dpavlin 119 $self->{port_on_host}->{$port} = $host;
102 dpavlin 92
103 dpavlin 127 warn "start_node_port $host [$port] pid $pid\n";
104    
105 dpavlin 138 return $port;
106 dpavlin 92
107     } elsif ( ! defined $pid ) {
108     warn "can't fork $host $port";
109     return;
110     } else {
111     # child
112 dpavlin 107
113 dpavlin 92 my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
114 dpavlin 107 ssh -f $ssh_config
115 dpavlin 92 -S /tmp/sock.$port.ssh
116     -L $port:127.0.0.1:$port
117     $host
118     | : '';
119    
120     $cmd .= qq|
121 dpavlin 96 /srv/Sack/bin/node.pl $port
122 dpavlin 92 |;
123    
124     $cmd =~ s{\s+}{ }sg;
125    
126 dpavlin 127 $self->port_pid( $port, $$ );
127    
128     warn "# exec: $cmd\n";
129 dpavlin 92 exec $cmd;
130     }
131     }
132    
133     sub send_to {
134     my ( $self, $port, $data ) = @_;
135 dpavlin 127 # warn "send_to [$port]\n";
136 dpavlin 92 Storable::store_fd( $data => $self->{sock}->{$port} );
137     }
138    
139     sub get_from {
140     my ( $self, $port ) = @_;
141 dpavlin 127 # warn "get_from [$port]\n";
142     my $data;
143     eval {
144     $data = Storable::fd_retrieve( $self->{sock}->{$port} );
145     };
146     warn "ERROR $@" if $@;
147     return $data;
148 dpavlin 92 }
149    
150 dpavlin 93 sub send_to_all {
151     my ( $self, $data ) = @_;
152 dpavlin 119 $self->send_to( $_, $data ) foreach $self->connected;
153 dpavlin 93 }
154    
155     sub get_from_all {
156     my ( $self ) = @_;
157     my $result;
158 dpavlin 119 $result->{$_} = $self->get_from( $_ ) foreach $self->connected;
159 dpavlin 93 return $result;
160     }
161    
162 dpavlin 119 sub restart_nodes {
163     my ( $self ) = @_;
164     foreach my $port ( $self->connected ) {
165     warn "restart [$port]\n";
166 dpavlin 127 # $self->send_to( $port, { restart => 1 } );
167     # $self->connect_to( $port );
168     $self->send_to( $port, { exit => 1 } );
169     kill 9, $pids->{$port};
170     $self->start_node_port( $self->{port_on_host}->{$port}, $port );
171 dpavlin 119 }
172     }
173 dpavlin 97
174 dpavlin 119
175 dpavlin 97 our $out;
176    
177 dpavlin 111 use Digest::MD5 qw(md5);
178     our $nr = 0;
179     our $md5_nr;
180     our $digest_fh;
181     our @digest_offset;
182    
183 dpavlin 97 sub merge {
184     my ( $self, $new ) = @_;
185    
186     my $t_merge = time();
187    
188     my $tick = 0;
189    
190     my $missing;
191    
192     foreach my $k1 ( keys %$new ) {
193    
194     foreach my $k2 ( keys %{ $new->{$k1} } ) {
195    
196     my $n = delete $new->{$k1}->{$k2};
197    
198 dpavlin 111 if ( $k1 =~ m{#} ) {
199     my $md5 = md5 $k2;
200     if ( defined $md5_nr->{$md5} ) {
201     $k2 = $md5_nr->{$md5};
202     } else {
203     open( $digest_fh, '>', '/tmp/sack.digest' ) unless $digest_fh;
204     $digest_offset[ $nr ] = tell( $digest_fh );
205     print $digest_fh "$k2\n";
206    
207     $k2 = $md5_nr->{$md5} = $nr;
208     $nr++;
209     }
210     }
211    
212 dpavlin 97 my $ref = ref $out->{$k1}->{$k2};
213    
214     if ( ! defined $out->{$k1}->{$k2} ) {
215     $out->{$k1}->{$k2} = $n;
216     } elsif ( $k1 =~ m{\+} ) {
217     # warn "## agregate $k1 $k2";
218     $out->{$k1}->{$k2} += $n;
219     } elsif ( $ref eq 'ARRAY' ) {
220     if ( ref $n eq 'ARRAY' ) {
221     push @{ $out->{$k1}->{$k2} }, $_ foreach @$n;
222     } else {
223     push @{ $out->{$k1}->{$k2} }, $n;
224     }
225     } elsif ( $ref eq '' ) {
226     $out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ];
227     } else {
228     die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2});
229     }
230    
231     if ( $tick++ % 1000 == 0 ) {
232     print STDERR ".";
233     } elsif ( $tick % 10000 == 0 ) {
234     print STDERR $tick;
235     }
236     }
237     }
238    
239     $t_merge = time - $t_merge;
240 dpavlin 117 warn sprintf "\nmerged %d in %.4fs\n", $tick, $t_merge;
241 dpavlin 97
242 dpavlin 117 return $tick;
243 dpavlin 97 }
244    
245    
246     sub view {
247     my ( $self, $view ) = @_;
248    
249 dpavlin 117 my $t = time;
250 dpavlin 98 $out = {};
251    
252 dpavlin 97 warn "run view $view ", -s $view, " bytes\n";
253    
254     my $view_code = read_file($view);
255     $self->send_to_all({ view => $view_code });
256    
257 dpavlin 117 my $total;
258    
259 dpavlin 119 foreach my $port ( $self->connected ) {
260 dpavlin 97 my $result = $self->get_from( $port );
261 dpavlin 98 warn "# result ", dump $result if $self->{debug};
262 dpavlin 117 if ( my $out = delete $result->{out} ) {
263 dpavlin 127 warn "[$port] result ", dump($result), $/ if $result;
264 dpavlin 117 $total += $self->merge( $out );
265 dpavlin 97 } else {
266 dpavlin 117 warn "no out from $port in ",dump $result;
267 dpavlin 97 }
268     }
269    
270 dpavlin 117 warn sprintf "view %d in %.4fs\n", $total, time - $t;
271    
272 dpavlin 97 return $out;
273     }
274    
275 dpavlin 144 sub update_node {
276     my $self = shift;
277     my $updated;
278     foreach my $port ( @_ ) {
279     my $host = $self->{port_on_host}->{$port} || die "no port $port in ",dump $self;
280     next if $host =~ m{(localhost|127\.)};
281     next if $updated->{$host}++;
282     warn "update $host $Sack::VERSION\n";
283     system("find /srv/Sack/ | cpio --create | ssh -F etc/lib.ssh $host cpio --extract --make-directories --unconditional") == 0 and $self->restart_nodes( $port );
284     }
285     }
286    
287 dpavlin 137 sub command {
288     my ( $self, $cmd ) = @_;
289 dpavlin 97
290 dpavlin 137 duration "repl $cmd";
291    
292     my $repl = 1;
293    
294     if ( $cmd =~ m{^v} ) {
295     $out = $self->view( $self->{view} );
296     duration 'view';
297     } elsif ( $cmd =~ m{^d} ) {
298     warn dump $out;
299     duration 'dump';
300     } elsif ( $cmd =~ m{^x} ) {
301     $repl = 0;
302     } elsif ( $cmd =~ m{^r} ) {
303     $self->restart_nodes;
304     } elsif ( $cmd =~ m{^i} ) {
305     $self->send_to_all({ info => 1 });
306     my $info = $self->get_from_all;
307 dpavlin 144 warn "INFO view $self->{view} ", -s $self->{view}, " bytes\n";
308 dpavlin 137 foreach my $port ( $self->connected ) {
309 dpavlin 144 warn "[$port] $self->{port_on_host}->{$port} $pids->{$port} ", dump( $info->{$port} ), "\n";
310     if ( my $version = $info->{version} ) {
311     warn "# $version $Sack::VERSION\n";
312     $self->update_node( $port ) if $version ne $Sack::VERSION;
313     }
314 dpavlin 137 }
315     } elsif ( $cmd =~ m{^u} ) {
316 dpavlin 144 $self->update_node( $self->connected );
317 dpavlin 137 } elsif ( $cmd =~ m{^sh\s+(.+)} ) {
318     $self->send_to_all({ sh => $1 });
319     my $sh = $self->get_from_all;
320     foreach my $port ( $self->connected ) {
321     warn "[$port]# $1\n$sh->{$port}->{sh}";
322     }
323     } else {
324     warn "UNKNOWN $cmd\n" if $cmd;
325     }
326    
327 dpavlin 139 return $repl;
328 dpavlin 137 }
329    
330    
331 dpavlin 92 sub DESTROY {
332     warn "pids ",dump( $pids );
333     foreach ( values %$pids ) {
334     warn "kill $_";
335     kill 1,$_ || kill 9, $_;
336     }
337     }
338    
339     1;

  ViewVC Help
Powered by ViewVC 1.1.26