/[Sack]/trunk/lib/Sack/Node.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk/lib/Sack/Node.pm

Parent Directory | Revision Log


Revision 127 - (hide annotations)
Wed Oct 7 16:21:33 2009 UTC (14 years, 7 months ago) by dpavlin
File size: 2614 byte(s)
better recovery from node storable serialization errors,
faster cloud startup, 
added common pid handling,
added [u]pdate to lorry, which pushes updated code to nodes,
tweaks to output,
version bump for Sack::Node [0.08]

1 dpavlin 92 package Sack::Node;
2    
3     use warnings;
4     use strict;
5    
6     use IO::Socket::INET;
7     use File::Slurp;
8     use Carp qw(confess);
9     use Data::Dump qw(dump);
10     use Storable;
11 dpavlin 93 use Time::HiRes qw(time);
12 dpavlin 92
13 dpavlin 102 use lib 'lib';
14 dpavlin 127 use base qw(Sack::Pid);
15 dpavlin 102 use Sack::Color;
16 dpavlin 92
17 dpavlin 127 our $VERSION = '0.09';
18 dpavlin 116
# new( $port )
#
# Construct a node bound to 127.0.0.1:$port and run its request loop.
#
# NOTE: this constructor does not return to the caller. It serves one client
# at a time, reading Storable-serialized hash refs and answering each one with
# a Storable-serialized result, until it receives an "exit" request (process
# exits) or a "restart" request (process re-executes itself).
#
# Recognized request keys (first match wins, in this order):
#   view    => $code   run $self->view( $code ) over the loaded data
#   data    => $ref    replace the node's data set
#   exit    => 1       close the listening socket and exit
#   restart => 1       close the listening socket and exec "$0 $port"
#   info    => 1       report version, data size and collected view reports
#   sh      => $cmd    run $cmd through the shell and return its output
# Anything else is answered with { error => $request }.
sub new {
    my $class = shift;
    my $port  = shift;
    my $self  = bless { port => $port }, $class;

    $self->port_pid( $port );

    my $sock = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
        LocalAddr => '127.0.0.1',
        LocalPort => $port,
        Proto     => 'tcp',
        Reuse     => 1,
    ) or die "[$port] die $!";

    my $client;

    while ( 1 ) {

        if ( ! $client ) {
            warn "[$port] accept $VERSION\n";
            $client = $sock->accept();
            if ( ! $client ) {
                # An interrupted accept is harmless; anything else is fatal,
                # and retrying would only spin.
                next if $!{EINTR};
                die "[$port] accept failed: $!";
            }
            warn "[$port] connect from ", $client->peerhost, $/;
        }

        my $data  = eval { Storable::fd_retrieve( $client ) };
        my $error = $@;
        if ( $error || ! defined $data ) {
            # fd_retrieve may also give back undef without dying (e.g. when
            # the peer has gone away), so both cases drop the connection.
            warn "[$port] ERROR ", ( $error || 'no data from client' ), "\n";
            close $client;
            # Forget the handle so the next iteration accepts a new client
            # instead of retrying on a closed socket forever.
            undef $client;
            next;
        }

        # Only hash refs are valid requests; anything else must not be
        # dereferenced as a hash.
        my $is_request = ref $data eq 'HASH';

        if ( $is_request && defined $data->{data} ) {
            # Do not dump the (potentially huge) data set into the log.
            warn "# [$port] <<<< data\n";
        } else {
            warn "# [$port] <<<< ", dump( $data ), $/;
        }

        my $result;

        if ( ! $is_request ) {
            warn "[$port] UNKNOWN ", dump( $data ), $/;
            $result = { 'error' => $data };
        } elsif ( $data->{view} ) {
            # view() returns nothing when the code fails to compile, and
            # store_fd cannot serialize a non-reference.
            $result = $self->view( $data->{view} )
                || { 'error' => 'view failed' };
        } elsif ( $data->{data} ) {
            $self->{data} = delete $data->{data};
            $result = { data => 'loaded' };
        } elsif ( $data->{exit} ) {
            warn "[$port] exit\n";
            close $sock;
            exit;
        } elsif ( $data->{restart} ) {
            warn "[$port] restart";
            close $sock;
            # List form: no shell, and a path containing spaces stays intact.
            exec $0, $port
                or die "[$port] exec $0 failed: $!";
        } elsif ( $data->{info} ) {
            my $loaded = $self->{data};
            $result = {
                version => $VERSION,
                size    => ref $loaded eq 'ARRAY' ? scalar @$loaded : 0,
                reports => $self->{reports},
            };
        } elsif ( my $sh = delete $data->{sh} ) {
            # SECURITY: executes an arbitrary shell command received over the
            # socket. Acceptable only because the listener is bound to
            # 127.0.0.1; never expose this port to untrusted peers.
            $result = { sh => scalar `$sh` };
        } else {
            warn "[$port] UNKNOWN ", dump( $data ), $/;
            $result = { 'error' => $data };
        }

        warn "# [$port] >>>>\n";
        my $sent = eval {
            # A client that vanished mid-reply must not kill the node.
            local $SIG{PIPE} = 'IGNORE';
            Storable::store_fd( $result => $client );
        };
        if ( ! $sent ) {
            warn "[$port] ERROR sending result: ", ( $@ || $! ), "\n";
            # The stream may hold a partial frame; drop the connection so the
            # next client starts clean.
            close $client;
            undef $client;
        }
    }

}
90    
91 dpavlin 93
# Package globals shared with view code: $rec is the record currently being
# processed, $out is the accumulator the view code fills in.
our $rec;
our $out;

# view( $code )
#
# Compile $code (Perl source text) as the body of an anonymous sub and call it
# once per element of $self->{data}, with $rec set to the current element.
# Whatever the code stores in $out is returned.
#
# Iteration stops early at the first false record or at the first record for
# which the code dies.
#
# Returns a hash ref:
#   out    => $out as left by the view code (undef if it never set it)
#   report => [ port, records processed, seconds elapsed, records per second ]
# Returns nothing (undef in scalar context) if $code does not compile.
#
# Side effect: appends a "<n> in <seconds>s" line to $self->{reports}.
sub view {
    my ( $self, $code ) = @_;

    undef $out;

    my $affected = 0;
    my $start_t  = time;

    # SECURITY: string eval of caller-supplied source. This is the purpose of
    # the method, but it means $code must only ever come from a trusted peer.
    my $coderef = eval "sub { $code }";
    if ( $@ ) {
        warn "ABORT code: $@";
        return;
    }

    # Treat missing or non-array data as an empty set instead of dying on it.
    my $records = ref $self->{data} eq 'ARRAY' ? $self->{data} : [];

    foreach my $pos ( 0 .. $#{$records} ) {
        $rec = $records->[$pos];
        if ( ! $rec ) {
            print STDERR "END @ $pos";
            last;
        }

        # Test eval's own return value; $@ alone can be reset by code that
        # runs during unwinding.
        my $ok = eval { $coderef->(); 1 };
        if ( ! $ok ) {
            warn "ABORT $pos ", ( $@ || 'unknown error' ), "\n";
            last;
        }
        $affected++;

        # Progress indicator: position every 10000 records, dot every 1000.
        if ( $pos % 10000 == 0 ) {
            print STDERR $pos;
        } elsif ( $pos % 1000 == 0 ) {
            print STDERR ".";
        }
    }

    my $dt = time - $start_t;
    # Guard against a zero elapsed time (coarse clock or empty data set).
    my $rate   = $dt > 0 ? $affected / $dt : 0;
    my $report = [ $self->{port}, $affected, $dt, $rate ];
    warn sprintf "[%d] %d affected in %1.4fs %.2f/s\n", @$report;

    push @{ $self->{reports} }, "$affected in ${dt}s";

    # warn "# out ", dump( $out );

    return {
        out    => $out,
        report => $report,
    };
}
142    
143     1;

  ViewVC Help
Powered by ViewVC 1.1.26