6 |
|
|
7 |
require Exporter; |
require Exporter; |
8 |
@ISA = qw(Exporter); |
@ISA = qw(Exporter); |
9 |
@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId); |
@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId |
10 |
|
Rollback RollbackAndQuit Connect Exec Exec2 MkInfo |
11 |
|
$debug $quiet $verbose |
12 |
|
); |
13 |
@EXPORT_OK = qw(); |
@EXPORT_OK = qw(); |
14 |
use strict; |
use strict; |
15 |
use Pg; |
use Pg; |
16 |
|
|
17 |
my $debug = 0; |
my $debug = 0; |
18 |
my $quiet = 1; |
my $quiet = 1; |
19 |
|
my $verbose = 0; |
20 |
|
|
21 |
|
$debug = 1; |
22 |
|
$quiet = 0; |
23 |
|
$verbose = 1; |
24 |
|
|
25 |
my %Mtables = (); |
my %Mtables = (); |
26 |
my %Stables = (); |
my %Stables = (); |
27 |
|
|
28 |
sub GetSlaveId |
sub GetServerId |
29 |
{ |
{ |
30 |
my ($mconn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]); |
my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]); |
31 |
|
|
32 |
|
print STDERR "GetServerId: host $Host, database $DB\n" if ($debug); |
33 |
|
|
34 |
my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". |
my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". |
35 |
" host='$slaveHost' AND dbase='$slaveDB'"); |
" host='$Host' AND dbase='$DB'"); |
36 |
|
|
37 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
38 |
{ |
{ |
42 |
|
|
43 |
if ($result->cmdTuples && $result->cmdTuples > 1) |
if ($result->cmdTuples && $result->cmdTuples > 1) |
44 |
{ |
{ |
45 |
printf STDERR "Duplicate slave definitions.\n" unless ($quiet); |
printf STDERR "Duplicate host definitions.\n" unless ($quiet); |
46 |
return(-2); |
return(-2); |
47 |
} |
} |
48 |
|
|
49 |
my @row = $result->fetchrow; |
my @row = $result->fetchrow; |
50 |
|
|
51 |
|
print "GetServerId($DB,$Host) == $row[0]\n" if ($debug); |
52 |
|
|
53 |
return $row[0]; |
return $row[0]; |
54 |
} |
} |
55 |
|
|
56 |
sub PrepareSnapshot |
sub PrepareSnapshot |
57 |
{ |
{ |
58 |
my ($mconn, $sconn, $outf, $sserver, $onlytables) = @_; |
my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_; |
59 |
|
|
60 |
|
print STDERR "## d: $debug v: $verbose q: $quiet\n"; |
61 |
|
|
62 |
|
if ($mserver == $sserver) { |
63 |
|
print STDERR "master and slave numbers are same [$mserver] !\n"; |
64 |
|
return(-1); |
65 |
|
} |
66 |
|
|
67 |
|
print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug); |
68 |
|
|
69 |
# first, we must know for wich tables the slave subscribed |
# first, we must know for wich tables the slave subscribed |
70 |
my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_"); |
my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_"); |
116 |
} |
} |
117 |
push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3]; |
push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3]; |
118 |
} |
} |
119 |
|
|
120 |
|
print "Prepare snapshot for tables with oid: ",join(",",keys %Mtables),"\n" if ($debug); |
121 |
|
|
122 |
# Read last succeeded sync |
# Read last succeeded sync |
123 |
my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" . |
my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" . |
124 |
" where server = $sserver AND syncid = (select max(syncid) from" . |
" where server = $sserver AND syncid = (select max(syncid) from" . |
135 |
} |
} |
136 |
|
|
137 |
my @lastsync = $result->fetchrow; |
my @lastsync = $result->fetchrow; |
138 |
|
|
139 |
|
# exclude data which originated from master server |
140 |
|
my $sel_server = " and l.server = $mserver "; |
141 |
|
|
142 |
my $sinfo = ""; |
my $sinfo = ""; |
143 |
if (@lastsync && $lastsync[3] ne '') # sync info |
if (@lastsync && $lastsync[3] ne '') # sync info |
144 |
{ |
{ |
151 |
|
|
152 |
# DELETED rows |
# DELETED rows |
153 |
$sql = "select l.reloid, l.key from _RSERV_LOG_ l" . |
$sql = "select l.reloid, l.key from _RSERV_LOG_ l" . |
154 |
" where l.delete = 1 $sinfo order by l.reloid"; |
" where l.delete = 1 $sinfo $sel_server order by l.reloid"; |
155 |
|
|
156 |
printf "$sql\n" if $debug; |
printf "DELETED: $sql\n" if $debug; |
157 |
|
|
158 |
$result = $mconn->exec($sql); |
$result = $mconn->exec($sql); |
159 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
206 |
|
|
207 |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
208 |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1". |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1". |
209 |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}"; |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". |
210 |
|
$sel_server; |
211 |
|
|
212 |
printf "$sql\n" if $debug; |
printf "UPDATED: $sql\n" if $debug; |
213 |
|
|
214 |
$result = $mconn->exec($sql); |
$result = $mconn->exec($sql); |
215 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
255 |
|
|
256 |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
257 |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1". |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1". |
258 |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}"; |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". |
259 |
|
$sel_server; |
260 |
|
|
261 |
printf "$sql\n" if $debug; |
printf "INSERTED: $sql\n" if $debug; |
262 |
|
|
263 |
$result = $mconn->exec($sql); |
$result = $mconn->exec($sql); |
264 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
478 |
push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3]; |
push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3]; |
479 |
} |
} |
480 |
|
|
481 |
|
print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug); |
482 |
|
|
483 |
my $ok = 0; |
my $ok = 0; |
484 |
my $syncid = ''; |
my $syncid = -1; |
485 |
while(<$inpf>) |
while(<$inpf>) |
486 |
{ |
{ |
487 |
$_ =~ s/\n//; |
$_ =~ s/\n//; |
488 |
my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3); |
my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3); |
489 |
|
die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt); |
490 |
if ($cmt ne '--') |
if ($cmt ne '--') |
491 |
{ |
{ |
492 |
printf STDERR "Invalid format\n" unless ($quiet); |
printf STDERR "Invalid format\n" unless ($quiet); |
495 |
} |
} |
496 |
if ($cmd eq 'DELETE') |
if ($cmd eq 'DELETE') |
497 |
{ |
{ |
498 |
if ($syncid eq '') |
if ($syncid == -1) |
499 |
{ |
{ |
500 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
501 |
$sconn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
510 |
} |
} |
511 |
elsif ($cmd eq 'INSERT') |
elsif ($cmd eq 'INSERT') |
512 |
{ |
{ |
513 |
if ($syncid eq '') |
if ($syncid == -1) |
514 |
{ |
{ |
515 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
516 |
$sconn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
525 |
} |
} |
526 |
elsif ($cmd eq 'UPDATE') |
elsif ($cmd eq 'UPDATE') |
527 |
{ |
{ |
528 |
if ($syncid eq '') |
if ($syncid == -1) |
529 |
{ |
{ |
530 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
531 |
$sconn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
540 |
} |
} |
541 |
elsif ($cmd eq 'SYNCID') |
elsif ($cmd eq 'SYNCID') |
542 |
{ |
{ |
543 |
if ($syncid ne '') |
if ($syncid != -1) |
544 |
{ |
{ |
545 |
printf STDERR "Second Sync ID ?!\n" unless ($quiet); |
printf STDERR "Second Sync ID ?!\n" unless ($quiet); |
546 |
$sconn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
1009 |
return(1); |
return(1); |
1010 |
} |
} |
1011 |
|
|
1012 |
|
# stuff moved from perl scripts for better re-use |
1013 |
|
|
1014 |
|
sub Rollback { |
1015 |
|
my $conn = shift @_; |
1016 |
|
|
1017 |
|
print STDERR $conn->errorMessage; |
1018 |
|
$conn->exec("ROLLBACK"); |
1019 |
|
} |
1020 |
|
|
1021 |
|
sub RollbackAndQuit { |
1022 |
|
my $conn = shift @_; |
1023 |
|
|
1024 |
|
Rollback($conn); |
1025 |
|
exit (-1); |
1026 |
|
} |
1027 |
|
|
1028 |
|
sub Connect { |
1029 |
|
my $info = shift @_; |
1030 |
|
|
1031 |
|
print("Connecting to $info\n") if ($debug || $verbose); |
1032 |
|
my $conn = Pg::connectdb($info); |
1033 |
|
if ($conn->status != PGRES_CONNECTION_OK) { |
1034 |
|
print STDERR "Failed opening $info\n"; |
1035 |
|
exit 1; |
1036 |
|
} |
1037 |
|
return $conn; |
1038 |
|
} |
1039 |
|
|
1040 |
|
sub Exec { |
1041 |
|
my $conn = shift @_; |
1042 |
|
my $sql = shift @_; |
1043 |
|
|
1044 |
|
my $result = $conn->exec($sql); |
1045 |
|
print STDERR "$sql\n" if ($debug); |
1046 |
|
RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); |
1047 |
|
} |
1048 |
|
|
1049 |
|
sub Exec2 { |
1050 |
|
my $mconn = shift @_; |
1051 |
|
my $sconn = shift @_; |
1052 |
|
my $sql = shift @_; |
1053 |
|
|
1054 |
|
my $result = $mconn->exec($sql); |
1055 |
|
RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK); |
1056 |
|
$result = $sconn->exec($sql); |
1057 |
|
RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK); |
1058 |
|
} |
1059 |
|
|
1060 |
|
sub MkInfo { |
1061 |
|
my $db = shift || die "need database name!"; |
1062 |
|
my $host = shift; |
1063 |
|
my $port = shift; |
1064 |
|
my $user = shift; |
1065 |
|
my $password = shift; |
1066 |
|
|
1067 |
|
my $info = "dbname=$db"; |
1068 |
|
$info = "$info host=$host" if (defined($host)); |
1069 |
|
$info = "$info port=$port" if (defined($port)); |
1070 |
|
$info = "$info user=$user" if (defined($user)); |
1071 |
|
$info = "$info password=$password" if (defined($password)); |
1072 |
|
|
1073 |
|
return $info; |
1074 |
|
} |
1075 |
|
|
1076 |
1; |
1; |