/[Sack]/trunk/bin/sack.pl
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk/bin/sack.pl

Parent Directory | Revision Log


Revision 13 - (hide annotations)
Tue Sep 22 10:32:59 2009 UTC (14 years, 8 months ago) by dpavlin
File MIME type: text/plain
File size: 4685 byte(s)
ship results from nodes using Storable and merge them
(a + in a key name makes merged values be summed)

1 dpavlin 1 #!/usr/bin/perl
2    
3     use warnings;
4     use strict;
5    
6     use Time::HiRes qw(time);
7     use Data::Dump qw(dump);
8     use File::Slurp;
9 dpavlin 4 use Getopt::Long;
10 dpavlin 11 use IO::Socket::INET;
11 dpavlin 13 use Storable qw/freeze thaw/;
12 dpavlin 1
13 dpavlin 4
# Defaults for the command-line options; each may be overridden below.
my $path   = '/data/isi/full.txt';    # input file, later prefixed with $prefix
my $limit  = 5000;                    # number of records to load
my $offset = 0;                       # number of records to skip
my @views;                            # view scripts to run (default: views/*.pl)
my $listen;                           # port to serve requests on (node mode)
my @nodes;                            # peer addresses passed to IO::Socket::INET

GetOptions(
    'path=s'        => \$path,
    'offset=i'      => \$offset,
    'limit=i'       => \$limit,
    'view=s'        => \@views,
    'listen|port=i' => \$listen,
    'connect=s'     => \@nodes,
    # GetOptions already warned about the bad option; $! would be unrelated.
) or die "usage: $0 [--path file] [--offset n] [--limit n]"
       . " [--view file ...] [--listen port] [--connect host:port ...]\n";
30 dpavlin 4
my $t = time;    # start of the current timing interval (reset by report)

our $prefix;
BEGIN {
    # Derive the installation prefix from the script's own path so the
    # bundled WebPAC library and data can be located. This has to run at
    # compile time because the "use lib" below interpolates $prefix.
    $prefix = $0;

    # Turn "./bin/sack.pl" into an absolute path. The dot is escaped:
    # the old pattern ^./ also stripped any other first character + "/".
    if ( $prefix =~ s{^\./}{} ) {
        require Cwd;
        $prefix = Cwd::getcwd() . "/$prefix";
    }

    # ".../srv/Sack/bin/sack.pl" -> "..."
    $prefix =~ s{^(.+)/srv/Sack/bin.+$}{$1};
    warn "# prefix $prefix";
}

use lib "$prefix/srv/webpac2/lib/";
use WebPAC::Input::ISI;
my $input = WebPAC::Input::ISI->new(
    path   => "$prefix/$path",
    offset => $offset,
    limit  => $limit,
);
53    
54    
# report($description)
# Print $description together with the seconds elapsed since the global
# timer $t was last reset and the resulting records/second rate for the
# loaded input, then restart the timer.
sub report {
    my $description = shift;
    my $dt = time - $t;

    # A zero interval would make the rate die with "Illegal division by zero".
    my $rate = $dt > 0 ? $input->size / $dt : 0;

    printf "%s in %1.4fs %.2f/s\n", $description, $dt, $rate;
    $t = time;
}
61    
62    
report $input->size . ' records loaded';

# View results are written to out/<view name>; fail early if that
# directory cannot be created rather than when the first result is saved.
unless ( -d 'out' ) {
    mkdir 'out' or die "can't create directory out: $!";
}

our $out;          # result of the running view; expected to be set by view code
our $cache;        # record number => record, reused across view runs
our $connected;    # node address => socket awaiting that node's result
72    
# send_nodes(@header_words, $content)
# Open a TCP connection to every address in @nodes and send one request:
# a header line "<length of $content> @header_words" followed by $content.
# Each socket is stored in $connected so the caller can read the reply.
# Dies if a node cannot be reached.
sub send_nodes {
    my $content = pop @_;
    my $header  = join ' ', length($content), @_;

    foreach my $node ( @nodes ) {

        my $sock = IO::Socket::INET->new(
            PeerAddr => $node,
            Proto    => 'tcp',
        ) or die "can't connect to $node - $!";

        print ">>>> $node $header\n";

        # Low-precedence "or" so the warning depends on print's result;
        # with "||" it only tested the (always true) string being printed.
        print {$sock} "$header\n$content"
            or warn "can't send $header to $node: $!";

        $connected->{$node} = $sock;
    }
}
92    
# merge_out($new)
# Merge a partial result (a hash of hashes thawed from a node's reply) into
# the global $out, one second-level key at a time:
#   - a key not yet defined in $out is copied over;
#   - if the first-level key contains "+", the values are added together;
#   - an existing array ref gets the new value pushed onto it;
#   - an existing plain scalar becomes [ old, new ].
# Dies on any other collision (e.g. an existing hash ref).
sub merge_out {
    my $new = shift;

    # A node may send back undef or a non-hash; skip it instead of dying
    # with "Can't use an undefined value as a HASH reference".
    unless ( ref($new) eq 'HASH' ) {
        warn "## merge skipped, not a HASH: ", dump($new), "\n";
        return;
    }

    warn "## merge $new\n";

    foreach my $k1 ( keys %$new ) {

        foreach my $k2 ( keys %{ $new->{$k1} } ) {

            my $n   = $new->{$k1}->{$k2};
            my $old = $out->{$k1}->{$k2};
            my $ref = ref $old;

            if ( ! defined $old ) {
                $out->{$k1}->{$k2} = $n;
            } elsif ( $k1 =~ m{\+} ) {
                warn "# agregate $k1 $k2";
                $out->{$k1}->{$k2} += $n;
            } elsif ( $ref eq 'ARRAY' ) {
                push @{ $out->{$k1}->{$k2} }, $n;
            } elsif ( $ref eq '' ) {
                $out->{$k1}->{$k2} = [ $old, $n ];
            } else {
                die "can't merge $k2 [$ref] from ", dump($n), " into ", dump($old);
            }
        }
    }

    warn "## merge out ", dump $out;
}
122    
# run_code($view, $code)
# Send $code to all nodes, then string-eval it locally once per loaded
# record with the lexicals $pos (record number) and $rec (the record) in
# scope. The code is expected to build its result in the global $out.
# Finally read each node's reply ("<size>\n" + Storable-frozen data) and
# merge it into $out.
sub run_code {
    my ( $view, $code ) = @_;

    warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n";

    send_nodes view => $view => $code;

    undef $out;

    my $affected = 0;
    $t = time;

    foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
        my $rec = $cache->{$pos} ||= $input->fetch_rec( $pos );
        if ( ! $rec ) {
            warn "END at $pos";
            last;
        }

        # String eval (not a block) so the view code can see $pos and $rec.
        eval "$code";
        if ( $@ ) {
            warn "ERROR [$pos] $@\n";
        } else {
            $affected++;
        }
    }

    report "$affected affected records $view";

    warn "WARN no \$out defined!" unless defined $out;

    return unless $connected;

    warn "# get results from ", join(' ', keys %$connected );

    foreach my $node ( keys %$connected ) {
        # Each socket carries exactly one reply; forget it once handled.
        my $sock = delete $connected->{$node};

        my $size = <$sock>;
        unless ( defined $size ) {
            warn "WARN $node closed connection without a result\n";
            close $sock;
            next;
        }
        chomp $size;
        warn "<<<< $node $size bytes\n";

        # Loop in case read() returns fewer bytes than requested.
        my $part = '';
        while ( length($part) < $size ) {
            my $got = read( $sock, $part, $size - length($part), length($part) );
            last unless $got;
        }
        close $sock;

        if ( length($part) != $size ) {
            warn "WARN short read from $node: got ", length($part), " of $size bytes\n";
            next;
        }

        merge_out( thaw $part );
    }
}
167    
# run_views()
# Run every view script (from --view, or sorted views/*.pl by default) that
# passes a "perl -c" syntax check. Views that produce a defined $out have it
# dumped to out/<name>; the previous dump is kept as out/<name>.last.
sub run_views {
    @views = sort glob 'views/*.pl' unless @views;
    warn "# views ", dump @views;

    foreach my $view ( @views ) {

        # List-form system() keeps the file name away from the shell;
        # $^X is the perl binary running this script.
        next if system( $^X, '-c', $view ) != 0;

        my $code = read_file $view;

        run_code $view => $code;

        next unless defined $out;

        my $dump = dump $out;
        my $len  = length $dump;

        my $path = $view;
        $path =~ s{views?/}{out/} || die "no view in $view";
        $path =~ s{\.pl$}{};    # strip only the trailing extension

        print "OUT $view $offset/$limit $len bytes $path"
            , ( $len < 10000 ? " \$out = $dump" : ' SAVED ONLY' )
            , "\n"
            ;

        unlink "$path.last" if -e "$path.last";
        rename $path, "$path.last" if -e $path;
        write_file $path, $dump;
        report "SAVE $path";
    }
}
202    
# Node mode: serve requests forever. A request is a header line
# "<size> view <name>" followed by <size> bytes of view code; the view is
# run locally and the reply is "<size>\n" followed by Storable-frozen $out.
if ( $listen ) {
    my $sock = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
        # LocalAddr => '0.0.0.0',
        LocalPort => $listen,
        Proto     => 'tcp',
        Reuse     => 1,
    ) or die "can't listen on port $listen: $!";

    while (1) {

        warn "NODE listen on $listen\n";

        my $client = $sock->accept();
        unless ( $client ) {
            warn "WARN accept failed: $!\n";
            next;
        }

        warn "<<<< connect from ", $client->peerhost, $/;

        my $line = <$client>;
        unless ( defined $line ) {
            warn "WARN empty request\n";
            next;
        }

        my @header = split(/\s/, $line);
        warn "# header ",dump @header;

        my $size = shift @header;

        my $content;
        read $client, $content, $size;

        if ( defined $header[0] && $header[0] eq 'view' ) {
            run_code $header[1] => $content;

            # Storable's freeze() dies on undef, which used to kill the node
            # whenever a view left $out unset; send an empty hash instead.
            my $frozen      = freeze( defined $out ? $out : {} );
            my $frozen_size = length $frozen;
            warn ">>>> $frozen_size bytes";
            print {$client} "$frozen_size\n$frozen";

        } else {
            warn "WARN unknown";
        }

    }
}
242    
run_views;

# Interactive prompt:
#   input starting with v, o or \e  -> open out/* in vi
#   input starting with i           -> print the configured nodes
#   anything else                   -> run all views again
# End of input (Ctrl-D) ends the loop; previously undef input fell through
# to run_views and re-ran every view forever.
while ( 1 ) {

    print "sack> ";
    my $cmd = <STDIN>;
    last unless defined $cmd;

    if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) {
        system "vi out/*";
    } elsif ( $cmd =~ m{^i(nfo)?}i ) {
        # Parenthesize dump() so $/ is printed, not swallowed into the dump.
        print "nodes: ", dump(@nodes), $/;
    } else {
        run_views;
    }

}
259    

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.26