/[Sack]/trunk/bin/sack.pl
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk/bin/sack.pl

Parent Directory | Revision Log


Revision 11 - (hide annotations)
Mon Sep 21 23:21:34 2009 UTC (14 years, 8 months ago) by dpavlin
File MIME type: text/plain
File size: 3294 byte(s)
implement --listen port and --connect host:port to create master-slave cluster
(err, cloud) of nodes and implement send_nodes to send view to each node

added info to commands to see connected views, and allow abbreviation to a single character

1 dpavlin 1 #!/usr/bin/perl
2    
3     use warnings;
4     use strict;
5    
6     use Time::HiRes qw(time);
7     use Data::Dump qw(dump);
8     use File::Slurp;
9 dpavlin 4 use Getopt::Long;
10 dpavlin 11 use IO::Socket::INET;
11 dpavlin 1
12 dpavlin 4
# Defaults; each one can be overridden on the command line (see GetOptions).
my $path   = '/data/isi/full.txt';  # handed to WebPAC::Input::ISI as path
my $limit  = 5000;                  # handed to WebPAC::Input::ISI as limit
my $offset = 0;                     # handed to WebPAC::Input::ISI as offset;
                                    # run_code also numbers records from $offset + 1
my @views;                          # --view (repeatable); run_views falls back
                                    # to views/*.pl when none are given
my $listen;                         # --listen / --port: TCP port to serve on
my @nodes;                          # --connect (repeatable): peers that
                                    # send_nodes forwards every view to

# GetOptions reports the offending option itself and returns false; it does
# not set $!, so die with a usage line instead of an unrelated errno text.
GetOptions(
    'path=s'        => \$path,
    'offset=i'      => \$offset,
    'limit=i'       => \$limit,
    'view=s'        => \@views,
    'listen|port=i' => \$listen,
    'connect=s'     => \@nodes,
) or die "usage: $0 [--path file] [--offset n] [--limit n] [--view file ...]"
       . " [--listen port] [--connect host:port ...]\n";

# Start of the interval that the next report() call measures.
my $t = time;

use lib '/srv/webpac2/lib/';
use WebPAC::Input::ISI;

# Record source shared by report() and run_code().
my $input = WebPAC::Input::ISI->new(
    path   => $path,
    offset => $offset,
    limit  => $limit,
);
39    
40    
# Print one timing line on STDOUT:
#
#   "<description> in <elapsed>s <rate>/s"
#
# where <elapsed> is the time since the file-level $t was last set and
# <rate> is $input->size divided by that elapsed time. Resets $t afterwards,
# so consecutive calls measure consecutive intervals.
#
#   $description - free text placed at the start of the line
sub report {
    my $description = shift;

    my $dt = time - $t;

    # Two clock readings can be equal; dividing by zero is fatal in Perl,
    # so report a rate of 0 for an empty interval instead of dying.
    my $rate = $dt ? $input->size / $dt : 0;

    printf "%s in %1.4fs %.2f/s\n", $description, $dt, $rate;

    $t = time;
}
47    
48    
report( $input->size . ' records loaded' );

# run_views writes its dumps below out/. Test for a directory (not just any
# path named "out") and say why creation failed rather than failing silently;
# only warn, because a --listen node never writes there.
unless ( -d 'out' ) {
    mkdir 'out' or warn "can't create directory out: $!\n";
}

# Result of the most recent view run. run_code clears it before evaluating a
# view and run_views dumps it afterwards. Must stay a package variable named
# $out: the string-eval'd view code refers to it by that name.
our $out;

# Records already fetched from $input, keyed by position (filled in run_code).
our $cache;
56    
# Send one message to every peer listed in @nodes (--connect).
#
# Arguments: zero or more header words followed by the content, which must
# be the LAST argument, e.g.  send_nodes( view => $view => $code );
#
# For each node a new TCP connection is opened and this is written to it:
#
#   "<length of content> <header words joined by spaces>\n<content>"
#
# which is the format the --listen loop in this file reads back.
#
# Dies when a node can't be connected to; warns when sending or closing
# fails. Does nothing when @nodes is empty.
sub send_nodes {
    my $content = pop @_;
    my $header  = length($content);
    $header .= ' ' . join( ' ', @_ ) if @_;

    foreach my $node ( @nodes ) {

        my $sock = IO::Socket::INET->new(
            PeerAddr => $node,
            Proto    => 'tcp',
        ) or die "can't connect to $node - $!";

        print ">>>> $node $header\n";

        # Low-precedence "or" is required here: with "||" the warn was
        # attached to the (always true) string being printed, so a failed
        # print was never reported.
        print {$sock} "$header\n$content"
            or warn "can't send $header to $node: $!";

        # Close explicitly so an error on the final write is noticed.
        close $sock
            or warn "can't close connection to $node: $!";
    }
}
75    
# Evaluate the Perl source $code once for every loaded record.
#
#   $view - name of the view, used in log output and forwarded to nodes
#   $code - Perl source text; it is string-eval'd inside the record loop
#
# Steps: log the code on STDERR, forward it to all nodes with send_nodes,
# clear $out, restart the report() timer, then walk positions
# $offset + 1 .. $offset + $input->size. Each record comes from $cache or,
# on a miss, from $input->fetch_rec (and is cached). A false record ends the
# loop early. Evaluations that leave $@ empty are counted as "affected".
#
# The eval'd code runs in this lexical scope, so the names $rec, $pos,
# $view, $code and $affected (and the file-level $out) are visible to it.
# Do not rename them.
#
# NOTE(review): $code is executed verbatim; in --listen mode it arrives from
# the network. Only run this on trusted input.
sub run_code {
    my ( $view, $code ) = @_;

    warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n";

    send_nodes( 'view', $view, $code );

    undef $out;

    my $affected = 0;
    $t = time;

    foreach my $pos ( $offset + 1 .. $offset + $input->size ) {
        my $rec = $cache->{$pos} ||= $input->fetch_rec($pos);

        unless ($rec) {
            warn "END at $pos";
            last;
        }

        eval $code;

        if ($@) {
            warn "ERROR [$pos] $@\n";
            next;
        }

        $affected++;
    }

    report("$affected affected records $view");

    defined $out or warn "WARN no \$out defined!";
}
107    
# Run every view and save what it produced.
#
# When no views were given with --view, @views is filled (once) with the
# sorted list of views/*.pl. For each view:
#
#   1. syntax-check it with "perl -c"; views that fail are skipped
#   2. read the file and hand it to run_code
#   3. if the view left something in $out, dump it, print a summary line
#      (including the dump itself when it is under 10000 characters) and
#      write the dump to the output file, keeping the previous output as
#      "<output file>.last"
#
# The output file name is the view path with the first "view/" or "views/"
# replaced by "out/" and a trailing ".pl" removed. Dies when the view path
# contains neither "view/" nor "views/".
sub run_views {
    @views = sort { $a cmp $b } glob('views/*.pl') unless @views;
    warn "# views ", dump(@views);

    foreach my $view ( @views ) {

        # List form of system: the view name (which may come from --view)
        # is passed as a single argument and never interpreted by a shell.
        next if system( 'perl', '-c', $view ) != 0;

        my $code = read_file($view);

        run_code( $view, $code );

        if ( defined $out ) {
            my $dump = dump($out);
            my $len  = length $dump;

            # Named $out_path so it doesn't shadow the file-level $path.
            my $out_path = $view;
            $out_path =~ s{views?/}{out/} or die "no view in $view";

            # Anchored: strip only the extension, not the first ".pl"
            # found anywhere in the path.
            $out_path =~ s{\.pl\z}{};

            print "OUT $view $offset/$limit $len bytes $out_path"
                , ( $len < 10000 ? " \$out = $dump" : ' SAVED ONLY' )
                , "\n"
            ;

            # Keep the previous result as "<file>.last". There is nothing
            # to rotate on the first run, so only rename an existing file.
            unlink "$out_path.last" if -e "$out_path.last";
            if ( -e $out_path ) {
                rename $out_path, "$out_path.last"
                    or warn "can't rename $out_path to $out_path.last: $!\n";
            }

            write_file( $out_path, $dump );
            report("SAVE $out_path");
        }

    }

}
142    
# Node mode (--listen PORT): accept connections forever and run the code
# each one delivers. The loop has no exit, so in this mode the statements
# after this block are never reached.
#
# Expected message, as written by send_nodes:
#
#   "<size> view <view name>\n" followed by <size> characters of code
#
# NOTE(review): whatever a peer sends is passed to run_code, which
# string-evals it. Anyone who can reach this port can execute arbitrary
# Perl here; restrict access to the port accordingly.
if ( $listen ) {
    my $sock = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
#       LocalAddr => '0.0.0.0',
        LocalPort => $listen,
        Proto     => 'tcp',
        Reuse     => 1,
    ) or die "can't listen on port $listen: $!";

    while (1) {

        warn "NODE listen on $listen\n";

        my $client = $sock->accept();
        if ( ! $client ) {
            # A signal may interrupt accept; retry in that case, otherwise
            # fail with a clear message instead of a method call on undef.
            next if $!{EINTR};
            die "accept failed on port $listen: $!";
        }

        warn "<<<< connect from ", $client->peerhost, $/;

        # The peer may disconnect before sending anything.
        my $line = <$client>;
        if ( ! defined $line ) {
            warn "WARN no header received\n";
            next;
        }

        my @header = split( /\s/, $line );
        warn "# header ", dump(@header);

        my $size = shift @header;
        if ( ! defined $size || $size !~ m{\A\d+\z} ) {
            warn "WARN invalid size in header\n";
            next;
        }

        # read returns undef on error and fewer than $size at end of file;
        # never run truncated code.
        my $content;
        my $got = read $client, $content, $size;
        if ( ! defined $got || $got != $size ) {
            warn "WARN incomplete content, expected $size\n";
            next;
        }

        if ( @header && $header[0] eq 'view' ) {
            run_code( $header[1], $content );
        } else {
            warn "WARN unknown";
        }

    }
}
176    
# Run all views once, then enter the interactive prompt.
run_views();

# Commands are matched case-insensitively on the start of the input line:
#
#   v..., \e..., o...  - open everything under out/ in vi
#   i...               - print the list of connected nodes
#   anything else      - run all views again (including an empty line)
#
# End of input (e.g. Ctrl-D) leaves the loop.
while ( 1 ) {

    print "sack> ";
    my $cmd = <STDIN>;

    # <STDIN> returns undef at end of input. Without this check the loop
    # would never block again and would re-run the views forever.
    if ( ! defined $cmd ) {
        print "\n";
        last;
    }

    if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) {
        system "vi out/*";
    } elsif ( $cmd =~ m{^i(nfo)?}i ) {
        # Parentheses are required: "dump @nodes, $/" passed the newline
        # to dump as data instead of printing it after the dump.
        print "nodes: ", dump(@nodes), $/;
    } else {
        run_views();
    }

}
193    

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.26