6 |
|
|
7 |
require Exporter; |
require Exporter; |
8 |
@ISA = qw(Exporter); |
@ISA = qw(Exporter); |
9 |
@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId); |
@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId |
10 |
|
Rollback RollbackAndQuit Connect Exec Exec2 MkInfo |
11 |
|
$debug $quiet $verbose |
12 |
|
); |
13 |
@EXPORT_OK = qw(); |
@EXPORT_OK = qw(); |
14 |
use strict; |
use strict; |
15 |
use Pg; |
use Pg; |
16 |
|
|
17 |
my $debug = 0; |
my $debug = 0; |
18 |
my $quiet = 1; |
my $quiet = 1; |
19 |
|
my $verbose = 0; |
20 |
|
|
21 |
|
$debug = 1; |
22 |
|
$quiet = 0; |
23 |
|
$verbose = 1; |
24 |
|
|
25 |
my %Mtables = (); |
my %Mtables = (); |
26 |
my %Stables = (); |
my %Stables = (); |
27 |
|
|
28 |
sub GetSlaveId |
sub GetServerId |
29 |
{ |
{ |
30 |
my ($mconn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]); |
my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]); |
31 |
|
|
32 |
|
print STDERR "GetServerId: host $Host, database $DB\n" if ($debug); |
33 |
|
|
34 |
my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". |
my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". |
35 |
" host='$slaveHost' AND dbase='$slaveDB'"); |
" host='$Host' AND dbase='$DB'"); |
36 |
|
|
37 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
38 |
{ |
{ |
42 |
|
|
43 |
if ($result->cmdTuples && $result->cmdTuples > 1) |
if ($result->cmdTuples && $result->cmdTuples > 1) |
44 |
{ |
{ |
45 |
printf STDERR "Duplicate slave definitions.\n" unless ($quiet); |
printf STDERR "Duplicate host definitions.\n" unless ($quiet); |
46 |
return(-2); |
return(-2); |
47 |
} |
} |
48 |
|
|
49 |
my @row = $result->fetchrow; |
my @row = $result->fetchrow; |
50 |
|
|
51 |
|
print "GetServerId($DB,$Host) == $row[0]\n" if ($debug); |
52 |
|
|
53 |
return $row[0]; |
return $row[0]; |
54 |
} |
} |
55 |
|
|
56 |
sub PrepareSnapshot |
sub PrepareSnapshot |
57 |
{ |
{ |
58 |
my ($mconn, $sconn, $outf, $sserver, $onlytables) = @_; |
my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_; |
59 |
|
|
60 |
|
print STDERR "## d: $debug v: $verbose q: $quiet\n"; |
61 |
|
|
62 |
|
if ($mserver == $sserver) { |
63 |
|
print STDERR "master and slave numbers are same [$mserver] !\n"; |
64 |
|
return(-1); |
65 |
|
} |
66 |
|
|
67 |
|
print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug); |
68 |
|
|
69 |
# first, we must know for wich tables the slave subscribed |
# first, we must know for wich tables the slave subscribed |
70 |
my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_"); |
my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_"); |
79 |
$Stables{$row[0]} = 1; |
$Stables{$row[0]} = 1; |
80 |
} |
} |
81 |
|
|
82 |
|
print "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug); |
83 |
|
|
84 |
$result = $mconn->exec("BEGIN"); |
$result = $mconn->exec("BEGIN"); |
85 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
86 |
{ |
{ |
118 |
} |
} |
119 |
push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3]; |
push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3]; |
120 |
} |
} |
121 |
|
|
122 |
|
print "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug); |
123 |
|
|
124 |
# Read last succeeded sync |
# Read last succeeded sync |
125 |
my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" . |
my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" . |
126 |
" where server = $sserver AND syncid = (select max(syncid) from" . |
" where server = $sserver AND syncid = (select max(syncid) from" . |
137 |
} |
} |
138 |
|
|
139 |
my @lastsync = $result->fetchrow; |
my @lastsync = $result->fetchrow; |
140 |
|
print "lastsync: ",join(",",@lastsync),"\n" if ($debug); |
141 |
|
|
142 |
|
# exclude data which originated from master server |
143 |
|
my $sel_server = " and l.server = $mserver "; |
144 |
|
|
145 |
my $sinfo = ""; |
my $sinfo = ""; |
146 |
if (@lastsync && $lastsync[3] ne '') # sync info |
if (@lastsync && $lastsync[3] ne '') # sync info |
147 |
{ |
{ |
154 |
|
|
155 |
# DELETED rows |
# DELETED rows |
156 |
$sql = "select l.reloid, l.key from _RSERV_LOG_ l" . |
$sql = "select l.reloid, l.key from _RSERV_LOG_ l" . |
157 |
" where l.delete = 1 $sinfo order by l.reloid"; |
" where l.delete = 1 $sinfo $sel_server order by l.reloid"; |
158 |
|
|
159 |
printf "$sql\n" if $debug; |
printf "DELETED: $sql\n" if $debug; |
160 |
|
|
161 |
$result = $mconn->exec($sql); |
$result = $mconn->exec($sql); |
162 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
209 |
|
|
210 |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
211 |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1". |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1". |
212 |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}"; |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". |
213 |
|
$sel_server; |
214 |
|
|
215 |
printf "$sql\n" if $debug; |
printf "UPDATED: $sql\n" if $debug; |
216 |
|
|
217 |
$result = $mconn->exec($sql); |
$result = $mconn->exec($sql); |
218 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
258 |
|
|
259 |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
260 |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1". |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1". |
261 |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}"; |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". |
262 |
|
$sel_server; |
263 |
|
|
264 |
printf "$sql\n" if $debug; |
printf "INSERTED: $sql\n" if $debug; |
265 |
|
|
266 |
$result = $mconn->exec($sql); |
$result = $mconn->exec($sql); |
267 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
481 |
push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3]; |
push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3]; |
482 |
} |
} |
483 |
|
|
484 |
|
print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug); |
485 |
|
|
486 |
my $ok = 0; |
my $ok = 0; |
487 |
my $syncid = ''; |
my $syncid = -1; |
488 |
while(<$inpf>) |
while(<$inpf>) |
489 |
{ |
{ |
490 |
$_ =~ s/\n//; |
$_ =~ s/\n//; |
491 |
my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3); |
my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3); |
492 |
|
die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt); |
493 |
if ($cmt ne '--') |
if ($cmt ne '--') |
494 |
{ |
{ |
495 |
printf STDERR "Invalid format\n" unless ($quiet); |
printf STDERR "Invalid format\n" unless ($quiet); |
498 |
} |
} |
499 |
if ($cmd eq 'DELETE') |
if ($cmd eq 'DELETE') |
500 |
{ |
{ |
501 |
if ($syncid eq '') |
if ($syncid == -1) |
502 |
{ |
{ |
503 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
504 |
$sconn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
513 |
} |
} |
514 |
elsif ($cmd eq 'INSERT') |
elsif ($cmd eq 'INSERT') |
515 |
{ |
{ |
516 |
if ($syncid eq '') |
if ($syncid == -1) |
517 |
{ |
{ |
518 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
519 |
$sconn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
528 |
} |
} |
529 |
elsif ($cmd eq 'UPDATE') |
elsif ($cmd eq 'UPDATE') |
530 |
{ |
{ |
531 |
if ($syncid eq '') |
if ($syncid == -1) |
532 |
{ |
{ |
533 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
534 |
$sconn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
543 |
} |
} |
544 |
elsif ($cmd eq 'SYNCID') |
elsif ($cmd eq 'SYNCID') |
545 |
{ |
{ |
546 |
if ($syncid ne '') |
if ($syncid != -1) |
547 |
{ |
{ |
548 |
printf STDERR "Second Sync ID ?!\n" unless ($quiet); |
printf STDERR "Second Sync ID ?!\n" unless ($quiet); |
549 |
$sconn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
1012 |
return(1); |
return(1); |
1013 |
} |
} |
1014 |
|
|
1015 |
|
# stuff moved from perl scripts for better re-use |
1016 |
|
|
1017 |
|
sub Rollback { |
1018 |
|
my $conn = shift @_; |
1019 |
|
|
1020 |
|
print STDERR $conn->errorMessage; |
1021 |
|
$conn->exec("ROLLBACK"); |
1022 |
|
} |
1023 |
|
|
1024 |
|
sub RollbackAndQuit { |
1025 |
|
my $conn = shift @_; |
1026 |
|
|
1027 |
|
Rollback($conn); |
1028 |
|
exit (-1); |
1029 |
|
} |
1030 |
|
|
1031 |
|
sub Connect { |
1032 |
|
my $info = shift @_; |
1033 |
|
|
1034 |
|
print("Connecting to $info\n") if ($debug || $verbose); |
1035 |
|
my $conn = Pg::connectdb($info); |
1036 |
|
if ($conn->status != PGRES_CONNECTION_OK) { |
1037 |
|
print STDERR "Failed opening $info\n"; |
1038 |
|
exit 1; |
1039 |
|
} |
1040 |
|
return $conn; |
1041 |
|
} |
1042 |
|
|
1043 |
|
sub Exec { |
1044 |
|
my $conn = shift @_; |
1045 |
|
my $sql = shift @_; |
1046 |
|
|
1047 |
|
my $result = $conn->exec($sql); |
1048 |
|
print STDERR "$sql\n" if ($debug); |
1049 |
|
RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); |
1050 |
|
} |
1051 |
|
|
1052 |
|
sub Exec2 { |
1053 |
|
my $mconn = shift @_; |
1054 |
|
my $sconn = shift @_; |
1055 |
|
my $sql = shift @_; |
1056 |
|
|
1057 |
|
my $result = $mconn->exec($sql); |
1058 |
|
RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK); |
1059 |
|
$result = $sconn->exec($sql); |
1060 |
|
RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK); |
1061 |
|
} |
1062 |
|
|
1063 |
|
sub MkInfo { |
1064 |
|
my $db = shift || die "need database name!"; |
1065 |
|
my $host = shift; |
1066 |
|
my $port = shift; |
1067 |
|
my $user = shift; |
1068 |
|
my $password = shift; |
1069 |
|
|
1070 |
|
my $info = "dbname=$db"; |
1071 |
|
$info = "$info host=$host" if (defined($host)); |
1072 |
|
$info = "$info port=$port" if (defined($port)); |
1073 |
|
$info = "$info user=$user" if (defined($user)); |
1074 |
|
$info = "$info password=$password" if (defined($password)); |
1075 |
|
|
1076 |
|
return $info; |
1077 |
|
} |
1078 |
|
|
1079 |
1; |
1; |