/[Sack]/trunk/lib/Sack/Server.pm
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/lib/Sack/Server.pm

Parent Directory | Revision Log


Revision 175 - (show annotations)
Tue Nov 3 18:02:33 2009 UTC (14 years, 6 months ago) by dpavlin
Original Path: trunk/experiments/protocol-v3/server.pl
File MIME type: text/plain
File size: 2341 byte(s)
added info command

1 #!/usr/bin/perl
2
3 use warnings;
4 use strict;
5
6 use IO::Socket::INET;
7 use IO::Select;
8
9 use Data::Dump qw(dump);
10 use Storable qw();
11 use File::Slurp;
12
# Initial node list. It is unconditionally replaced below by the contents of
# the cloud file, so it only documents what such a list looks like.
my @cloud = qw(localhost tab.lan llin.lan);

# File with one node host name per line. The same path with ".ssh" appended
# is handed to ssh as its configuration file (-F) when nodes are contacted.
my $cloud_path = $ENV{CLOUD} || '/srv/Sack/etc/lib';

# Read the host list, strip line endings from every entry at once and drop
# blank lines, which would otherwise be passed to ssh as empty host names.
@cloud = read_file $cloud_path;
chomp @cloud;
@cloud = grep { /\S/ } @cloud;

warn "# cloud ",dump( @cloud );

# TCP port this server listens on; nodes reach it through an ssh reverse tunnel.
my $listen_port = 4444;

# Program started on each node: this script's own path with server.pl
# replaced by client.pl. The dot is escaped so it only matches a literal ".".
my $node_path = $0;
$node_path =~ s{server\.pl}{client.pl};

# IO::Socket::INET reports constructor failures in $@; fall back to $! if empty.
my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1)
    or die "can't listen on port $listen_port: ", ( $@ || $! ), "\n";
my $sel = IO::Select->new($lsn);
28
# Event log shared with the rest of the script: a hash reference keyed by
# port, each value being a list of recorded events (array references).
my $info;

# Append one event to the log kept for a port.
#   info( $port, @event )
# The remaining arguments are stored as a new array reference at the end of
# the list for $port. Returns the number of events now recorded for that port.
sub info {
    my ( $port, @event ) = @_;
    return push @{ $info->{$port} }, \@event;
}
34
# Start the node program on one host in a child process.
#   fork_node( $port, $host )
# The child replaces itself with an ssh command that opens a reverse tunnel
# from $port on the remote side to this server's listen port and runs
# $node_path there with $port as its argument.
# Returns $port in the parent once the child exists, or nothing if fork failed.
# The child never returns from this sub.
sub fork_node {
    my ( $port, $host ) = @_;

    my $pid = fork;

    if ( ! defined $pid ) {
        warn "can't fork $host $port";
        return;
    }

    if ( $pid ) {
        # parent
        info $port => 'forked', $pid;
        return $port;
    }

    # child
    my $cmd = qq|ssh -F $cloud_path.ssh -R $port:127.0.0.1:$listen_port $host $node_path $port|;
    warn "# exec: $cmd\n";
    exec $cmd;

    # Only reached when exec itself failed. Leave right here: returning would
    # let the child carry on with the parent's code, i.e. start further nodes
    # and serve the inherited listen socket as a second server.
    warn "can't exec ssh for $host $port: $!\n";
    exit 1;
}
53
# First port handed to a node; every following host gets the next number.
my $node_port = 4000;

# For every host: copy the node program over (cpio archive piped through ssh),
# then start it via fork_node on its own port.
foreach my $host ( @cloud ) {
    my $status = system "echo $node_path | cpio --create --dereference | ssh -T -F $cloud_path.ssh $host cpio --extract --make-directories --unconditional --verbose";

    # Deployment stays best-effort (the node is still started, and the port
    # number is still consumed), but a failed copy is no longer silent.
    warn "can't copy $node_path to $host (system returned $status)\n" if $status != 0;

    fork_node( $node_port++, $host );
}
60
# Connection registry used by the event loop below:
#   $session->{peerport}{<TCP peer port>}   every accepted connection
#   $session->{port}{<port from a ping>}    sockets that sent a ping message
my $session = { peerport => {}, port => {} };

# Write one Storable-encoded message to a socket. A failed write is only
# reported, so a single dead peer cannot take the whole server down.
# Returns store_fd's result (true on success), or undef if it failed or died.
my $send = sub {
    my ( $message, $to ) = @_;
    my $ok = eval { Storable::store_fd( $message, $to ) };
    warn "[socket] send failed: ", ( $@ || $! ), "\n" unless $ok;
    return $ok;
};

# Main event loop. Messages are hash references exchanged with Storable:
#   { ping => 1, port => N }   a peer announcing itself; remembered under N
#   { repl => 'ping' }         forward a ping to every remembered peer
#   { repl => 'info' }         reply with the collected event log
# Every repl message is answered with { repl => <server pid>, ... }.
while (1) {
    for my $sock ( $sel->can_read(1) ) {

        if ( $sock == $lsn ) {
            # accept returns undef when the pending connection is already gone
            my $new = $lsn->accept or next;
            $sel->add($new);
            $session->{peerport}->{ $new->peerport } = $new;
            warn "[socket] connect\n";
            $send->( { ping => 1 }, $new );
            info 0 => 'ping', $new->peerport;
            next;
        }

        # NOTE(review): Storable's documentation warns against retrieving data
        # from untrusted sources; this is only acceptable while every peer is
        # trusted - confirm that holds for the deployment.
        my $data = eval { Storable::fd_retrieve( $sock ) };

        # A failed read means the peer is gone. An undefined result is treated
        # the same way: a real message is always a reference.
        if ( $@ || ! defined $data ) {
            # Forget the socket everywhere it was registered, so a later
            # repl ping does not try to write to the closed handle.
            for my $registry ( values %$session ) {
                for my $key ( keys %$registry ) {
                    delete $registry->{$key} if $registry->{$key} == $sock;
                }
            }
            warn "[socket] disconnect: $@\n";
            $sel->remove($sock);
            $sock->close;
            next;
        }

        warn "<<<< ", dump($data), $/;

        if ( $data->{repl} ) {
            my $response = { repl => $$ };
            if ( $data->{repl} =~ m/ping/ ) {
                foreach my $port ( keys %{ $session->{port} } ) {
                    warn ">>>> [$port]\n";
                    $send->( { ping => 1 }, $session->{port}->{$port} );
                }
            } elsif ( $data->{repl} =~ m/info/ ) {
                $response->{info} = $info;
            }
            $send->( $response, $sock );
        } elsif ( $data->{ping} ) {
            my $port = $data->{port};
            info $port => 'ping', $port;
            $session->{port}->{$port} = $sock;
        }
    }
}

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.26