/[Sack]/trunk/bin/sack.pl
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/bin/sack.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 11 - (show annotations)
Mon Sep 21 23:21:34 2009 UTC (14 years, 8 months ago) by dpavlin
File MIME type: text/plain
File size: 3294 byte(s)
implement --listen port and --connect host:port to create master-slave cluster
(err, cloud) of nodes and implement send_nodes to send view to each node

added info to see connected views to commands and allow abbreviation to a single character

1 #!/usr/bin/perl
2
3 use warnings;
4 use strict;
5
6 use Time::HiRes qw(time);
7 use Data::Dump qw(dump);
8 use File::Slurp;
9 use Getopt::Long;
10 use IO::Socket::INET;
11
12
# Defaults; every one of them can be overridden on the command line.
my $path   = '/data/isi/full.txt';    # --path, handed to the input object
my $limit  = 5000;                    # --limit, handed to the input object
my $offset = 0;                       # --offset, handed to the input object
my @views;                            # --view (repeatable); run_views globs views/*.pl when empty
my $listen;                           # --listen (alias --port): port the node loop binds to
my @nodes;                            # --connect (repeatable): peers that send_nodes contacts


# GetOptions already warns about each bad option and then returns false.
# $! is not set by it, so print a usage line instead of a stray errno.
GetOptions(
    'path=s'        => \$path,
    'offset=i'      => \$offset,
    'limit=i'       => \$limit,
    'view=s'        => \@views,
    'listen|port=i' => \$listen,
    'connect=s'     => \@nodes,
) or die "usage: $0 [--path file] [--offset N] [--limit N]"
    . " [--view file.pl ...] [--listen port] [--connect host:port ...]\n";
29
# Start of the interval that report() measures; report() resets it.
my $t = time;

use lib '/srv/webpac2/lib/';
use WebPAC::Input::ISI;

# Input object built from the --path / --offset / --limit settings.
my $input = WebPAC::Input::ISI->new( path => $path, offset => $offset, limit => $limit );
39
40
# Print a timing line for the step that just finished and restart the timer.
#
#   $description - text placed at the start of the line
#
# Elapsed time is measured from the file-level $t, which is set to the
# current time again before returning.  The rate printed is $input->size
# divided by the elapsed seconds.
sub report {
    my $description = shift;
    my $dt = time - $t;

    # Two clock reads can coincide; print a rate of 0 in that case instead of
    # dying with "Illegal division by zero".
    my $rate = $dt > 0 ? $input->size / $dt : 0;

    printf "%s in %1.4fs %.2f/s\n", $description, $dt, $rate;
    $t = time;
}
47
48
report( $input->size . ' records loaded' );

# run_views writes its results under out/.  Fail right away, with the reason,
# when that directory is missing and cannot be created.
if ( !-d 'out' ) {
    mkdir 'out' or die "can't create directory out: $!";
}

# Cleared by run_code before a view runs and dumped by run_views afterwards.
our $out;

# Records already fetched by run_code, keyed by record position.
our $cache;
56
# Send one message to every node listed in @nodes (--connect).
#
# Arguments: zero or more header words followed by the content, which is
# always the last argument.
#
# Wire format: a header line "<length of content>[ <word> ...]\n" followed
# immediately by the content.  A new TCP connection is opened per node; a
# failed connect is fatal, a failed send or close only warns.
sub send_nodes {
    my $content = pop @_;
    my $header  = length($content);
    $header .= ' ' . join( ' ', @_ ) if @_;

    foreach my $node (@nodes) {

        my $sock = IO::Socket::INET->new(
            PeerAddr => $node,
            Proto    => 'tcp',
        ) or die "can't connect to $node - $!";

        print ">>>> $node $header\n";

        # Low-precedence "or" is required here: "||" binds to the string
        # being printed, which is always true, so the warn could never run.
        print {$sock} "$header\n$content"
            or warn "can't send $header to $node: $!";

        # Buffered output is flushed on close, so a write error can surface
        # only at this point.
        close $sock
            or warn "can't close connection to $node: $!";
    }
}
75
# Evaluate the Perl source of one view against every input record.
#
#   $view - name of the view, used in messages and sent to the nodes
#   $code - Perl source text that is eval'd once per record
#
# The source is first forwarded to all connected nodes with send_nodes.
# $out is cleared before the loop and a warning is issued if it is still
# undefined afterwards.  A record counts as affected when the eval leaves
# $@ empty.  The timer $t is restarted so report() covers only this run.
#
# NOTE(review): $code goes through string eval and may arrive over the
# network via the --listen loop, so it runs with this process's privileges.
sub run_code {
    my ( $view, $code ) = @_;

    warn "\n#### CODE $view START ####\n$code\n#### CODE $view END ####\n";

    send_nodes( 'view', $view, $code );

    undef $out;
    $t = time;
    my $affected = 0;

    # The eval'd source can see $view, $code, $affected, $pos and $rec by
    # name, so these lexicals must keep their names.
    for my $pos ( $offset + 1 .. $offset + $input->size ) {
        my $rec = $cache->{$pos} ||= $input->fetch_rec($pos);
        unless ($rec) {
            warn "END at $pos";
            last;
        }

        eval "$code";
        if ($@) {
            warn "ERROR [$pos] $@\n";
            next;
        }
        $affected++;
    }

    report("$affected affected records $view");

    defined $out or warn "WARN no \$out defined!";
}
107
# Run every view and save what it leaves in $out.
#
# Views come from --view; when none were given, all files matching
# views/*.pl are used, sorted by name.  Each view is syntax-checked with
# "perl -c" and skipped if that fails.  When the view defined $out, its dump
# is written to the matching file under out/ (the view path with "view/" or
# "views/" replaced by "out/" and the ".pl" suffix removed), and the previous
# result is kept as "<file>.last".
sub run_views {
    @views = sort glob 'views/*.pl' unless @views;
    warn "# views ", dump @views;

    foreach my $view (@views) {

        # List form bypasses the shell, so a view path containing spaces or
        # shell metacharacters reaches perl as one plain argument.
        next if system( 'perl', '-c', $view ) != 0;

        my $code = read_file($view);

        run_code( $view, $code );

        next unless defined $out;

        my $dump = dump($out);
        my $len  = length $dump;

        # Named $out_path so it does not shadow the file-level $path.
        my $out_path = $view;
        $out_path =~ s{views?/}{out/} || die "no view in $view";
        # Anchored: only a trailing ".pl" is the suffix to strip.
        $out_path =~ s{\.pl\z}{};

        print "OUT $view $offset/$limit $len bytes $out_path"
            , ( $len < 10000 ? " \$out = $dump" : ' SAVED ONLY' )
            , "\n"
            ;

        # Keep the previous result as "<file>.last"; on the first run there is
        # nothing to rotate.
        unlink "$out_path.last" if -e "$out_path.last";
        if ( -e $out_path ) {
            rename $out_path, "$out_path.last"
                or warn "can't rename $out_path to $out_path.last: $!";
        }

        write_file( $out_path, $dump );
        report("SAVE $out_path");
    }

}
142
# Node mode (--listen): accept connections forever and run what they send.
#
# Each connection carries one message: a header line
# "<size> <command> <args...>" followed by <size> bytes of content.  The only
# command handled is "view", whose content is passed to run_code together
# with the view name.  The loop never exits, so nothing after this block runs
# in node mode.
#
# NOTE(review): run_code evals the content, so any peer that can reach this
# port can execute Perl in this process.
if ($listen) {
    my $sock = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
        # LocalAddr => '0.0.0.0',
        LocalPort => $listen,
        Proto     => 'tcp',
        Reuse     => 1,
    ) or die "can't listen on port $listen: $!";

    while (1) {

        warn "NODE listen on $listen\n";

        my $client = $sock->accept();
        if ( !$client ) {
            # A signal can interrupt accept; anything else is a real failure,
            # and retrying it in a tight loop would only spin.
            next if $!{EINTR};
            die "accept failed: $!";
        }

        warn "<<<< connect from ", $client->peerhost, $/;

        my $line = <$client>;
        if ( !defined $line ) {
            warn "WARN connection closed before header\n";
            next;
        }

        my @header = split ' ', $line;
        warn "# header ", dump @header;

        my $size = shift @header;
        if ( !defined $size || $size !~ /\A\d+\z/ ) {
            warn "WARN invalid size in header\n";
            next;
        }

        my $content = '';
        my $got = read $client, $content, $size;
        if ( !defined $got || $got != $size ) {
            warn "WARN incomplete content, expected $size bytes\n";
            next;
        }

        my ( $command, $view ) = @header;
        if ( defined $command && $command eq 'view' && defined $view ) {
            run_code( $view, $content );
        } else {
            warn "WARN unknown";
        }

    }
}
176
# Master mode: run all views once, then serve a small command prompt.
#
#   v, vi, \e, o, out  - open the saved results in vi
#   i, info            - show the connected nodes
#   anything else      - run the views again
#
# Commands are matched case-insensitively on their leading characters.
run_views;

while (1) {

    print "sack> ";
    my $cmd = <STDIN>;

    # Stop at end of input: an undefined line matches no command and would
    # otherwise fall through to re-running the views forever.
    last unless defined $cmd;

    if ( $cmd =~ m{^(vi?|\\e|o(?:ut)?)}i ) {
        system "vi out/*";
    } elsif ( $cmd =~ m{^i(nfo)?}i ) {
        # Parentheses keep $/ out of dump's argument list, so the line ends
        # with a real newline instead of a dumped "\n".
        print "nodes: ", dump(@nodes), $/;
    } else {
        run_views;
    }

}
193

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.26