106 |
push @shard_load_queue, $s; |
push @shard_load_queue, $s; |
107 |
warn "queued $s for loading\n"; |
warn "queued $s for loading\n"; |
108 |
} |
} |
109 |
|
|
110 |
|
# XXX depriciated but not removed yet |
111 |
|
if ( 0 ) { |
112 |
to_all { load => $shard }; |
to_all { load => $shard }; |
113 |
$info->{pending}->{$_} = 'load' foreach all_ports; |
$info->{pending}->{$_} = 'load' foreach all_ports; |
114 |
|
return; |
115 |
|
} |
116 |
|
|
117 |
|
my @nodes = all_ports; |
118 |
|
my $chunk = int( ( $#shard_load_queue + 1 ) / $#nodes ); |
119 |
|
|
120 |
|
my $pid_port; |
121 |
|
|
122 |
|
foreach my $port ( all_ports ) { |
123 |
|
$info->{pending}->{$port} = 'load'; |
124 |
|
my @shards = splice @shard_load_queue, 0, $chunk; |
125 |
|
|
126 |
|
my $sock = $session->{port}->{$port}; |
127 |
|
|
128 |
|
if ( my $pid = fork ) { |
129 |
|
$pid_port->{$pid} = $port; |
130 |
|
# parent |
131 |
|
} elsif ( ! defined $pid ) { |
132 |
|
die "can't fork $!"; |
133 |
|
} else { |
134 |
|
# child |
135 |
|
|
136 |
|
warn ">>>> [$port] bulk_load ", $#shards + 1, " shards\n"; |
137 |
|
Storable::store_fd( { bulk_load => [ @shards ] }, $sock ); |
138 |
|
foreach my $s ( @shards ) { |
139 |
|
warn ">>>> [$port] bulk_load $s\n"; |
140 |
|
Storable::store_fd( Storable::retrieve( $s ), $sock ); |
141 |
|
} |
142 |
|
warn ">>>> [$port] bulk_load finished\n"; |
143 |
|
exit; |
144 |
|
} |
145 |
|
|
146 |
|
} |
147 |
|
|
148 |
|
foreach my $child ( keys %$pid_port ) { |
149 |
|
warn "waitpid $child\n"; |
150 |
|
waitpid($child, 0); |
151 |
|
delete( $info->{pending}->{ $pid_port->{$child} } ); |
152 |
|
} |
153 |
|
|
154 |
|
$info->{pending}->{$_} = 'view' foreach all_ports; |
155 |
|
to_all { code => "# collect back loaded shards\n", view => 'noop' }; |
156 |
} |
} |
157 |
|
|
158 |
sub run_view { |
sub run_view { |