/[Sack]/trunk/bin/sack.pl
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk/bin/sack.pl

Parent Directory | Revision Log


Revision 19 - (hide annotations)
Tue Sep 22 15:14:00 2009 UTC (14 years, 8 months ago) by dpavlin
File MIME type: text/plain
File size: 5610 byte(s)
added --debug and move some output to it
1 dpavlin 1 #!/usr/bin/perl
2    
3     use warnings;
4     use strict;
5    
6     use Time::HiRes qw(time);
7     use Data::Dump qw(dump);
8     use File::Slurp;
9 dpavlin 4 use Getopt::Long;
10 dpavlin 11 use IO::Socket::INET;
11 dpavlin 13 use Storable qw/freeze thaw/;
12 dpavlin 1
13 dpavlin 4
# Run-time configuration. Each default below can be overridden by the
# command-line option of the same name.
my $debug  = 0;                       # --debug / --nodebug: extra diagnostics
my $path   = '/data/isi/full.txt';    # --path: input file, appended to $prefix
my $limit  = 5000;                    # --limit: passed to the input module
my $offset = 0;                       # --offset: passed to the input module
my @views;                            # --view (repeatable): view files to run
my $listen;                           # --listen / --port: act as a node on this port
my @nodes;                            # --connect (repeatable): node addresses to use

# GetOptions has already printed what was wrong with the arguments; $! holds
# no useful information at this point, so die with a usage summary instead.
GetOptions(
    'path=s'        => \$path,
    'offset=i'      => \$offset,
    'limit=i'       => \$limit,
    'view=s'        => \@views,
    'listen|port=i' => \$listen,
    'connect=s'     => \@nodes,
    'debug!'        => \$debug,
) or die "usage: $0 [--path file] [--offset n] [--limit n] [--view file ...]"
       . " [--listen port] [--connect node ...] [--debug]\n";

# Start of the interval measured by report(); report() resets it on each call.
my $t = time;
34    
35 dpavlin 12
# Work out the installation prefix from the path the script was started as.
#
#   $script - path of the running script (normally $0)
#   $cwd    - optional working directory used to resolve a leading "./";
#             when omitted it is obtained from the `pwd` command
#
# Returns everything that precedes "/srv/Sack/bin/..." in the (resolved)
# path, or the path itself when it does not contain that component.
sub _sack_prefix {
    my ( $script, $cwd ) = @_;

    # The dot must be escaped: an unescaped "." matches any character, so a
    # path such as "a/sack.pl" would lose its first directory component.
    if ( $script =~ s{^\./}{} ) {
        if ( ! defined $cwd ) {
            chomp( $cwd = `pwd` );
        }
        $script = "$cwd/$script";
    }

    $script =~ s{^(.*)/srv/Sack/bin.+$}{$1};

    return $script;
}

# Has to be computed at compile time because "use lib" further down
# interpolates $prefix.
our $prefix;
BEGIN {
    $prefix = _sack_prefix( $0 );
    warn "# prefix $prefix";
}
46    
47    
48     use lib "$prefix/srv/webpac2/lib/";
49 dpavlin 1 use WebPAC::Input::ISI;
# Input source shared by every view run. The file is looked up below
# $prefix; offset and limit are handed through unchanged (presumably they
# select the window of records to load -- confirm in WebPAC::Input::ISI).
my @input_args = (
    path   => "$prefix/$path",
    offset => $offset,
    limit  => $limit,
);
my $input = WebPAC::Input::ISI->new( @input_args );
55    
56    
# Print a timing line for the step that just finished and restart the timer.
#
#   $description - text describing the finished step
#
# Output format: "<description> in <seconds>s <records per second>/s", where
# the rate is the input size divided by the time elapsed since $t.
sub report {
    my $description = shift;
    my $dt = time - $t;

    # Two calls can land on the same clock reading; report a rate of 0
    # instead of dying with "Illegal division by zero".
    my $rate = $dt > 0 ? $input->size / $dt : 0;

    printf "%s in %1.4fs %.2f/s\n", $description, $dt, $rate;
    $t = time;
}
63    
64    
report( $input->size . ' records loaded' );

# run_views() writes its dumps below ./out. Checking for a directory (not
# just "something named out") and reporting a failed mkdir makes a later
# write failure easier to understand.
unless ( -d 'out' or mkdir 'out' ) {
    warn "can't create directory out: $!";
}

# State shared between the subs below and with the view code that
# run_code() evaluates:
our $out;          # result of the current view; cleared by run_code()
our $cache;        # records already fetched, keyed by position
our $connected;    # node address => socket opened by send_nodes()
74    
# Send one request to every node listed in @nodes.
#
# Arguments are the header words, optionally followed by the content:
#   send_nodes( 'info' );                   # header only, empty content
#   send_nodes( view => $name => $code );   # last argument is the content
#
# Wire format: "<content length> <header words>\n<content>".
# Each socket that was opened is remembered in $connected so get_node()
# can read the reply from it later.
sub send_nodes {
    my @words = @_;

    # no content with just one argument!
    my $content = @words > 1 ? pop @words : '';
    my $header  = length($content);
    $header .= ' ' . join(' ', @words) if @words;

    foreach my $node ( @nodes ) {

        my $sock = IO::Socket::INET->new(
            PeerAddr => $node,
            Proto    => 'tcp',
        );

        if ( ! $sock ) {
            warn "can't connect to $node - $!"; # FIXME die?
            # forget a socket left over from an earlier request, so nobody
            # waits for a reply that was never asked for
            delete $connected->{$node} if $connected;
            next;
        }

        print ">>>> $node $header\n";

        # "or", not "||": with "||" the warn was attached to the (always
        # true) string and a failed print went unnoticed
        print {$sock} "$header\n$content"
            or warn "can't send $header to $node: $!";

        $connected->{$node} = $sock;
    }
}
98    
# Read one reply from a node that send_nodes() connected to.
#
#   $node - node address, as used as key in $connected
#
# A reply is a line holding the payload size in bytes, followed by the
# payload itself. Returns the payload, or nothing (empty list / undef) when
# there is no connection, the node closed it without answering, the size
# line is malformed, or fewer bytes than announced arrived. In every failure
# case the node is removed from $connected.
sub get_node {
    my $node = shift;

    my $sock = $connected->{$node};
    if ( ! $sock ) {
        warn "ERROR: lost connection to $node";
        delete $connected->{$node};
        return;
    }

    my $size = <$sock>;
    if ( ! defined $size ) {
        warn "ERROR: no response from $node";
        delete $connected->{$node};
        return;
    }
    chomp $size;
    warn "<<<< $node $size bytes\n";

    if ( $size !~ m{^\d+$} ) {
        warn "ERROR: invalid size from $node";
        delete $connected->{$node};
        return;
    }

    my $data = '';
    my $got  = read $sock, $data, $size;
    if ( ! defined $got || $got != $size ) {
        warn "ERROR: short read from $node, expected $size bytes, got ",
            defined $got ? $got : "error $!";
        delete $connected->{$node};
        return;
    }

    return $data;
}
114    
# Send a reply over an open socket, in the format get_node() reads:
# a line with the payload size in bytes, followed by the payload.
#
#   $sock - connected socket (must provide peerhost)
#   $data - payload to send
sub send_sock {
    my ( $sock, $data ) = @_;
    my $size = length $data;
    warn ">>>> ", $sock->peerhost, " $size bytes";

    # "or", not "||": with "||" the warn was attached to the (always true)
    # string and a failed print went unnoticed
    print {$sock} "$size\n$data"
        or warn "can't send $size bytes to ", $sock->peerhost, ": $!";
}
121    
# Fold a two-level result hash ($new->{group}->{key} = value) into the
# global $out:
#   - a value $out does not have yet is copied as is
#   - in groups whose name contains "+", values are summed
#   - otherwise values are collected: appended to an existing array, or a
#     plain scalar is turned into an array of the old and the new value
#   - any other kind of existing value cannot be merged and is fatal
sub merge_out {
    my $new = shift;

    for my $group ( keys %$new ) {

        for my $key ( keys %{ $new->{$group} } ) {

            my $value = $new->{$group}->{$key};
            my $slot  = \$out->{$group}->{$key};

            if ( ! defined $$slot ) {
                $$slot = $value;
                next;
            }

            if ( $group =~ m{\+} ) {
                $$slot += $value;
                next;
            }

            my $type = ref $$slot;

            if ( $type eq 'ARRAY' ) {
                push @$$slot, $value;
                next;
            }

            if ( $type eq '' ) {
                $$slot = [ $$slot, $value ];
                next;
            }

            die "can't merge $key [$type] from ",dump($value), " into ", dump($$slot);
        }
    }

    warn "## merge out ", dump $out if $debug;
}
149    
# Run one view over all loaded records, locally and on every node.
#
#   $view - name of the view (used in messages and sent to the nodes)
#   $code - Perl source of the view
#
# The code is first sent to the nodes, then evaluated here once per record.
# $out is cleared before the run and is expected to be filled by the view
# code. Afterwards the results of all connected nodes are merged into $out.
#
# SECURITY: $code is executed with string eval and, in listen mode, arrives
# over the network; node replies are decoded with Storable's thaw. Both are
# only safe with trusted peers.
#
# NOTE: the evaluated code can see the lexicals of this sub (presumably at
# least $rec), so none of them should be renamed.
sub run_code {
    my ( $view, $code ) = @_;

    warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n";

    send_nodes( view => $view => $code );

    undef $out;

    my $affected = 0;
    $t = time;

    foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
        my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
        if ( ! $rec ) {
            warn "END at $pos";
            last;
        }

        eval "$code";
        if ( $@ ) {
            warn "ERROR [$pos] $@\n";
        } else {
            $affected++;
        }
    }

    report( "$affected affected records $view" );

    warn "WARN no \$out defined!" unless defined $out;

    if ( $connected ) {
        warn "# get results from ", join(' ', keys %$connected );

        foreach my $node ( keys %$connected ) {

            my $frozen = get_node( $node );
            if ( ! defined $frozen || ! length $frozen ) {
                warn "WARN no results from $node";
                next;
            }

            # thaw croaks on damaged data; one bad node must not abort the
            # merge of the others
            my $result = eval { thaw( $frozen ) };
            if ( ref($result) ne 'HASH' ) {
                warn "WARN can't decode results from $node: ", $@ || 'not a hash';
                next;
            }

            merge_out( $result );
        }
    }
}
186    
# Run every selected view and save its result.
#
# Views come from @views (the --view options); when that list is empty all
# of views/*.pl is used, sorted by name. A view that fails "perl -c" is
# skipped. For each view that leaves something in $out, a dump of $out is
# written to the view's path with the view directory replaced by out/ and the .pl
# suffix removed; the previous file, if any, is kept with a ".last" suffix.
sub run_views {
    if ( ! @views ) {
        my @found = glob 'views/*.pl';
        @views = sort @found;
    }
    warn "# views ", dump @views;

    foreach my $view ( @views ) {

        # syntax check with the running interpreter; list form keeps the
        # file name away from the shell
        next if system( $^X, '-c', $view ) != 0;

        my $code = read_file( $view );

        run_code( $view => $code );

        next unless defined $out;

        my $dump = dump( $out );
        my $len  = length $dump;

        # named $out_path so it does not hide the file-level $path
        my $out_path = $view;
        $out_path =~ s{views?/}{out/} || die "no view in $view";
        $out_path =~ s{\.pl\z}{};    # only the suffix, not a ".pl" inside the name

        print "OUT $view $offset/$limit $len bytes $out_path"
            , ( $len < 10000 ? " \$out = $dump" : ' SAVED ONLY' )
            , "\n"
        ;

        unlink "$out_path.last" if -e "$out_path.last";
        rename $out_path, "$out_path.last" if -e $out_path;
        write_file( $out_path, $dump );
        report( "SAVE $out_path" );
    }

}
221    
# Node mode: with --listen, serve requests forever instead of running views
# locally. Each connection carries exactly one request:
#
#   "<content size> <command> [<argument>]\n<content>"
#
# Commands:
#   view <name> - run the content as view code, reply with the frozen $out
#   info        - reply with port, offset, limit and path, tab separated
#   exit        - terminate this node
#
# SECURITY: anybody who can reach the port can run arbitrary Perl code here
# through "view"; only listen on trusted networks.
if ( $listen ) {
    my $sock = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
#       LocalAddr => '0.0.0.0',
        LocalPort => $listen,
        Proto     => 'tcp',
        Reuse     => 1,
    ) or die "can't listen on port $listen: $!";

    while (1) {

        warn "NODE listen on $listen\n";

        my $client = $sock->accept();
        if ( ! $client ) {
            next if $!{EINTR};    # interrupted by a signal, just retry
            die "accept on port $listen failed: $!";
        }

        warn "<<<< $listen connect from ", $client->peerhost, $/;

        my $line = <$client>;
        if ( ! defined $line ) {
            warn "WARN $listen connection closed without request";
            next;
        }

        my @header = split(/\s/, $line);
        warn "# header ",dump @header;

        my $size = shift @header;
        if ( ! defined $size || $size !~ m{^\d+$} ) {
            warn "WARN $listen invalid request size";
            next;
        }

        my $content = '';
        my $got = read $client, $content, $size;
        if ( ! defined $got || $got != $size ) {
            warn "WARN $listen incomplete request, expected $size bytes";
            next;
        }

        my $command = defined $header[0] ? $header[0] : '';

        if ( $command eq 'view' ) {
            run_code( $header[1] => $content );
            # freeze croaks on anything that is not a reference; a view
            # that produced no $out is answered with an empty result
            send_sock( $client => freeze( ref $out ? $out : {} ) );
        } elsif ( $command eq 'info' ) {
            my $info = "$listen\t$offset\t$limit\t$path";
            warn "info $info\n";
            send_sock( $client => $info );
        } elsif ( $command eq 'exit' ) {
            warn "exit $listen";
            exit;
        } else {
            warn "WARN $listen unknown";
        }

    }
}
263    
# Interactive mode: run all views once, then read commands from STDIN.
#
#   v, vi, \e, o, out - open the files in out/ in vi
#   i, info           - show offset, limit and path of this process and of
#                       every node
#   q, quit, e, exit  - tell the nodes to exit, then exit
#   anything else     - run the views again
run_views();

while ( 1 ) {

    print "sack> ";
    my $cmd = <STDIN>;

    # End of input (e.g. Ctrl-D): without this check undef matched no
    # command and the views were re-run in an endless loop. Nodes are left
    # running; only an explicit quit shuts them down.
    if ( ! defined $cmd ) {
        print "\n";
        last;
    }

    if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) {
        system "vi out/*";
    } elsif ( $cmd =~ m{^i(nfo)?}i ) {
        print "# nodes: ", join(' ',@nodes), $/;

        my @info = (
            "node\toffset\tlimit\tpath",
            "----\t------\t-----\t----",
            "here\t$offset\t$limit\t$path",
        );

        send_nodes( 'info' );
        push @info, get_node( $_ ) foreach @nodes;

        print "$_\n" foreach @info;

    } elsif ( $cmd =~ m{^(?:q(?:uit)?|e(?:xit)?)}i ) {
        # suffixes are optional like in the other commands, so that a plain
        # "q" quits instead of re-running the views
        warn "# exit";
        send_nodes( 'exit' );
        exit;
    } else {
        run_views();
    }

}
296    

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.26