--- trunk/scripts/estcp.pl 2006/01/17 15:00:50 85 +++ trunk/scripts/estcp-mt.pl 2006/01/21 17:37:07 87 @@ -5,6 +5,9 @@ use URI::Escape qw/uri_escape/; use Time::HiRes; use POSIX qw/strftime/; +use Config; +use threads; +use Thread::Queue; =head1 NAME @@ -12,6 +15,9 @@ =cut +die "Your perl isn't compiled with support for ithreads\n" unless ($Config{useithreads}); + + my ($from,$to) = @ARGV; die "usage: $0 http://localhost:1978/node/from http://remote.example.com:1978/node/to\n" unless ($from && $to); @@ -32,7 +38,7 @@ ); unless(eval{ $to_n->name }) { - if ($to =~ m#^(http://.+)/node/(\w+)$#) { + if ($to =~ m#^(http://.+)/node/([^/]+)$#) { my ($url,$name) = ($1,$2); print "Creating '$name' on $url\n"; $to_n->shuttle_url( $url . '/master?action=nodeadd', @@ -45,16 +51,55 @@ } } -print "Copy from ",$from_n->name," (",$from_n->label,") to ",$to_n->name," (",$to_n->label,") - ",$from_n->doc_num," documents (",$from_n->word_num," words, ",$from_n->size," bytes)\n"; +# total processed elements +my $i : shared = 1; + +my $q_id = Thread::Queue->new; +my $q_drafts = Thread::Queue->new; + +my $get_thr = threads->new( sub { + while (my $id = $q_id->dequeue) { + #warn "get ", $id || 'undef',"\n"; + if ($id < 0) { + $q_drafts->enqueue( '' ); # abort put thread + last; + }; + print STDERR "get_thr, id: $id\n" if ($debug); + my $doc_draft = $from_n->_fetch_doc( id => $id, chomp_resbody => 1 ); + $q_drafts->enqueue( $doc_draft ); + } +} ); +my $t = time(); +my $t_refresh = time(); my $doc_num = $from_n->doc_num || 1; +my $put_thr = threads->new( sub { + while (my $doc_draft = $q_drafts->dequeue) { + last unless ($doc_draft); + print STDERR "put_thr, $doc_draft\n" if ($debug); + $to_n->shuttle_url( $to_n->{url} . '/put_doc', 'text/x-estraier-draft', $doc_draft, undef) == 200 or die "can't insert $doc_draft\n"; + + $i++; + if (time() - $t_refresh > 3) { + my $rate = ( $i / ((time() - $t) || 1) ); + printf("%d records, %1.2f%% [%1.2f rec/s] estimated finish: %s\n", + $i, + ($i * 100 / $doc_num), + $rate, + strftime("%Y-%m-%d %H:%M:%S", localtime( time() + int(($doc_num-$i) / $rate))), + ); + $t_refresh = time(); + } + + } +} ); + +print "Copy from ",$from_n->name," (",$from_n->label,") to ",$to_n->name," (",$to_n->label,") - ",$from_n->doc_num," documents (",$from_n->word_num," words, ",$from_n->size," bytes)\n"; + my $prev; -my $i = 0; my $more = 1; -my $t = time(); - while($more) { my $res; $from_n->shuttle_url( $from_n->{url} . '/list', @@ -71,22 +116,19 @@ #$to_n->put_doc( $from_n->get_doc( $id )); - my $doc_draft = $from_n->_fetch_doc( id => $id, chomp_resbody => 1 ); - $to_n->shuttle_url( $to_n->{url} . '/put_doc', 'text/x-estraier-draft', $doc_draft, undef) == 200 or die "can't insert $doc_draft\n"; + #my $doc_draft = $from_n->_fetch_doc( id => $id, chomp_resbody => 1 ); + #$to_n->shuttle_url( $to_n->{url} . '/put_doc', 'text/x-estraier-draft', $doc_draft, undef) == 200 or die "can't insert $doc_draft\n"; - $i++; + $q_id->enqueue( $id ); } - warn "$prev\n"; - - my $rate = ( $i / (time() - $t) ); - printf("%d records, %1.2f%% [%1.2f rec/s] estimated finish: %s\n", - $i, - ($i * 100 / $doc_num), - $rate, - strftime("%Y-%m-%d %H:%M:%S", localtime( time() + int(($doc_num-$i) / $rate))), - ); + warn "$prev\n" if ($debug); } +$q_id->enqueue( -1 ); # last one + +$get_thr->join; +$put_thr->join; -print "Copy completed.\n"; +printf "Copy of %d records completed [%1.2f rec/s]\n", $i, + ( $i / ((time() - $t) || 1) );