95 |
return; |
return; |
96 |
} |
} |
97 |
|
|
98 |
$info->{shard}->{$_} = 'wait' foreach @shards; |
foreach my $s ( @shards ) { |
99 |
warn "loading shard $shard from ", dump( @shards ); |
next if $info->{shard}->{$s} =~ m{^\d+$}; |
100 |
|
$info->{shard}->{$s} = 'wait'; |
101 |
push @shard_load_queue, @shards; |
push @shard_load_queue, $s; |
102 |
|
warn "queued $s for loading\n"; |
103 |
|
} |
104 |
to_all { load => $shard }; |
to_all { load => $shard }; |
105 |
} |
} |
106 |
|
|
117 |
to_all { code => $code, view => $path }; |
to_all { code => $code, view => $path }; |
118 |
}; |
}; |
119 |
|
|
120 |
|
our @responses; |
121 |
|
|
122 |
while (1) { |
while (1) { |
123 |
for my $sock ($sel->can_read(1)) { |
for my $sock ($sel->can_read(1)) { |
124 |
if ($sock == $lsn) { |
if ($sock == $lsn) { |
221 |
$shard += $affected; |
$shard += $affected; |
222 |
$total += $affected; |
$total += $affected; |
223 |
} |
} |
224 |
my $merge = $info->{merge}->{$view}->{$node}; |
my $merge = $info->{merge}->{$view}->{$node} || '?'; |
225 |
$html .= qq|<tr><td><tt>$node</tt></td><td>$shard</td><td>$h</td><td>$total</td><td>$merge</td></tr>\n|; |
$html .= qq|<tr><td><tt>$node</tt></td><td>$shard</td><td>$h</td><td>$total</td><td>$merge</td></tr>\n|; |
226 |
} |
} |
227 |
|
|
287 |
load_shard $1; |
load_shard $1; |
288 |
} elsif ( $repl =~ m{view\s*(\S+)?} ) { |
} elsif ( $repl =~ m{view\s*(\S+)?} ) { |
289 |
run_view $1; |
run_view $1; |
|
} elsif ( $repl =~ m{debug\s*(.+)?} ) { |
|
|
to_all { debug => $1 }; |
|
290 |
} elsif ( $repl =~ m{out} ) { |
} elsif ( $repl =~ m{out} ) { |
291 |
my $out = Sack::Merge->out; |
my $out = Sack::Merge->out; |
292 |
warn "out ",dump( $out ); |
warn "out ",dump( $out ); |
298 |
to_all { exit => 1 }; |
to_all { exit => 1 }; |
299 |
sleep 1; |
sleep 1; |
300 |
exit; |
exit; |
301 |
|
} elsif ( $repl eq '.' ) { |
302 |
|
$response->{'.'} = [ @responses ]; |
303 |
|
@responses = (); |
304 |
|
} elsif ( $repl =~ m{(\w+)\s*(.+)?} ) { |
305 |
|
to_all { $1 => $2 }; |
306 |
} else { |
} else { |
307 |
$response->{error}->{unknown} = $data; |
$response->{error}->{repl} = $repl; |
308 |
} |
} |
309 |
Storable::store_fd( $response, $sock ); |
Storable::store_fd( $response, $sock ); |
310 |
} elsif ( $data->{ping} ) { |
} elsif ( $data->{ping} ) { |
325 |
my $added = Sack::Merge->add( $data->{out} ) if defined $data->{out}; |
my $added = Sack::Merge->add( $data->{out} ) if defined $data->{out}; |
326 |
$info->{merge}->{ $data->{view} }->{ $data->{port} } = $added; |
$info->{merge}->{ $data->{view} }->{ $data->{port} } = $added; |
327 |
$info->{view }->{ $data->{view} }->{ $data->{port} } = $data->{on_shard}; |
$info->{view }->{ $data->{view} }->{ $data->{port} } = $data->{on_shard}; |
328 |
|
} elsif ( exists $data->{port} ) { |
329 |
|
push @responses, $data; |
330 |
|
warn "# ",dump($data),$/; |
331 |
} else { |
} else { |
332 |
warn "UNKNOWN ",dump($data); |
warn "UNKNOWN ",dump($data); |
333 |
} |
} |