8 |
@ISA = qw(Exporter); |
@ISA = qw(Exporter); |
9 |
@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId); |
@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId); |
10 |
@EXPORT_OK = qw(); |
@EXPORT_OK = qw(); |
11 |
|
use strict; |
12 |
use Pg; |
use Pg; |
13 |
|
|
14 |
$debug = 0; |
my $debug = 0; |
15 |
$quiet = 1; |
my $quiet = 1; |
16 |
|
|
17 |
my %Mtables = (); |
my %Mtables = (); |
18 |
my %Stables = (); |
my %Stables = (); |
21 |
{ |
{ |
22 |
my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]); |
my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]); |
23 |
|
|
24 |
$result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". |
my $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". |
25 |
" host='$slaveHost' AND dbase='$slaveDB'"); |
" host='$slaveHost' AND dbase='$slaveDB'"); |
26 |
|
|
27 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
28 |
{ |
{ |
29 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $conn->errorMessage unless ($quiet); |
30 |
return(-1); |
return(-1); |
31 |
} |
} |
32 |
|
|
33 |
if ($result->cmdTuples > 1) |
if ($result->cmdTuples && $result->cmdTuples > 1) |
34 |
{ |
{ |
35 |
printf STDERR "Duplicate slave definitions.\n" unless ($quiet); |
printf STDERR "Duplicate slave definitions.\n" unless ($quiet); |
36 |
return(-2); |
return(-2); |
93 |
} |
} |
94 |
|
|
95 |
# Read last succeeded sync |
# Read last succeeded sync |
96 |
$sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" . |
my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" . |
97 |
" where server = $server AND syncid = (select max(syncid) from" . |
" where server = $server AND syncid = (select max(syncid) from" . |
98 |
" _RSERV_SYNC_ where server = $server AND status > 0)"; |
" _RSERV_SYNC_ where server = $server AND status > 0)"; |
99 |
|
|
110 |
my @lastsync = $result->fetchrow; |
my @lastsync = $result->fetchrow; |
111 |
|
|
112 |
my $sinfo = ""; |
my $sinfo = ""; |
113 |
if ($lastsync[3] ne '') # sync info |
if (@lastsync && $lastsync[3] ne '') # sync info |
114 |
{ |
{ |
115 |
$sinfo = "and (l.logid >= $lastsync[3]"; |
$sinfo = "and (l.logid >= $lastsync[3]"; |
116 |
$sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne ''; |
$sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne ''; |
133 |
return(-1); |
return(-1); |
134 |
} |
} |
135 |
|
|
136 |
$lastoid = ''; |
my $lastoid = ''; |
137 |
while (@row = $result->fetchrow) |
while (@row = $result->fetchrow) |
138 |
{ |
{ |
139 |
next unless exists $Mtables{$row[0]}; |
next unless exists $Mtables{$row[0]}; |
169 |
my ($taboid, $tabname, $tabkey); |
my ($taboid, $tabname, $tabkey); |
170 |
foreach $taboid (keys %Mtables) |
foreach $taboid (keys %Mtables) |
171 |
{ |
{ |
172 |
($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}}; |
my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}}; |
173 |
next unless exists $Stables{$tabname}; |
next unless exists $Stables{$tabname}; |
174 |
|
|
175 |
my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : ''; |
my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : ''; |
199 |
printf "-- UPDATE $tabname\n" if $debug; |
printf "-- UPDATE $tabname\n" if $debug; |
200 |
while (@row = $result->fetchrow) |
while (@row = $result->fetchrow) |
201 |
{ |
{ |
202 |
for ($i = 0; $i <= $#row; $i++) |
for (my $i = 0; $i <= $#row; $i++) |
203 |
{ |
{ |
204 |
printf $outf " " if $i; |
printf $outf " " if $i; |
205 |
printf " " if $i && $debug; |
printf " " if $i && $debug; |
215 |
|
|
216 |
# INSERTED rows |
# INSERTED rows |
217 |
|
|
|
my ($taboid, $tabname, $tabkey); |
|
218 |
foreach $taboid (keys %Mtables) |
foreach $taboid (keys %Mtables) |
219 |
{ |
{ |
220 |
($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}}; |
my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}}; |
221 |
next unless exists $Stables{$tabname}; |
next unless exists $Stables{$tabname}; |
222 |
|
|
223 |
my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : ''; |
my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : ''; |
247 |
printf "-- INSERT $tabname\n" if $debug; |
printf "-- INSERT $tabname\n" if $debug; |
248 |
while (@row = $result->fetchrow) |
while (@row = $result->fetchrow) |
249 |
{ |
{ |
250 |
for ($i = 0; $i <= $#row; $i++) |
for (my $i = 0; $i <= $#row; $i++) |
251 |
{ |
{ |
252 |
printf $outf " " if $i; |
printf $outf " " if $i; |
253 |
printf " " if $i && $debug; |
printf " " if $i && $debug; |
359 |
$maxid = $row[0] if $maxid eq ''; |
$maxid = $row[0] if $maxid eq ''; |
360 |
last if $row[0] > $maxid; |
last if $row[0] > $maxid; |
361 |
my @ids = split(/[ ]+,[ ]+/, $row[1]); |
my @ids = split(/[ ]+,[ ]+/, $row[1]); |
362 |
foreach $aid (@ids) |
foreach my $aid (@ids) |
363 |
{ |
{ |
364 |
$active{$aid} = 1 unless exists $active{$aid}; |
$active{$aid} = 1 unless exists $active{$aid}; |
365 |
} |
} |
432 |
return(-1); |
return(-1); |
433 |
} |
} |
434 |
%Stables = (); |
%Stables = (); |
435 |
while (@row = $result->fetchrow) |
while (my @row = $result->fetchrow) |
436 |
{ |
{ |
437 |
# printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3]; |
# printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3]; |
438 |
push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3]; |
push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3]; |
591 |
if (! defined($Stables{$tabname})) { |
if (! defined($Stables{$tabname})) { |
592 |
print "Not configured to delete rows from table $tabname\n" unless $quiet; |
print "Not configured to delete rows from table $tabname\n" unless $quiet; |
593 |
while (<$inpf>) { |
while (<$inpf>) { |
594 |
$istring = $_; |
my $istring = $_; |
595 |
$istring =~ s/\n//; |
$istring =~ s/\n//; |
596 |
last if ($istring eq '\.'); |
last if ($istring eq '\.'); |
597 |
} |
} |
645 |
if (! defined($Stables{$tabname})) { |
if (! defined($Stables{$tabname})) { |
646 |
print "Not configured to update rows from table $tabname\n" unless $quiet; |
print "Not configured to update rows from table $tabname\n" unless $quiet; |
647 |
while (<$inpf>) { |
while (<$inpf>) { |
648 |
$istring = $_; |
my $istring = $_; |
649 |
$istring =~ s/\n//; |
$istring =~ s/\n//; |
650 |
last if ($istring eq '\.'); |
last if ($istring eq '\.'); |
651 |
} |
} |
669 |
} |
} |
670 |
|
|
671 |
my @anames = (); |
my @anames = (); |
672 |
while (@row = $result->fetchrow) |
while (my @row = $result->fetchrow) |
673 |
{ |
{ |
674 |
$anames[$row[0]] = $row[1]; |
$anames[$row[0]] = $row[1]; |
675 |
} |
} |
791 |
if (! defined($Stables{$tabname})) { |
if (! defined($Stables{$tabname})) { |
792 |
print "Not configured to insert rows from table $tabname\n" unless $quiet; |
print "Not configured to insert rows from table $tabname\n" unless $quiet; |
793 |
while (<$inpf>) { |
while (<$inpf>) { |
794 |
$istring = $_; |
my $istring = $_; |
795 |
$istring =~ s/\n//; |
$istring =~ s/\n//; |
796 |
last if ($istring eq '\.'); |
last if ($istring eq '\.'); |
797 |
} |
} |
827 |
|
|
828 |
if ($CBufLen >= $CBufMax) |
if ($CBufLen >= $CBufMax) |
829 |
{ |
{ |
830 |
$result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
831 |
return($result) if $result; |
return($result) if $result; |
832 |
@CopyBuf = (); |
@CopyBuf = (); |
833 |
$CBufLen = 0; |
$CBufLen = 0; |
843 |
if ($CBufLen) |
if ($CBufLen) |
844 |
{ |
{ |
845 |
print "@CopyBuf\n" if $debug; |
print "@CopyBuf\n" if $debug; |
846 |
$result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
847 |
return($result) if $result; |
return($result) if $result; |
848 |
} |
} |
849 |
|
|
864 |
return(-1); |
return(-1); |
865 |
} |
} |
866 |
|
|
867 |
foreach $str (@{$CBuf}) |
foreach my $str (@{$CBuf}) |
868 |
{ |
{ |
869 |
$conn->putline($str); |
$conn->putline($str); |
870 |
} |
} |