/[Sack]/trunk/lib/Sack/Lorry.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/lib/Sack/Lorry.pm

Parent Directory | Revision Log


Revision 97 - (show annotations)
Sun Oct 4 13:09:28 2009 UTC (14 years, 8 months ago) by dpavlin
File size: 3319 byte(s)
re-implement merge of results
1 package Sack::Lorry;
2
3 use warnings;
4 use strict;
5
6 use IO::Socket::INET;
7 use Data::Dump qw(dump);
8 use Storable;
9 use File::Slurp;
10
# Package-wide bookkeeping shared by every Sack::Lorry object:
#   $pids  - "host:port" => pid of the child forked by start_node
#   $ports - port => host for every node start_node has forked
our ( $pids, $ports );

# Children are never waited for explicitly; ignore SIGCHLD instead.
$SIG{CHLD} = 'IGNORE';

# Port handed to the next node; start_node increments it after a
# successful connect.
my $port = 4000;
17
# Constructor. Every argument after the class name is taken as a
# key/value pair and becomes a field of the new object.
sub new {
    my ( $class, @fields ) = @_;
    my $object = {@fields};
    return bless $object, $class;
}
23
# Start a Sack node for $host and connect to it.
#
# Copies /srv/Sack/ to $host with rsync, forks a child that execs
# /srv/Sack/bin/node.pl (through an ssh port forward unless $host starts
# with "localhost" or "127."), then polls 127.0.0.1:$port once a second
# until a TCP connection succeeds. The socket is stored in
# $self->{sock}{$port} and the host in $self->{connected}{$port}.
#
# Returns the port used for this node (the file-level $port counter is
# incremented afterwards). Returns nothing if fork failed or if the
# child went away before the port became reachable.
sub start_node {
    my ( $self, $host ) = @_;

    # List form: no shell is involved, $host reaches rsync verbatim.
    # The status is deliberately not checked - with SIGCHLD ignored the
    # value reported by system() is not reliable.
    system 'rsync', '-rav', '/srv/Sack/', "$host:/srv/Sack/";

    my $pid = fork;

    if ( !defined $pid ) {
        warn "can't fork $host $port: $!";
        return;
    }

    if ( $pid == 0 ) {
        # child
        my $cmd = $host !~ m{^(localhost|127\.)}i ? qq|
            ssh
                -S /tmp/sock.$port.ssh
                -L $port:127.0.0.1:$port
                $host
        | : '';

        $cmd .= qq|
            /srv/Sack/bin/node.pl $port
        |;

        # collapse the multi-line layout above into a single line
        $cmd =~ s{\s+}{ }sg;

        warn "exec: $cmd\n";
        exec $cmd
            or warn "can't exec$cmd: $!\n";

        # Only reached when exec failed. The child must not return into
        # the caller's code, and it must not run END blocks or object
        # destructors (DESTROY signals every pid in $pids), so leave
        # with _exit instead of exit/die.
        require POSIX;
        POSIX::_exit(127);
    }

    # parent
    $pids->{"$host:$port"} = $pid;
    $ports->{$port}        = $host;

    print STDERR "waiting for $port";

    my $sock;
    until ($sock) {
        $sock = IO::Socket::INET->new(
            PeerAddr => '127.0.0.1',
            PeerPort => $port,
            Proto    => 'tcp',
        );
        last if $sock;

        # Signal 0 only probes for existence: stop waiting when the
        # child is gone, otherwise this loop would never end.
        if ( !kill( 0, $pid ) ) {
            warn "\nnode for $host exited before port $port was reachable\n";
            delete $pids->{"$host:$port"};
            delete $ports->{$port};
            return;
        }

        print STDERR ".";
        sleep 1;
    }

    $self->{sock}->{$port} = $sock;

    warn "\nconnected to $port\n";

    $self->{connected}->{$port} = $host;

    return $port++;
}
83
# Serialise $data with Storable and write it to the socket kept for
# the node on $port. Returns whatever Storable::store_fd returns.
# NOTE(review): store_fd writes native byte order; if nodes can run on
# a different architecture nstore_fd may be needed - confirm.
sub send_to {
    my ( $self, $port, $data ) = @_;

    warn "send_to [$port]\n";

    my $socket = $self->{sock}->{$port};
    return Storable::store_fd( $data, $socket );
}
89
# Read one Storable-serialised structure from the socket kept for the
# node on $port and return it.
sub get_from {
    my ( $self, $port ) = @_;

    warn "get_from [$port]\n";

    my $socket = $self->{sock}->{$port};
    return Storable::fd_retrieve($socket);
}
95
# Send the same $data to every node listed in $self->{connected}.
sub send_to_all {
    my ( $self, $data ) = @_;

    for my $node_port ( keys %{ $self->{connected} } ) {
        $self->send_to( $node_port, $data );
    }
}
100
# Collect one reply from every node listed in $self->{connected}.
# Returns a hash ref of port => reply, or undef when no node is
# connected.
sub get_from_all {
    my ($self) = @_;

    my $reply_for;
    for my $node_port ( keys %{ $self->{connected} } ) {
        $reply_for->{$node_port} = $self->get_from($node_port);
    }

    return $reply_for;
}
107
108
# Accumulated result of all merge() calls, shaped as
# $out->{$k1}->{$k2} = value or array ref of values.
our $out;

# Merge a two-level hash $new into the package-level $out.
#
# $new is consumed: every $new->{$k1}->{$k2} is deleted as it is merged.
# For each value:
#   - no value in $out yet        -> stored as is
#   - $k1 contains a "+"          -> added numerically to the stored value
#   - stored value is an array    -> new value(s) are appended
#   - stored value is a scalar    -> both become one flat array
#   - any other stored reference  -> dies
# An array ref in $new is always flattened into the stored list, so the
# result does not depend on the order in which the pieces arrive.
#
# Prints progress marks to STDERR and a timing summary through warn.
sub merge {
    my ( $self, $new ) = @_;

    # sub-second clock, so that the %.4f in the summary means something
    require Time::HiRes;
    my $t_merge = Time::HiRes::time();

    my $tick = 0;

    foreach my $k1 ( keys %$new ) {

        foreach my $k2 ( keys %{ $new->{$k1} } ) {

            my $n = delete $new->{$k1}->{$k2};

            my $ref = ref $out->{$k1}->{$k2};

            if ( !defined $out->{$k1}->{$k2} ) {
                $out->{$k1}->{$k2} = $n;
            }
            elsif ( $k1 =~ m{\+} ) {
                # warn "## agregate $k1 $k2";
                $out->{$k1}->{$k2} += $n;
            }
            elsif ( $ref eq 'ARRAY' ) {
                push @{ $out->{$k1}->{$k2} }, ref $n eq 'ARRAY' ? @$n : $n;
            }
            elsif ( $ref eq '' ) {
                # flatten here too, as the ARRAY branch above does
                $out->{$k1}->{$k2} =
                    [ $out->{$k1}->{$k2}, ref $n eq 'ARRAY' ? @$n : $n ];
            }
            else {
                die "can't merge $k2 [$ref] from ", Data::Dump::dump($n),
                    " into ", Data::Dump::dump( $out->{$k1}->{$k2} );
            }

            # a dot for the 1st, 1001st, ... value; the running count
            # once it reaches a multiple of 10000
            if ( $tick++ % 1000 == 0 ) {
                print STDERR ".";
            }
            elsif ( $tick % 10000 == 0 ) {
                print STDERR $tick;
            }
        }
    }

    $t_merge = Time::HiRes::time() - $t_merge;

    # values per second; guarded so that an empty $new or a zero
    # elapsed time can not divide by zero
    my $rate = $t_merge > 0 ? $tick / $t_merge : 0;

    warn sprintf "\nmerged %d in %.4fs %.2f/s\n", $tick, $t_merge, $rate;

}
157
158
# Run the view stored in file $view on every connected node.
#
# The content of the file is sent to all nodes as { view => $code }.
# Replies are then read node by node, and each reply's {view} element,
# when true, is handed to merge(). Returns the package-level $out.
# NOTE(review): $out is not reset here, so results of earlier view()
# calls appear to stay in the returned structure - confirm intended.
sub view {
    my ( $self, $view ) = @_;

    warn "run view $view ", -s $view, " bytes\n";

    # scalar assignment: read_file must return the file as one string
    my $code = read_file($view);
    $self->send_to_all( { view => $code } );

    for my $node_port ( keys %{ $self->{connected} } ) {
        warn "get_from $node_port\n";

        my $reply = $self->get_from($node_port);
        warn Data::Dump::dump($reply);

        unless ( $reply->{view} ) {
            warn "no view from $node_port\n";
            next;
        }

        $self->merge( $reply->{view} );
    }

    return $out;
}
180
181
# Destructor: signal every child recorded in the package-level $pids.
#
# Each pid is sent SIGHUP; SIGKILL is tried only when the SIGHUP could
# not be delivered. Note that $pids is shared by all objects, so
# destroying any one object signals every recorded child.
sub DESTROY {
    # a destructor can run at any point; keep the status and error
    # variables of the surrounding code intact
    local ( $?, $!, $@ );

    warn "pids ", Data::Dump::dump($pids);

    # $pids is undef when no node was ever started
    foreach my $pid ( values %{ $pids || {} } ) {
        warn "kill $pid";

        # Parenthesised calls joined with low-precedence "or": written as
        # kill 1,$_ || kill 9,$_ the "||" binds to the pid, and the
        # fallback can never run.
        kill( 'HUP', $pid ) or kill( 'KILL', $pid );
    }
}
189
190 1;

  ViewVC Help
Powered by ViewVC 1.1.26