/[Sack]/trunk/lib/Sack/Server.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/lib/Sack/Server.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 178 - (show annotations)
Sun Nov 8 12:45:43 2009 UTC (14 years, 6 months ago) by dpavlin
File size: 2362 byte(s)
begin integration of protocol v3

1 #!/usr/bin/perl
2
3 use warnings;
4 use strict;
5
6 use IO::Socket::INET;
7 use IO::Select;
8
9 use Data::Dump qw(dump);
10 use Storable qw();
11 use File::Slurp;
12 use Cwd qw(abs_path);
13
14 my @cloud = qw(localhost tab.lan llin.lan);
15
16 my $cloud_path = $ENV{CLOUD} || '/srv/Sack/etc/lib';
17 @cloud = read_file $cloud_path;
18 @cloud = map { chomp $_; $_ } @cloud;
19
20 warn "# cloud ",dump( @cloud );
21
22 my $listen_port = 4444;
23
24 my $node_path = abs_path $0;
25 $node_path =~ s{server.pl}{client.pl};
26
27 my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1) or die $!;
28 my $sel = IO::Select->new($lsn);
29
30 my $info;
31 sub info {
32 my $port = shift;
33 push @{ $info->{$port} }, [ @_ ];
34 }
35
36 sub fork_node {
37 my ( $port, $host ) = @_;
38
39 if ( my $pid = fork ) {
40 # parent
41 info $port => 'forked', $pid;
42 return $port;
43
44 } elsif ( ! defined $pid ) {
45 warn "can't fork $host $port";
46 return;
47 } else {
48 # child
49 my $cmd = qq|ssh -F $cloud_path.ssh -R $port:127.0.0.1:$listen_port $host $node_path $port|;
50 warn "# exec: $cmd\n";
51 exec $cmd;
52 }
53 }
54
55 my $node_port = 4000;
56
57 foreach my $host ( @cloud ) {
58 system "find /srv/Sack/ | cpio --create --dereference | ssh -T -F $cloud_path.ssh $host cpio --extract --make-directories --unconditional";
59 fork_node( $node_port++, $host );
60 }
61
62 my $session;
63
64 while (1) {
65 for my $sock ($sel->can_read(1)) {
66 if ($sock == $lsn) {
67 my $new = $lsn->accept;
68 $sel->add($new);
69 $session->{peerport}->{ $new->peerport } = $new;
70 warn "[socket] connect\n";
71 Storable::store_fd( { ping => 1 }, $new );
72 info 0 => 'ping', $new->peerport;
73 } else {
74 my $data = eval { Storable::fd_retrieve( $sock ) };
75 if ( $@ ) {
76 delete $session->{$sock};
77 warn "[socket] disconnect: $@\n";
78 $sel->remove($sock);
79 $sock->close;
80 } else {
81 warn "<<<< ", dump($data), $/;
82 if ( $data->{repl} ) {
83 my $response = { repl => $$ };
84 if ( $data->{repl} =~ m/ping/ ) {
85 foreach my $port ( keys %{ $session->{port} } ) {
86 warn ">>>> [$port]\n";
87 Storable::store_fd( { ping => 1 }, $session->{port}->{$port} );
88 }
89 } elsif ( $data->{repl} =~ m/info/ ) {
90 $response->{info} = $info;
91 }
92 Storable::store_fd( $response, $sock );
93 } elsif ( $data->{ping} ) {
94 my $port = $data->{port};
95 info $port => 'ping', $port;
96 $session->{port}->{ $data->{port} } = $sock;
97 }
98 }
99 }
100 }
101 }

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.26