/[Sack]/trunk/lib/Sack/Node.pm
This is a repository of my old source code, which isn't updated anymore. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk/lib/Sack/Node.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 93 - (hide annotations)
Sat Oct 3 21:43:34 2009 UTC (14 years, 8 months ago) by dpavlin
File size: 2048 byte(s)
run simple view
1 dpavlin 92 package Sack::Node;
2    
3     use warnings;
4     use strict;
5    
6     use IO::Socket::INET;
7     use File::Slurp;
8     use Carp qw(confess);
9     use Data::Dump qw(dump);
10     use Storable;
11 dpavlin 93 use Time::HiRes qw(time);
12 dpavlin 92
13    
# new( $port )
#
# Construct a Sack::Node for $port and immediately serve a single client.
#
# Steps performed:
#   1. If /tmp/sack.$port.pid exists, send signal 9 to the PID recorded there,
#      then overwrite the file with the current PID ($$).
#   2. Listen on 127.0.0.1:$port and block until one client connects.
#   3. Loop reading Storable-serialised requests from the client and answer
#      each one with a Storable-serialised hash:
#        { view => CODE }  -> { view => <return value of $self->view(CODE)> }
#        { data => ... }   -> stores the value in $self->{data},
#                             replies { data => 'loaded' }
#        { exit => true }  -> exits the process
#        anything else     -> { error => <the request> }
#
# Returns $self once the client disconnects or a reply cannot be written.
# Dies if the listening socket cannot be created or accept() fails.
sub new {
    my $class = shift;
    my $port  = shift;
    my $self  = bless { port => $port }, $class;

    my $pid_path = "/tmp/sack.$port.pid";
    if ( -e $pid_path ) {
        my $pid = read_file $pid_path;

        # Only signal something that is a positive integer: an empty or
        # garbage pid file would otherwise numify to 0, and kill 9, 0
        # signals our own process group.
        if ( defined $pid && $pid =~ m/\A\s*([1-9][0-9]*)\s*\z/ ) {
            $pid = $1;

            # Parentheses are required here. `kill 9, $pid && warn ...`
            # parses as kill 9, ( $pid && warn ... ), which passes warn's
            # return value (1) as the PID.
            kill( 9, $pid ) and warn "[$port] kill old $pid\n";
        }
        else {
            warn "[$port] ignoring invalid pid file $pid_path\n";
        }
    }
    write_file $pid_path, $$;

    my $sock = IO::Socket::INET->new(
        Listen    => SOMAXCONN,
        LocalAddr => '127.0.0.1',
        LocalPort => $port,
        Proto     => 'tcp',
        Reuse     => 1,
    ) or die "[$port] die $!";

    warn "[$port] accept\n";

    my $client = $sock->accept()
        or die "[$port] accept failed: $!";

    warn "[$port] connect from ", $client->peerhost, $/;

    while ( 1 ) {

        my $data = Storable::fd_retrieve( $client );

        # fd_retrieve returns undef on I/O problems (e.g. the client went
        # away). Stop serving instead of autovivifying $data and spinning.
        if ( ! defined $data ) {
            warn "[$port] client disconnected\n";
            last;
        }

        warn "[$port] <<<< ", dump( $data );

        # Any reference type can arrive over the wire; only hashes are
        # valid requests.
        my $is_hash = ref $data eq 'HASH';

        my $result;

        if ( $is_hash && $data->{view} ) {
            # Force scalar context: view() uses a bare return on failure,
            # which would yield an empty list and an odd-sized hash here.
            $result = { view => scalar $self->view( $data->{view} ) };
        } elsif ( $is_hash && $data->{data} ) {
            $self->{data} = delete $data->{data};
            $result = { data => 'loaded' };
        } elsif ( $is_hash && $data->{exit} ) {
            warn "[$port] exit";
            exit;
        } else {
            warn "[$port] UNKNOWN ", dump( $data ), $/;
            $result = { 'error' => $data };
        }

        warn "[$port] >>>>\n";
        if ( ! Storable::store_fd( $result => $client ) ) {
            warn "[$port] can't send result: $!\n";
            last;
        }
    }

    close $client;

    return $self;
}
65    
66 dpavlin 93
# Package globals shared with the code compiled in view():
#   $rec - the record currently being processed
#   $out - whatever the view code stores there; returned to the caller
our $rec;
our $out;

# view( $code )
#
# Compile $code (a string of Perl source) into an anonymous sub and run it
# once for every element of $self->{data}, with $rec set to that element.
# Processing stops at the first false record or at the first record for
# which the code dies. A timing report is written to STDERR.
#
# Returns $out as left behind by the compiled code (undef if the code never
# assigned it), or nothing if $code fails to compile.
sub view {
    my ( $self, $code ) = @_;

    undef $out;

    my $affected = 0;
    my $start_t  = time;

    # SECURITY NOTE(review): this is a string eval of caller-supplied
    # source. In this module it arrives over the TCP socket, so whoever can
    # connect can run arbitrary Perl in this process. Left in place because
    # it is the purpose of the sub; confirm the listener stays restricted
    # to trusted peers.
    my $coderef = eval "sub { $code }";
    if ( $@ ) {
        warn "ABORT code: $@";
        return;
    }

    # Tolerate view() being called before any data was loaded; under
    # strict refs, $#{ undef } would die.
    my $records = $self->{data} || [];

    foreach my $pos ( 0 .. $#{ $records } ) {
        $rec = $records->[$pos];
        if ( ! $rec ) {
            print STDERR "END @ $pos";
            last;
        }

        eval { $coderef->() };
        if ( $@ ) {
            warn "ABORT $pos $@\n";
            last;
        } else {
            $affected++;
        }

        # Progress indicator: the position every 10000 records, a dot every
        # 1000.
        if ( $pos % 10000 == 0 ) {
            print STDERR $pos;
        } elsif ( $pos % 1000 == 0 ) {
            print STDERR ".";
        }
    }

    my $dt = time - $start_t;

    # Records per second. The original `scalar $self->{data} / $dt` divided
    # the numified array reference (its address) instead of the element
    # count, and could divide by zero.
    my $rate = $dt > 0 ? scalar( @{ $records } ) / $dt : 0;

    my $report = [ $self->{port}, $affected, $dt, $rate ];
    warn sprintf "[%d] %d affected in %1.4fs %.2f/s\n", @$report;

    warn "out ", dump( $out );

    return $out;
}
112    
113     1;

  ViewVC Help
Powered by ViewVC 1.1.26