--- trunk/lib/Sack/Server.pm 2009/11/08 14:12:38 192 +++ trunk/lib/Sack/Server.pm 2009/11/26 22:23:52 249 @@ -5,6 +5,8 @@ use warnings; use strict; +my $debug = 0; + use IO::Socket::INET; use IO::Select; @@ -13,25 +15,35 @@ use File::Slurp; use Cwd qw(abs_path); +use lib '/srv/Sack/lib'; +use Sack::Merge; +use Sack::Server::HTTP; +use Sack::Server::HTML; +use Sack::Server::Gnuplot; + my @cloud; -my $cloud_path = $ENV{CLOUD} || die "start with: CLOUD=etc/cloud perl -I/srv/Sack/lib $0\n"; +my $cloud_path = $ENV{CLOUD} || "etc/cloud"; +die "start with: CLOUD=etc/cloud perl -I/srv/Sack/lib $0\n" unless -e $cloud_path; @cloud = read_file $cloud_path; @cloud = map { chomp $_; $_ } @cloud; warn "# cloud ",dump( @cloud ); my $listen_port = 4444; +my $http_port = 4480; my $node_path = abs_path $0; $node_path =~ s{Server}{Client}; -my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1) or die $!; +my $lsn = IO::Socket::INET->new(Listen => 1, LocalPort => $listen_port, Reuse => 1) or die "$listen_port $!"; +my $www = IO::Socket::INET->new(Listen => 1, LocalPort => $http_port, Reuse => 1) or die "$http_port $!"; my $sel = IO::Select->new($lsn); +$sel->add( $www ); my $info; sub info { my $port = shift; - push @{ $info->{$port} }, [ @_ ]; + push @{ $info->{node}->{$port} }, [ @_ ]; } sub fork_ssh { @@ -65,12 +77,48 @@ sub to_all { my $data = shift; foreach my $port ( keys %{ $session->{port} } ) { - warn ">>>> [$port]\n"; + warn ">>>> [$port]\n" if $debug; Storable::store_fd( $data, $session->{port}->{$port} ); } } -my @shard_paths; +our @shard_load_queue; +sub load_shard { + my $shard = shift @_ || return; + + warn "# load_shard $shard\n"; + + my @shards = glob "$shard/*"; + + if ( ! @shards ) { + warn "no shards for $shard\n"; + return; + } + + foreach my $s ( @shards ) { + next if $info->{shard}->{$s} =~ m{^\d+$}; + $info->{shard}->{$s} = 'wait'; + push @shard_load_queue, $s; + warn "queued $s for loading\n"; + } + to_all { load => $shard }; +} + +sub run_view { + my ( $path ) = @_; + if ( ! -r $path ) { + warn "ERROR view $path: $!"; + return; + } + my $code = read_file $path; + Sack::Merge->clean; + delete( $info->{view} ); + delete( $info->{merge} ); + delete( $info->{shard} ); + to_all { code => $code, view => $path }; +}; + +our @responses; while (1) { for my $sock ($sel->can_read(1)) { @@ -80,7 +128,141 @@ $session->{peerport}->{ $new->peerport } = $new; warn "[socket] connect\n"; Storable::store_fd( { ping => 1 }, $new ); - info 0 => 'ping', $new->peerport; + } elsif ( $sock == $www ) { + my $client = $www->accept; + Sack::Server::HTTP::request( $client, sub { + my ( $send, $method, $param ) = @_; + + if ( $method =~ m{views} ) { + run_view $method; + print $send "HTTP/1.0 302 $method\r\nLocation: /\r\n\r\n"; + return 1; + } elsif ( $method =~ m{^/tmp/sack} ) { + load_shard $method; + print $send "HTTP/1.0 302 $method\r\nLocation: /\r\n\r\n"; + return 1; + } elsif ( $method =~ m{^/out/(.+)} ) { + print $send "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\n"; + Sack::Server::HTML::send_out( $send, Sack::Merge->out, $1, $param ); + return 1; + } elsif ( $method =~ m{^/gnuplot} ) { + eval { + my $path = Sack::Server::Gnuplot::date( Sack::Merge->out, $param ); + if ( -e $path ) { + print $send "HTTP/1.0 200 OK\r\nContent-Type: image/png\r\n\r\n"; + open(my $fh, '<', $path) || die $path; + my $b; + while ( read($fh, $b, 4096) ) { + print $send $b; + } + return 1; + } else { + print $send "HTTP/1.0 404 no graph\r\n\r\n"; + return 1; + } + }; + warn "ERROR: $@" if $@; + } + + print $send "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\n"; + + print $send qq| + + + +|; + + print $send qq|

Views

| + ; + + my $out = Sack::Merge->out; + if ( my $li = join("\n", map { qq|
  • $_| } keys %$out ) ) { + print $send qq|

    Results

    \n|; + } + + print $send qq|

    Nodes

    + |, join("\n", map { + my $class = join(' ', map { $_->[0] } @{ $info->{node}->{$_} }); + qq|$_|; + } sort keys %{ $info->{node} } ); + + print $send qq|

    Shards

    |; + + if ( $param->{info} ) { + print $send qq|hide info|; + print $send '
    ', dump($info), '
    ' + } else { + print $send qq|show info|; + } + return 1; + } ); } else { my $data = eval { Storable::fd_retrieve( $sock ) }; if ( $@ ) { @@ -89,7 +271,13 @@ $sel->remove($sock); $sock->close; } else { - warn "<<<< ", dump($data), $/; + warn "<<<< ", dump($data), $/ if $debug; + + if ( my $path = $data->{shard} ) { + $info->{shard}->{ $path } = $data->{port}; + # FIXME will need push for multiple copies of shards + } + if ( my $repl = $data->{repl} ) { my $response = { repl_pid => $$ }; if ( $repl =~ m/ping/ ) { @@ -97,27 +285,52 @@ } elsif ( $repl =~ m/info/ ) { $response->{info} = $info; } elsif ( $repl =~ m{load\s*(\S+)?} ) { - my $name = $1 || 'shard'; - @shard_paths = glob "/tmp/sack/$name/*"; - warn "loading shards ", dump( @shard_paths ); - to_all { load => $name }; + load_shard $1; + } elsif ( $repl =~ m{view\s*(\S+)?} ) { + run_view $1; + } elsif ( $repl =~ m{out} ) { + my $out = Sack::Merge->out; + warn "out ",dump( $out ); + $response->{out} = $out; + } elsif ( $repl =~ m{clean} ) { + delete $info->{shard}; + to_all { clean => 1 }; + } elsif ( $repl eq 'exit' ) { + to_all { exit => 1 }; + sleep 1; + exit; + } elsif ( $repl eq '.' ) { + $response->{'.'} = [ @responses ]; + @responses = (); + } elsif ( $repl =~ m{(\w+)\s*(.+)?} ) { + to_all { $1 => $2 }; } else { - $response->{error}->{unknown} = $data; + $response->{error}->{repl} = $repl; } Storable::store_fd( $response, $sock ); } elsif ( $data->{ping} ) { my $port = $data->{port}; info $port => 'ping', $port; $session->{port}->{ $data->{port} } = $sock; - } elsif ( $data->{load} eq 'shard' ) { - if ( my $path = shift @shard_paths ) { - warn "retrieve $path ", -s $path; + } elsif ( defined $data->{load} && $data->{load} eq 'shard' ) { + if ( my $path = shift @shard_load_queue ) { + $info->{shard}->{$path} = 'read'; my $shard = Storable::retrieve $path; + $info->{shard}->{$path} = 'send'; warn ">>>> [", $data->{port}, "] sending shard $path\n"; Storable::store_fd( { path => $path, shard => $shard }, $sock ); } else { warn "no more shards for [", $data->{port}, "]\n"; } + } elsif ( exists $data->{out} ) { + my $added = Sack::Merge->add( $data->{out} ) if defined $data->{out}; + $info->{merge}->{ $data->{view} }->{ $data->{port} } = $added; + $info->{view }->{ $data->{view} }->{ $data->{port} } = $data->{on_shard}; + # refresh shard allocation + $info->{shard}->{ $_ } = $data->{port} foreach keys %{ $data->{on_shard} }; + } elsif ( exists $data->{port} ) { + push @responses, $data; + warn "# ",dump($data),$/; } else { warn "UNKNOWN ",dump($data); }