/[Sack]/trunk/lib/Sack/Server.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /trunk/lib/Sack/Server.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 175 - (hide annotations)
Tue Nov 3 18:02:33 2009 UTC (14 years, 7 months ago) by dpavlin
Original Path: trunk/experiments/protocol-v3/server.pl
File MIME type: text/plain
File size: 2341 byte(s)
added info command

1 dpavlin 163 #!/usr/bin/perl
2    
3     use warnings;
4     use strict;
5    
6     use IO::Socket::INET;
7     use IO::Select;
8    
9     use Data::Dump qw(dump);
10     use Storable qw();
11 dpavlin 169 use File::Slurp;
12 dpavlin 163
13 dpavlin 169 my @cloud = qw(localhost tab.lan llin.lan);
14    
15 dpavlin 170 my $cloud_path = $ENV{CLOUD} || '/srv/Sack/etc/lib';
16 dpavlin 169 @cloud = read_file $cloud_path;
17     @cloud = map { chomp $_; $_ } @cloud;
18    
19     warn "# cloud ",dump( @cloud );
20    
21 dpavlin 163 my $listen_port = 4444;
22    
23 dpavlin 173 my $node_path = $0;
24     $node_path =~ s{server.pl}{client.pl};
25 dpavlin 163
26 dpavlin 169 my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1) or die $!;
27 dpavlin 163 my $sel = IO::Select->new($lsn);
28    
29 dpavlin 166 my $info;
30     sub info {
31     my $port = shift;
32     push @{ $info->{$port} }, [ @_ ];
33     }
34    
35 dpavlin 163 sub fork_node {
36     my ( $port, $host ) = @_;
37    
38     if ( my $pid = fork ) {
39     # parent
40 dpavlin 166 info $port => 'forked', $pid;
41 dpavlin 163 return $port;
42    
43     } elsif ( ! defined $pid ) {
44     warn "can't fork $host $port";
45     return;
46     } else {
47     # child
48 dpavlin 169 my $cmd = qq|ssh -F $cloud_path.ssh -R $port:127.0.0.1:$listen_port $host $node_path $port|;
49 dpavlin 163 warn "# exec: $cmd\n";
50     exec $cmd;
51     }
52     }
53    
54     my $node_port = 4000;
55    
56 dpavlin 169 foreach my $host ( @cloud ) {
57 dpavlin 173 system "echo $node_path | cpio --create --dereference | ssh -T -F $cloud_path.ssh $host cpio --extract --make-directories --unconditional --verbose";
58 dpavlin 163 fork_node( $node_port++, $host );
59     }
60    
61     my $session;
62    
63     while (1) {
64     for my $sock ($sel->can_read(1)) {
65     if ($sock == $lsn) {
66     my $new = $lsn->accept;
67     $sel->add($new);
68 dpavlin 174 $session->{peerport}->{ $new->peerport } = $new;
69 dpavlin 164 warn "[socket] connect\n";
70 dpavlin 163 Storable::store_fd( { ping => 1 }, $new );
71 dpavlin 166 info 0 => 'ping', $new->peerport;
72 dpavlin 163 } else {
73 dpavlin 165 my $data = eval { Storable::fd_retrieve( $sock ) };
74     if ( $@ ) {
75     delete $session->{$sock};
76     warn "[socket] disconnect: $@\n";
77     $sel->remove($sock);
78     $sock->close;
79     } else {
80 dpavlin 163 warn "<<<< ", dump($data), $/;
81 dpavlin 164 if ( $data->{repl} ) {
82 dpavlin 175 my $response = { repl => $$ };
83 dpavlin 174 if ( $data->{repl} =~ m/ping/ ) {
84     foreach my $port ( keys %{ $session->{port} } ) {
85     warn ">>>> [$port]\n";
86     Storable::store_fd( { ping => 1 }, $session->{port}->{$port} );
87     }
88 dpavlin 175 } elsif ( $data->{repl} =~ m/info/ ) {
89     $response->{info} = $info;
90 dpavlin 174 }
91 dpavlin 166 Storable::store_fd( $response, $sock );
92     } elsif ( $data->{ping} ) {
93 dpavlin 174 my $port = $data->{port};
94     info $port => 'ping', $port;
95     $session->{port}->{ $data->{port} } = $sock;
96 dpavlin 164 }
97 dpavlin 163 }
98     }
99     }
100     }

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.26