/[Sack]/trunk/bin/couchdb2shards.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/bin/couchdb2shards.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 243 by dpavlin, Thu Nov 26 18:14:43 2009 UTC revision 256 by dpavlin, Tue Jan 26 16:55:10 2010 UTC
# Line 2  Line 2 
2    
3  # http://wiki.apache.org/couchdb/HTTP_Bulk_Document_API  # http://wiki.apache.org/couchdb/HTTP_Bulk_Document_API
4    
5  use warnings;  use lib 'lib';
6  use strict;  use Sack::Shard;
7    
8  use IO::Socket::INET;  use IO::Socket::INET;
 use Storable qw();  
9  use JSON;  use JSON;
10  use Data::Dump qw(dump);  use Data::Dump qw(dump);
11  use Time::HiRes qw(time);  use autodie;
 use File::Path qw(make_path remove_tree);  
12    
13  my $name = 'pxelator';  my $name = 'pxelator';
14  my $shard_size = 5000;  my $shard_size = 5000;
15    
16    
 my $path = "/tmp/sack/$name";  
 remove_tree $path;  
 make_path $path;  
   
   
17  sub couchdb_socket {  sub couchdb_socket {
18          IO::Socket::INET->new(          IO::Socket::INET->new(
19                  PeerAddr => '10.60.0.91',                  PeerAddr => '10.60.0.91',
# Line 48  get_chunk($sock); Line 41  get_chunk($sock);
41    
42  my $total = <$sock>;  my $total = <$sock>;
43  $total =~ s{^.*total_rows\D+(\d+).+$}{$1};  $total =~ s{^.*total_rows\D+(\d+).+$}{$1};
44  warn "# total: $total\n";  warn "# $name total: $total\n";
   
 our $shard;  
 our $shard_nr = 0;  
 my $t_start = time();  
 my $total_bytes;  
   
 sub save_shard {  
         my $shard_path = sprintf("%s/%06d.%d", $path, $shard_nr++ * $shard_size, $shard_size);  
         Storable::store( $shard, $shard_path );  
   
         my $dt = time() - $t_start;  
         my $pos = $shard_nr * $shard_size;  
         my $rec_s = $pos / $dt;  
         my $end_t = ( $total / $rec_s ) - $dt;  
         my $shard_size = -s $shard_path;  
   
         warn sprintf "shard %s %d bytes\t%8.2f%% %8.2f/s  ETA %d:%02ds\n"  
                 , $shard_path, $shard_size  
                 , $pos * 100 / $total  
                 , $rec_s,  
                 , $end_t / 60, $end_t % 60  
                 ;  
45    
46          $total_bytes += $shard_size;  Sack::Shard::create( $name, $total, $shard_size );
   
         $shard = [];  
 }  
47    
48  while(<$sock>) {  while(<$sock>) {
49          if ( /"id":"([^"]+)"/ ) {          if ( /"id":"([^"]+)"/ ) {
50    
51                  s/,[\r\n]+$//; # cleanup JSON                  s/,[\r\n]+$//; # cleanup JSON
52                  my $json = from_json( $_ );                  my $json = from_json( $_ );
53                  push @$shard, $json->{doc};                  Sack::Shard::add( $json->{doc} );
                 save_shard if $#{ $shard } == $shard_size;  
54          } else {          } else {
55                  warn "UNKNOWN: $_";                  warn "UNKNOWN: $_";
56          }          }
57  }  }
58  save_shard;  Sack::Shard::finish;
59    
 warn "sharded $path $total_bytes bytes\n";  

Legend:
Removed from v.243  
changed lines
  Added in v.256

  ViewVC Help
Powered by ViewVC 1.1.26