116 |
|
|
117 |
my $sock = node_sock($node) || next; |
my $sock = node_sock($node) || next; |
118 |
|
|
119 |
warn "[$port] >>>> $node $header\n"; |
warn "[$port] >>>> [$node] $header\n"; |
120 |
print $sock "$header\n$content" || warn "can't send $header to $node: $!"; |
print $sock "$header\n$content" || warn "can't send $header to $node: $!"; |
121 |
|
|
122 |
$connected->{$node} = $sock; |
$connected->{$node} = $sock; |
133 |
return; |
return; |
134 |
} |
} |
135 |
chomp( my $size = <$sock> ); |
chomp( my $size = <$sock> ); |
136 |
warn "[$port] <<<< $node $size bytes\n" if $debug || $size > 1024; |
warn "[$port] <<<< [$node] $size bytes\n" if $debug || $size > 1024; |
137 |
my $data; |
my $data; |
138 |
read $sock, $data, $size; |
read $sock, $data, $size; |
139 |
return $data; |
return $data; |
173 |
sub merge_out { |
sub merge_out { |
174 |
my ( $from_node, $new ) = @_; |
my ( $from_node, $new ) = @_; |
175 |
|
|
176 |
|
my $t_merge = time(); |
177 |
|
|
178 |
pull_node_file $from_node => 'nr_md5'; |
pull_node_file $from_node => 'nr_md5'; |
179 |
pull_node_file $from_node => 'md5'; |
pull_node_file $from_node => 'md5'; |
180 |
|
|
182 |
my ( $local, $remote ) = ( 0, 0 ); |
my ( $local, $remote ) = ( 0, 0 ); |
183 |
|
|
184 |
my $tick = 0; |
my $tick = 0; |
185 |
print STDERR "[$port] merge $from_node"; |
print STDERR "[$port] merge [$from_node]"; |
186 |
|
|
187 |
my $missing; |
my $missing; |
188 |
|
|
198 |
my $md5 = $remote_digest->{nr_md5}->[$k2]; |
my $md5 = $remote_digest->{nr_md5}->[$k2]; |
199 |
|
|
200 |
if ( ! $md5 ) { |
if ( ! $md5 ) { |
201 |
$missing->{$from_node}++; # FIXME die? |
$missing->{nr_md5}->{$from_node}++; # FIXME die? |
202 |
next; |
next; |
203 |
} |
} |
204 |
|
|
205 |
if ( my $local_k2 = $digest->{md5_nr}->{$md5} ) { |
if ( my $local_k2 = $digest->{md5_nr}->{$md5} ) { |
206 |
$k2 = $local_k2; |
$k2 = $local_k2; |
207 |
$local++; |
$local++; |
208 |
} else { |
} elsif ( my $full = $remote_digest->{md5}->{$md5} ) { |
209 |
$k2 = $digest->to_int( $remote_digest->{md5}->{$md5} ); |
$k2 = $digest->to_int( $remote_digest->{md5}->{$md5} ); |
210 |
$remote++; |
$remote++; |
211 |
|
} else { |
212 |
|
$missing->{md5}->{$from_node}++; |
213 |
} |
} |
214 |
} |
} |
215 |
|
|
240 |
} |
} |
241 |
} |
} |
242 |
|
|
243 |
warn "$tick $local/$remote\n"; |
$t_merge = time - $t_merge; |
244 |
|
printf STDERR "%d in %.4fs %.2f/s local %.1f%% %d/%d\n", $tick, $t_merge, $t_merge / $tick, $local * 100 / $tick, $local, $remote; |
245 |
|
|
246 |
warn "[$port] missing ", dump $missing if $missing; |
warn "[$port] missing ", dump $missing if $missing; |
247 |
|
|
294 |
$o = thaw $o; |
$o = thaw $o; |
295 |
warn "[$port] merge $node $s bytes\n"; |
warn "[$port] merge $node $s bytes\n"; |
296 |
merge_out $node => $o; |
merge_out $node => $o; |
|
report "[$port] merged $node"; |
|
297 |
} |
} |
298 |
} |
} |
299 |
} |
} |
389 |
while ( read $fh, $buff, $block ) { |
while ( read $fh, $buff, $block ) { |
390 |
print $client $buff; |
print $client $buff; |
391 |
} |
} |
392 |
|
$digest->open; |
393 |
} else { |
} else { |
394 |
warn "[$port] UNKNOWN $header[0]"; |
warn "[$port] UNKNOWN $header[0]"; |
395 |
} |
} |