8 |
use File::Slurp; |
use File::Slurp; |
9 |
use Getopt::Long; |
use Getopt::Long; |
10 |
use IO::Socket::INET; |
use IO::Socket::INET; |
11 |
|
use Storable qw/freeze thaw/; |
12 |
|
|
13 |
|
|
14 |
my $path = '/data/isi/full.txt'; |
my $path = '/data/isi/full.txt'; |
68 |
|
|
69 |
our $cache; |
our $cache; |
70 |
|
|
71 |
|
our $connected; |
72 |
|
|
73 |
sub send_nodes { |
sub send_nodes { |
74 |
my $content = pop @_; |
my $content = pop @_; |
75 |
my $header = length($content); |
my $header = length($content); |
86 |
|
|
87 |
print $sock "$header\n$content" || warn "can't send $header to $node: $!"; |
print $sock "$header\n$content" || warn "can't send $header to $node: $!"; |
88 |
|
|
89 |
|
$connected->{$node} = $sock; |
90 |
|
} |
91 |
|
} |
92 |
|
|
93 |
|
sub merge_out { |
94 |
|
my $new = shift; |
95 |
|
|
96 |
|
warn "## merge $new\n"; |
97 |
|
|
98 |
|
foreach my $k1 ( keys %$new ) { |
99 |
|
|
100 |
|
foreach my $k2 ( keys %{ $new->{$k1} } ) { |
101 |
|
|
102 |
|
my $n = $new->{$k1}->{$k2}; |
103 |
|
my $ref = ref $out->{$k1}->{$k2}; |
104 |
|
|
105 |
|
if ( ! defined $out->{$k1}->{$k2} ) { |
106 |
|
$out->{$k1}->{$k2} = $n; |
107 |
|
} elsif ( $k1 =~ m{\+} ) { |
108 |
|
warn "# agregate $k1 $k2"; |
109 |
|
$out->{$k1}->{$k2} += $n; |
110 |
|
} elsif ( $ref eq 'ARRAY' ) { |
111 |
|
push @{ $out->{$k1}->{$k2} }, $n; |
112 |
|
} elsif ( $ref eq '' ) { |
113 |
|
$out->{$k1}->{$k2} = [ $out->{$k1}->{$k2}, $n ]; |
114 |
|
} else { |
115 |
|
die "can't merge $k2 [$ref] from ",dump($n), " into ", dump($out->{$k1}->{$k2}); |
116 |
|
} |
117 |
|
} |
118 |
} |
} |
119 |
|
|
120 |
|
warn "## merge out ", dump $out; |
121 |
} |
} |
122 |
|
|
123 |
sub run_code { |
sub run_code { |
150 |
report "$affected affected records $view"; |
report "$affected affected records $view"; |
151 |
|
|
152 |
warn "WARN no \$out defined!" unless defined $out; |
warn "WARN no \$out defined!" unless defined $out; |
153 |
|
|
154 |
|
if ( $connected ) { |
155 |
|
warn "# get results from ", join(' ', keys %$connected ); |
156 |
|
|
157 |
|
foreach my $node ( keys %$connected ) { |
158 |
|
my $sock = $connected->{$node}; |
159 |
|
my $size = <$sock>; |
160 |
|
warn "<<<< $node $size bytes\n"; |
161 |
|
my $part; |
162 |
|
read $sock, $part, $size; |
163 |
|
merge_out( thaw $part ); |
164 |
|
} |
165 |
|
} |
166 |
} |
} |
167 |
|
|
168 |
sub run_views { |
sub run_views { |
227 |
|
|
228 |
if ( $header[0] eq 'view' ) { |
if ( $header[0] eq 'view' ) { |
229 |
run_code $header[1] => $content; |
run_code $header[1] => $content; |
230 |
|
|
231 |
|
my $frozen = freeze $out; |
232 |
|
my $size = length $frozen; |
233 |
|
warn ">>>> $size bytes"; |
234 |
|
print $client "$size\n$frozen"; |
235 |
|
|
236 |
} else { |
} else { |
237 |
warn "WARN unknown"; |
warn "WARN unknown"; |
238 |
} |
} |