/[Sack]/trunk/bin/sack.pl
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 20 - (hide annotations)
Tue Sep 22 16:02:49 2009 UTC (14 years, 8 months ago) by dpavlin
File MIME type: text/plain
File size: 5619 byte(s)
abort on first error
1 dpavlin 1 #!/usr/bin/perl
2    
3     use warnings;
4     use strict;
5    
6     use Time::HiRes qw(time);
7     use Data::Dump qw(dump);
8     use File::Slurp;
9 dpavlin 4 use Getopt::Long;
10 dpavlin 11 use IO::Socket::INET;
11 dpavlin 13 use Storable qw/freeze thaw/;
12 dpavlin 1
13 dpavlin 4
# Runtime configuration. Every value below can be overridden on the
# command line through the GetOptions() table that follows.
my $debug  = 0;                     # --debug / --no-debug
my $path   = '/data/isi/full.txt';  # --path, appended to $prefix when the input is opened
my $limit  = 5000;                  # --limit, handed to the input module
my $offset = 0;                     # --offset, handed to the input module
my @views;                          # --view (repeatable); empty means views/*.pl
my $listen;                         # --listen or --port: run as a node on this TCP port
my @nodes;                          # --connect (repeatable): addresses of remote nodes

# GetOptions() reports the offending option itself and does not set $!,
# so a usage line is printed on failure instead of a stale errno message.
GetOptions(
    'path=s'        => \$path,
    'offset=i'      => \$offset,
    'limit=i'       => \$limit,
    'view=s'        => \@views,
    'listen|port=i' => \$listen,
    'connect=s'     => \@nodes,
    'debug!'        => \$debug,
) or die "usage: $0 [--path file] [--offset n] [--limit n] [--view file ...]"
    . " [--listen port] [--connect host:port ...] [--[no]debug]\n";

# Start of the step currently being timed; report() reads and resets it.
my $t = time;
34    
35 dpavlin 12
# Installation prefix, worked out at compile time because the "use lib"
# line further down needs it before the rest of the script runs.
our $prefix;
BEGIN {
    $prefix = $0;
    # A script started as ./something is made absolute using the current
    # directory. The dot is escaped so only a literal "./" is stripped;
    # unescaped it would also eat the first character of paths like "a/b".
    if ( $prefix =~ s{^\./}{} ) {
        chomp( my $pwd = `pwd` );
        $prefix = "$pwd/$prefix";
    }
    # Keep only what precedes /srv/Sack/bin in the script path.
    $prefix =~ s{^(.*)/srv/Sack/bin.+$}{$1};
    warn "# prefix $prefix";
}
46    
47    
# WebPAC libraries are loaded from below the detected prefix.
use lib "$prefix/srv/webpac2/lib/";
use WebPAC::Input::ISI;

# Input object, constructed with the configured path, offset and limit.
my %input_args = (
    path   => "$prefix/$path",
    offset => $offset,
    limit  => $limit,
);
my $input = WebPAC::Input::ISI->new( %input_args );
55    
56    
# Print a timing line for the step that just finished and restart the timer.
#
#   $description - text printed at the start of the line
#
# Reads the file-level $t (start of the step) and $input (for the record
# count used in the rate), then resets $t to the current time.
sub report {
    my $description = shift;
    my $dt = time - $t;
    # Two calls within the same clock tick give $dt == 0; print a rate of 0
    # instead of dying with "Illegal division by zero".
    my $rate = $dt > 0 ? $input->size / $dt : 0;
    printf "%s in %1.4fs %.2f/s\n", $description, $dt, $rate;
    $t = time;
}
63    
64    
report $input->size . ' records loaded';

# Results are saved below out/; fail here rather than at the first save
# if the directory can't be created.
unless ( -e 'out' ) {
    mkdir 'out' or die "can't create directory out: $!";
}

# Result of the view being run; run_code() clears it before each view and
# expects the eval'ed view code to fill it in.
our $out;

# Records already fetched from the input, keyed by position.
our $cache;

# Sockets opened by send_nodes(), keyed by node address.
our $connected;
74    
# Open a connection to every address in @nodes and send it one request.
#
# Arguments: one or more header words, optionally followed by the content
# as the last argument. With a single argument there is no content.
# The request is a header line "<content length> <words...>" followed by
# the raw content.
#
# Each socket that connects is stored in $connected->{$node} so the reply
# can be read later with get_node(). Nodes that can't be reached are
# reported and skipped.
sub send_nodes {
    my $content = $#_ > 0 ? pop @_ : ''; # no content with just one argument!
    my $header = length($content);
    $header .= ' ' . join(' ', @_) if @_;

    foreach my $node ( @nodes ) {

        # Forget any socket left over from an earlier request, so a failed
        # reconnect can't leave get_node() reading a stale connection.
        delete $connected->{$node};

        my $sock = IO::Socket::INET->new(
            PeerAddr => $node,
            Proto    => 'tcp',
        );

        if ( ! $sock ) {
            warn "can't connect to $node - $!"; # FIXME die?
            next;
        }

        print ">>>> $node $header\n";
        # "or", not "||": the high-precedence form binds to the string being
        # printed, which is always true, so the warning could never fire.
        print {$sock} "$header\n$content"
            or warn "can't send $header to $node: $!";

        $connected->{$node} = $sock;
    }
}
98    
# Read one reply from a node previously contacted by send_nodes().
#
#   $node - node address, used as the key into $connected
#
# A reply is a line holding the payload size in bytes followed by that many
# bytes of payload. Returns the payload, or nothing (bare return) when there
# is no socket for the node, the node closed the connection, the size line
# is not a number, or fewer bytes than announced arrive. In every failure
# case the node is removed from $connected.
sub get_node {
    my $node = shift;

    my $sock = $connected->{$node};
    if ( ! $sock ) {
        warn "ERROR: lost connection to $node";
        delete $connected->{$node};
        return;
    }

    # undef here means the node closed the connection without replying
    my $size = <$sock>;
    if ( ! defined $size ) {
        warn "ERROR: no reply from $node";
        delete $connected->{$node};
        return;
    }
    chomp $size;

    if ( $size !~ m{^\d+$} ) {
        warn "ERROR: invalid size '$size' from $node";
        delete $connected->{$node};
        return;
    }
    warn "<<<< $node $size bytes\n";

    my $data = '';
    my $got = read $sock, $data, $size;
    if ( ! defined $got || $got != $size ) {
        warn "ERROR: short read from $node, expected $size bytes";
        delete $connected->{$node};
        return;
    }
    return $data;
}
114    
# Send one reply over an already open socket: a line with the payload size
# in bytes, followed by the payload itself.
#
#   $sock - connected socket; must support ->peerhost
#   $data - payload to send
sub send_sock {
    my ( $sock, $data ) = @_;
    my $size = length $data;
    warn ">>>> ", $sock->peerhost, " $size bytes";
    # "or", not "||": the high-precedence form binds to the string being
    # printed, which is always true, so a failed print went unnoticed.
    print {$sock} "$size\n$data"
        or warn "can't send $size bytes to ", $sock->peerhost, ": $!";
}
121    
# Fold one result structure (a hash of hashes) into the file-level $out.
#
# For every $new->{$k1}{$k2} value:
#   - nothing defined in $out yet:       the value is stored as-is
#   - $k1 contains "+":                  the value is added numerically
#   - existing value is an array ref:    the value is pushed onto it
#   - existing value is a plain scalar:  both become a two-element array
#   - any other kind of existing value:  die
#
# With $debug set the merged $out is dumped as a warning.
sub merge_out {
    my ($incoming) = @_;

    for my $k1 ( keys %$incoming ) {

        for my $k2 ( keys %{ $incoming->{$k1} } ) {

            my $value = $incoming->{$k1}->{$k2};
            my $have  = $out->{$k1}->{$k2};
            my $kind  = ref $have;

            if ( ! defined $have ) {
                $out->{$k1}->{$k2} = $value;
                next;
            }

            if ( $k1 =~ m{\+} ) {
#               warn "## agregate $k1 $k2";
                $out->{$k1}->{$k2} += $value;
                next;
            }

            if ( $kind eq 'ARRAY' ) {
                # $have is a copy of the reference, so this grows the array in $out
                push @$have, $value;
                next;
            }

            if ( $kind eq '' ) {
                $out->{$k1}->{$k2} = [ $have, $value ];
                next;
            }

            die "can't merge $k2 [$kind] from ",dump($value), " into ", dump($have);
        }
    }

    warn "## merge out ", dump $out if $debug;
}
149    
# Run one view over the local records and merge in the remote results.
#
#   $view - name of the view, used in messages and sent to the nodes
#   $code - Perl source of the view
#
# The code is first sent to every node with send_nodes(). It is then
# eval'ed once per record for positions $offset+1 .. $offset+size, stopping
# at the first missing record or the first error. Fetched records are kept
# in $cache. $out is cleared beforehand and is expected to be filled by the
# view code. Finally each connected node's reply is thawed and merged into
# $out with merge_out().
sub run_code {
    my ( $view, $code ) = @_;

    warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n";

    send_nodes view => $view => $code;

    undef $out;

    my $affected = 0;
    $t = time;

    # The lexicals $pos and $rec (and $view, $code, $affected) are visible
    # to the eval'ed code below, so they must keep these names.
    foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
        my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
        if ( ! $rec ) {
            warn "END at $pos";
            last;
        }

        # NOTE(review): string eval of view source. In --listen mode this
        # source arrives over an unauthenticated TCP connection, so anyone
        # who can reach the port can run arbitrary Perl. Left as-is because
        # it is the purpose of the tool; restrict access to the port.
        eval "$code";
        if ( $@ ) {
            warn "ABORT [$pos] $@\n";
            last;
        }
        $affected++;
    }

    report "$affected affected records $view";

    warn "WARN no \$out defined!" unless defined $out;

    if ( $connected && %$connected ) {
        warn "# get results from ", join(' ', keys %$connected );
        foreach my $node ( keys %$connected ) {
            my $frozen = get_node( $node );
            # Each request uses a fresh connection, so release the socket
            # once its reply has been read instead of keeping it open.
            delete $connected->{$node};
            # Nothing to merge when the node sent no reply.
            next unless defined $frozen && length $frozen;
            merge_out( thaw( $frozen ) );
        }
    }
}
187    
# Run every view and save each result below out/.
#
# When no views were given on the command line, views/*.pl is used (sorted).
# Each view is syntax-checked with "perl -c" and skipped when that fails.
# It is then run through run_code(); if that left a defined $out, the dump
# of $out is written to out/<view name without .pl>, with the previous file
# kept as <name>.last. Dumps shorter than 10000 characters are also printed.
sub run_views {
    @views = sort glob 'views/*.pl' unless @views;
    warn "# views ", dump @views;

    foreach my $view ( @views ) {

        # List form: the view path is never interpreted by a shell.
        next if system( 'perl', '-c', $view ) != 0;

        my $code = read_file $view;

        run_code $view => $code;

        next unless defined $out;

        my $dump = dump $out;
        my $len = length $dump;

        # Named $out_path so it doesn't shadow the file-level $path.
        my $out_path = $view;
        $out_path =~ s{views?/}{out/} || die "no view in $view";
        # Only strip the extension at the end of the name.
        $out_path =~ s{\.pl$}{};

        print "OUT $view $offset/$limit $len bytes $out_path"
            , ( $len < 10000 ? " \$out = $dump" : ' SAVED ONLY' )
            , "\n"
        ;

        unlink "$out_path.last" if -e "$out_path.last";
        # There is no previous result to keep on the first run.
        rename $out_path, "$out_path.last" if -e $out_path;
        write_file $out_path, $dump;
        report "SAVE $out_path";
    }

}
222    
# Node mode: with --listen the script serves requests on that TCP port and
# never reaches the interactive prompt below.
#
# A request is a header line "<content size> <command> [<argument>]"
# followed by <content size> bytes of content. Commands:
#   view <name>  run the content as view code, reply with frozen $out
#   info         reply with port, offset, limit and path, tab separated
#   exit         terminate this node
if ( $listen ) {
    my $sock = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
#       LocalAddr => '0.0.0.0',
        LocalPort => $listen,
        Proto     => 'tcp',
        Reuse     => 1,
    ) or die "can't listen on port $listen: $!";

    while (1) {

        warn "NODE listen on $listen\n";

        my $client = $sock->accept();
        if ( ! $client ) {
            warn "WARN $listen accept failed: $!";
            next;
        }

        warn "<<<< $listen connect from ", $client->peerhost, $/;

        # A client that connects and closes without sending anything
        # yields undef here.
        my $line = <$client>;
        if ( ! defined $line ) {
            warn "WARN $listen empty request";
            next;
        }

        my @header = split(/\s/, $line);
        warn "# header ",dump @header;

        my $size = shift @header;
        if ( ! defined $size || $size !~ m{^\d+$} ) {
            warn "WARN $listen invalid request size";
            next;
        }

        my $content = '';
        read $client, $content, $size;

        my $command = defined $header[0] ? $header[0] : '';

        if ( $command eq 'view' ) {
            run_code $header[1] => $content;
            # freeze() needs a reference; reply with an empty result when
            # the view left $out undefined, so the node keeps running.
            send_sock( $client, freeze( defined $out ? $out : {} ) );
        } elsif ( $command eq 'info' ) {
            my $info = "$listen\t$offset\t$limit\t$path";
            warn "info $info\n";
            send_sock $client => $info;
        } elsif ( $command eq 'exit' ) {
            warn "exit $listen";
            exit;
        } else {
            warn "WARN $listen unknown";
        }

    }
}
264    
# Interactive mode: run all views once, then read commands from STDIN.
#   v, vi, \e, o, out   open the files in out/ with vi
#   i, info             print offset, limit and path of this process and of every node
#   q, quit, e, exit    tell the nodes to exit, then exit
#   anything else       run the views again
run_views;

while ( 1 ) {

    print "sack> ";
    my $cmd = <STDIN>;

    # End of input (Ctrl-D or a closed pipe): stop, instead of re-running
    # the views forever on an undefined command.
    last unless defined $cmd;

    if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) {
        system "vi out/*";
    } elsif ( $cmd =~ m{^i(nfo)?}i ) {
        print "# nodes: ", join(' ',@nodes), $/;

        my @info = (
            "node\toffset\tlimit\tpath",
            "----\t------\t-----\t----",
            "here\t$offset\t$limit\t$path",
        );

        send_nodes 'info';
        # get_node() returns an empty list for nodes that did not answer
        push @info, get_node $_ foreach @nodes;

        print "$_\n" foreach @info;

    } elsif ( $cmd =~ m{^(?:q(?:uit)?|e(?:xit)?)}i ) {
        # The short forms are optional, like i(nfo)? above; without the "?"
        # a plain "q" fell through and re-ran all the views.
        warn "# exit";
        send_nodes 'exit';
        exit;
    } else {
        run_views;
    }

}
297    

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.26