/[Sack]/trunk/bin/lorry.pl
This is the repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/bin/lorry.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 127 - (show annotations)
Wed Oct 7 16:21:33 2009 UTC (14 years, 8 months ago) by dpavlin
File MIME type: text/plain
File size: 2550 byte(s)
better recovery from node storable serialization errors,
faster cloud startup, 
added common pid handling,
added [u]pdate to lorry, which pushes updated code to nodes,
tweaks to output,
version bump for Sack::Node [0.08]

1 #!/usr/bin/perl
2
3 use warnings;
4 use strict;
5
6 use Data::Dump qw(dump);
7 use File::Slurp;
8
9 use lib 'lib';
10 use Sack::From;
11 use Sack::Lorry;
12 use Sack::Color;
13 use Getopt::Long;
14 use Time::HiRes qw(time);
15
16
# Run-time settings and their defaults; --view, --limit and --cloud
# override the corresponding values from the command line.
my $view   = 'views/00.demo.pl';    # view file handed to the nodes by the [v]iew command
my $offset = 0;                     # running offset recorded per node (no command-line option)
my $limit  = 5000;                  # number of records requested per node
my $cloud_file;                     # optional file listing one host per line

# GetOptions reports bad options itself through warn and returns false;
# it does not set $!, so dying with $! would print an unrelated or empty
# system error. Show a usage line instead.
GetOptions(
    'view=s'  => \$view,
    'limit=i' => \$limit,
    'cloud=s' => \$cloud_file,
) or die "usage: $0 [--view path/to/view.pl] [--limit N] [--cloud hosts-file]\n";
28
# Moment of the previous checkpoint; initialised when the script starts.
# time() is the floating-point version imported from Time::HiRes.
my $t = time;

# Print on STDERR how many seconds passed since the previous checkpoint
# (or since start-up on the first call), followed by the given label
# words joined with spaces, then move the checkpoint to "now".
# Returns the new checkpoint timestamp (value of the final assignment).
sub duration {
    my @label   = @_;
    my $now     = time;
    my $elapsed = $now - $t;
    warn sprintf( "%.4fs %s\n", $elapsed, join( ' ', @label ) );
    $t = $now;
}
36
37
# Controller object used below to start nodes and exchange messages with them.
my $lorry = Sack::Lorry->new;

# Build the list of hosts: one per line of the --cloud file, otherwise a
# single node on localhost. $cloud_file is undef when --cloud was not
# given, so check definedness before -e to avoid an "uninitialized value"
# warning under "use warnings".
my @cloud;
if ( defined $cloud_file && -e $cloud_file ) {
    @cloud = read_file($cloud_file);
    chomp @cloud;

    # Blank lines (e.g. a trailing empty line) are not hosts; keeping them
    # would try to start a node on '' and inflate the limit computed below.
    @cloud = grep { length } @cloud;
} else {
    warn "cloud file $cloud_file not found, using localhost\n"
        if defined $cloud_file;
    @cloud = ('localhost');
}
warn "bring up cloud: ", join( ' ', @cloud ), $/;

# Data source; the overall limit is the per-node limit times the number
# of hosts so that every node can be handed a shard of $limit records.
my $from = Sack::From->new(
    path  => '/data/isi/full.txt',
    limit => $limit * scalar @cloud,
);

duration('load finished');

# Per-port bookkeeping: { port => { host, offset, limit } }.
my $info;

# First node port; incremented only after a node started successfully.
my $port = 4000;

foreach my $host (@cloud) {

    if ( $lorry->start_node_port( $host, $port ) ) {

        warn "started [$port] on $host\n";

        my $data = $from->shard($limit);
        $lorry->send_to( $port, { data => $data } )
            or die "can't send to $port: $!";

        $info->{$port} = {
            host   => $host,
            offset => $offset,
            limit  => $limit,
        };

        $port++;
        $offset += $limit;

    } else {
        # Previously ignored silently; the same port is retried with the
        # next host, exactly as before.
        warn "start_node_port returned false for [$port] on $host, skipping\n";
    }
}

duration('cloud up');

warn "info ", dump($info);

warn "load status ", dump( $lorry->get_from_all ), $/;
82
# Interactive command loop. Commands are matched on their first letter(s):
#   v         run the current view on the nodes and keep the result in $out
#   d         dump the last view result
#   x         leave the loop
#   r         restart nodes
#   i         ask every node for info and print it per port
#   u         push /srv/Sack/ to every distinct host over ssh
#   sh <cmd>  send a shell command to all nodes and print each reply
# End of input (CTRL+D) also leaves the loop. After the loop all nodes
# are told to exit.
our $out;         # result of the most recent view
our $repl = 1;    # loop flag, cleared by the x command

while ($repl) {
    print "sack> ";
    my $cmd = <STDIN>;
    last unless defined $cmd;    # CTRL+D
    duration('repl wait');
    chomp($cmd);

    if ( $cmd =~ m{^v} ) {
        $out = $lorry->view($view);
        duration('view');
    } elsif ( $cmd =~ m{^d} ) {
        warn dump($out);
        duration('dump');
    } elsif ( $cmd =~ m{^x} ) {
        $repl = 0;
    } elsif ( $cmd =~ m{^r} ) {
        $lorry->restart_nodes;
    } elsif ( $cmd =~ m{^i} ) {
        $lorry->send_to_all( { info => 1 } );

        # Named differently from the file-level $info so it is not shadowed.
        my $node_info = $lorry->get_from_all;
        foreach my $port ( $lorry->connected ) {
            warn "[$port] $lorry->{port_on_host}->{$port} ",
                dump( $node_info->{$port} ), "\n";
        }
    } elsif ( $cmd =~ m{^u} ) {
        my %updated;
        foreach my $host (@cloud) {
            next if $updated{$host}++;    # push only once per distinct host
            warn "update $host\n";

            # The pipeline needs a shell, so the single-string form stays;
            # $host comes from the operator-supplied cloud file.
            # NOTE(review): $? reflects the pipeline as the shell reports
            # it (normally the final ssh stage), not find or cpio.
            my $rc = system
                "find /srv/Sack/ | cpio --create | ssh -F etc/lib.ssh $host cpio --extract --make-directories --unconditional";
            if ( $rc != 0 ) {
                my $why = $? == -1
                    ? "can't run shell: $!"
                    : "exit status " . ( $? >> 8 );
                warn "update of $host failed: $why\n";
            }
        }
    } elsif ( $cmd =~ m{^sh\s+(.+)} ) {
        # Copy the capture right away instead of relying on $1 surviving
        # the method calls below.
        my $shell_cmd = $1;
        $lorry->send_to_all( { sh => $shell_cmd } );
        my $sh = $lorry->get_from_all;
        foreach my $port ( $lorry->connected ) {
            my $output = $sh->{$port}->{sh};

            # A missing reply would otherwise raise an "uninitialized
            # value" warning in the middle of the report.
            $output = '' unless defined $output;
            warn "[$port]# $shell_cmd\n$output";
        }
    } else {
        # length, not truth: the input "0" is an unknown command too.
        warn "UNKNOWN $cmd\n" if length $cmd;
    }
}

warn "exit all nodes\n";
$lorry->send_to_all( { exit => 1 } );

duration('total usage');
130

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.26