6 |
|
|
7 |
require Exporter; |
require Exporter; |
8 |
@ISA = qw(Exporter); |
@ISA = qw(Exporter); |
9 |
@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId); |
@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId |
10 |
|
Rollback RollbackAndQuit Connect Exec Exec2 MkInfo |
11 |
|
$debug $quiet $verbose |
12 |
|
); |
13 |
@EXPORT_OK = qw(); |
@EXPORT_OK = qw(); |
14 |
use strict; |
use strict; |
15 |
use Pg; |
use Pg; |
16 |
|
|
17 |
my $debug = 0; |
my $debug = 0; |
18 |
my $quiet = 1; |
my $quiet = 1; |
19 |
|
my $verbose = 0; |
20 |
|
|
21 |
|
$debug = 1; |
22 |
|
$quiet = 0; |
23 |
|
$verbose = 1; |
24 |
|
|
25 |
my %Mtables = (); |
my %Mtables = (); |
26 |
my %Stables = (); |
my %Stables = (); |
27 |
|
|
28 |
sub GetSlaveId |
sub GetServerId |
29 |
{ |
{ |
30 |
my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]); |
my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]); |
31 |
|
|
32 |
|
print STDERR "GetServerId: host $Host, database $DB\n" if ($debug); |
33 |
|
|
34 |
my $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". |
my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". |
35 |
" host='$slaveHost' AND dbase='$slaveDB'"); |
" host='$Host' AND dbase='$DB'"); |
36 |
|
|
37 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
38 |
{ |
{ |
39 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
40 |
return(-1); |
return(-1); |
41 |
} |
} |
42 |
|
|
43 |
if ($result->cmdTuples && $result->cmdTuples > 1) |
if ($result->cmdTuples && $result->cmdTuples > 1) |
44 |
{ |
{ |
45 |
printf STDERR "Duplicate slave definitions.\n" unless ($quiet); |
printf STDERR "Duplicate host definitions.\n" unless ($quiet); |
46 |
return(-2); |
return(-2); |
47 |
} |
} |
48 |
|
|
49 |
my @row = $result->fetchrow; |
my @row = $result->fetchrow; |
50 |
|
|
51 |
|
print "GetServerId($DB,$Host) == $row[0]\n" if ($debug); |
52 |
|
|
53 |
return $row[0]; |
return $row[0]; |
54 |
} |
} |
55 |
|
|
56 |
sub PrepareSnapshot |
sub PrepareSnapshot |
57 |
{ |
{ |
58 |
my ($conn, $sconn, $outf, $server) = @_; # (@_[0], @_[1], @_[2], $_[3]); |
my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_; |
59 |
|
|
60 |
|
print STDERR "## d: $debug v: $verbose q: $quiet\n"; |
61 |
|
|
62 |
|
if ($mserver == $sserver) { |
63 |
|
print STDERR "master and slave numbers are same [$mserver] !\n"; |
64 |
|
return(-1); |
65 |
|
} |
66 |
|
|
67 |
|
print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug); |
68 |
|
|
69 |
# first, we must know for wich tables the slave subscribed |
# first, we must know for wich tables the slave subscribed |
70 |
my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_"); |
my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_"); |
71 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
72 |
{ |
{ |
73 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
74 |
return(-1); |
return(-1); |
75 |
} |
} |
76 |
|
|
79 |
$Stables{$row[0]} = 1; |
$Stables{$row[0]} = 1; |
80 |
} |
} |
81 |
|
|
82 |
$result = $conn->exec("BEGIN"); |
print "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug); |
83 |
|
|
84 |
|
$result = $mconn->exec("BEGIN"); |
85 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
86 |
{ |
{ |
87 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
88 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
89 |
return(-1); |
return(-1); |
90 |
} |
} |
91 |
$result = $conn->exec("set transaction isolation level serializable"); |
$result = $mconn->exec("set transaction isolation level serializable"); |
92 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
93 |
{ |
{ |
94 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
95 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
96 |
return(-1); |
return(-1); |
97 |
} |
} |
98 |
|
|
99 |
# MAP oid --> tabname, keyname, key_type |
# MAP oid --> tabname, keyname, key_type |
100 |
$result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" . |
$result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" . |
101 |
" from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" . |
" from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" . |
102 |
", pg_type pgt". |
", pg_type pgt". |
103 |
" where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" . |
" where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" . |
104 |
" AND pga.attnum = rt.key AND pga.atttypid=pgt.oid"); |
" AND pga.attnum = rt.key AND pga.atttypid=pgt.oid"); |
105 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
106 |
{ |
{ |
107 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
108 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
109 |
return(-1); |
return(-1); |
110 |
} |
} |
111 |
|
|
112 |
while (@row = $result->fetchrow) |
while (@row = $result->fetchrow) |
113 |
{ |
{ |
114 |
# printf "$row[0], $row[1], $row[2]\n"; |
# printf "$row[0], $row[1], $row[2]\n"; |
115 |
|
if (ref($onlytables) eq 'HASH') { |
116 |
|
next unless (exists $onlytables->{$row[1]}); |
117 |
|
$onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]}); |
118 |
|
} |
119 |
push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3]; |
push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3]; |
120 |
} |
} |
121 |
|
|
122 |
|
print "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug); |
123 |
|
|
124 |
# Read last succeeded sync |
# Read last succeeded sync |
125 |
my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" . |
my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" . |
126 |
" where server = $server AND syncid = (select max(syncid) from" . |
" where server = $sserver AND syncid = (select max(syncid) from" . |
127 |
" _RSERV_SYNC_ where server = $server AND status > 0)"; |
" _RSERV_SYNC_ where server = $sserver AND status > 0)"; |
128 |
|
|
129 |
printf "$sql\n" if $debug; |
printf "$sql\n" if $debug; |
130 |
|
|
131 |
$result = $conn->exec($sql); |
$result = $mconn->exec($sql); |
132 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
133 |
{ |
{ |
134 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
135 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
136 |
return(-1); |
return(-1); |
137 |
} |
} |
138 |
|
|
139 |
my @lastsync = $result->fetchrow; |
my @lastsync = $result->fetchrow; |
140 |
|
print "lastsync: ",join(",",@lastsync),"\n" if ($debug); |
141 |
|
|
142 |
|
# exclude data which originated from master server |
143 |
|
my $sel_server = " and l.server = $mserver "; |
144 |
|
|
145 |
my $sinfo = ""; |
my $sinfo = ""; |
146 |
if (@lastsync && $lastsync[3] ne '') # sync info |
if (@lastsync && $lastsync[3] ne '') # sync info |
147 |
{ |
{ |
154 |
|
|
155 |
# DELETED rows |
# DELETED rows |
156 |
$sql = "select l.reloid, l.key from _RSERV_LOG_ l" . |
$sql = "select l.reloid, l.key from _RSERV_LOG_ l" . |
157 |
" where l.delete = 1 $sinfo order by l.reloid"; |
" where l.delete = 1 $sinfo $sel_server order by l.reloid"; |
158 |
|
|
159 |
printf "$sql\n" if $debug; |
printf "DELETED: $sql\n" if $debug; |
160 |
|
|
161 |
$result = $conn->exec($sql); |
$result = $mconn->exec($sql); |
162 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
163 |
{ |
{ |
164 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
165 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
166 |
return(-1); |
return(-1); |
167 |
} |
} |
168 |
|
|
169 |
my $lastoid = ''; |
my $lastoid = -1; |
170 |
while (@row = $result->fetchrow) |
while (@row = $result->fetchrow) |
171 |
{ |
{ |
172 |
next unless exists $Mtables{$row[0]}; |
next unless exists $Mtables{$row[0]}; |
174 |
|
|
175 |
if ($lastoid != $row[0]) |
if ($lastoid != $row[0]) |
176 |
{ |
{ |
177 |
if ($lastoid eq '') |
if ($lastoid == -1) |
178 |
{ |
{ |
179 |
my $syncid = GetSYNCID($conn, $outf); |
my $syncid = GetSYNCID($mconn, $outf); |
180 |
return($syncid) if $syncid < 0; |
return($syncid) if $syncid < 0; |
181 |
$havedeal = 1; |
$havedeal = 1; |
182 |
} |
} |
190 |
if (! defined $row[1]) |
if (! defined $row[1]) |
191 |
{ |
{ |
192 |
print STDERR "NULL key\n" unless ($quiet); |
print STDERR "NULL key\n" unless ($quiet); |
193 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
194 |
return(-2); |
return(-2); |
195 |
} |
} |
196 |
printf $outf "%s\n", OutputValue($row[1]); |
printf $outf "%s\n", OutputValue($row[1]); |
197 |
} |
} |
198 |
printf $outf "\\.\n" if $lastoid ne ''; |
printf $outf "\\.\n" if ($lastoid != -1); |
199 |
|
|
200 |
# UPDATED rows |
# UPDATED rows |
201 |
|
|
209 |
|
|
210 |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
211 |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1". |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1". |
212 |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}"; |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". |
213 |
|
$sel_server; |
214 |
|
|
215 |
printf "$sql\n" if $debug; |
printf "UPDATED: $sql\n" if $debug; |
216 |
|
|
217 |
$result = $conn->exec($sql); |
$result = $mconn->exec($sql); |
218 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
219 |
{ |
{ |
220 |
printf $outf "-- ERROR\n" if $havedeal; |
printf $outf "-- ERROR\n" if $havedeal; |
221 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
222 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
223 |
return(-1); |
return(-1); |
224 |
} |
} |
225 |
next if $result->ntuples <= 0; |
next if $result->ntuples <= 0; |
226 |
if (! $havedeal) |
if (! $havedeal) |
227 |
{ |
{ |
228 |
my $syncid = GetSYNCID($conn, $outf); |
my $syncid = GetSYNCID($mconn, $outf); |
229 |
return($syncid) if $syncid < 0; |
return($syncid) if $syncid < 0; |
230 |
$havedeal = 1; |
$havedeal = 1; |
231 |
} |
} |
258 |
|
|
259 |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
260 |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1". |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1". |
261 |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}"; |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". |
262 |
|
$sel_server; |
263 |
|
|
264 |
printf "$sql\n" if $debug; |
printf "INSERTED: $sql\n" if $debug; |
265 |
|
|
266 |
$result = $conn->exec($sql); |
$result = $mconn->exec($sql); |
267 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
268 |
{ |
{ |
269 |
printf $outf "-- ERROR\n" if $havedeal; |
printf $outf "-- ERROR\n" if $havedeal; |
270 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
271 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
272 |
return(-1); |
return(-1); |
273 |
} |
} |
274 |
next if $result->ntuples <= 0; |
next if $result->ntuples <= 0; |
275 |
if (! $havedeal) |
if (! $havedeal) |
276 |
{ |
{ |
277 |
my $syncid = GetSYNCID($conn, $outf); |
my $syncid = GetSYNCID($mconn, $outf); |
278 |
return($syncid) if $syncid < 0; |
return($syncid) if $syncid < 0; |
279 |
$havedeal = 1; |
$havedeal = 1; |
280 |
} |
} |
299 |
|
|
300 |
unless ($havedeal) |
unless ($havedeal) |
301 |
{ |
{ |
302 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
303 |
return(0); |
return(0); |
304 |
} |
} |
305 |
|
|
306 |
# Remember this snapshot info |
# Remember this snapshot info |
307 |
$result = $conn->exec("select _rserv_sync_($server)"); |
$result = $mconn->exec("select _rserv_sync_($sserver)"); |
308 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
309 |
{ |
{ |
310 |
printf $outf "-- ERROR\n"; |
printf $outf "-- ERROR\n"; |
311 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
312 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
313 |
return(-1); |
return(-1); |
314 |
} |
} |
315 |
|
|
316 |
$result = $conn->exec("COMMIT"); |
$result = $mconn->exec("COMMIT"); |
317 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
318 |
{ |
{ |
319 |
printf $outf "-- ERROR\n"; |
printf $outf "-- ERROR\n"; |
320 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
321 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
322 |
return(-1); |
return(-1); |
323 |
} |
} |
324 |
printf $outf "-- OK\n"; |
printf $outf "-- OK\n"; |
365 |
|
|
366 |
sub CleanLog |
sub CleanLog |
367 |
{ |
{ |
368 |
my ($conn, $howold) = @_; # (@_[0], @_[1]); |
my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]); |
369 |
|
|
370 |
my $result = $conn->exec("BEGIN"); |
my $result = $conn->exec("BEGIN"); |
371 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
407 |
my $alist = join(',', keys %active); |
my $alist = join(',', keys %active); |
408 |
my $sinfo = "logid < $maxid"; |
my $sinfo = "logid < $maxid"; |
409 |
$sinfo .= " AND logid not in ($alist)" if $alist ne ''; |
$sinfo .= " AND logid not in ($alist)" if $alist ne ''; |
410 |
|
#if (ref($onlytables) eq 'HASH') { |
411 |
|
# foreach my $onlytable (keys %{$onlytables}) { |
412 |
|
# $sinfo |
413 |
|
# } |
414 |
|
#} |
415 |
$sql = "delete from _RSERV_LOG_ where " . |
$sql = "delete from _RSERV_LOG_ where " . |
416 |
"logtime < now() - '$howold second'::interval AND $sinfo"; |
"logtime < now() - '$howold second'::interval AND $sinfo"; |
417 |
|
|
439 |
|
|
440 |
sub ApplySnapshot |
sub ApplySnapshot |
441 |
{ |
{ |
442 |
my ($conn, $inpf) = @_; # (@_[0], @_[1]); |
my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]); |
443 |
|
|
444 |
my $result = $conn->exec("BEGIN"); |
my $result = $sconn->exec("BEGIN"); |
445 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
446 |
{ |
{ |
447 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
448 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
449 |
return(-1); |
return(-1); |
450 |
} |
} |
451 |
|
|
452 |
$result = $conn->exec("SET CONSTRAINTS ALL DEFERRED"); |
$result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED"); |
453 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
454 |
{ |
{ |
455 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
456 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
457 |
return(-1); |
return(-1); |
458 |
} |
} |
459 |
|
|
463 |
" where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" . |
" where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" . |
464 |
" AND pga.attnum = rt.key"; |
" AND pga.attnum = rt.key"; |
465 |
|
|
466 |
$result = $conn->exec($sql); |
$result = $sconn->exec($sql); |
467 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
468 |
{ |
{ |
469 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
470 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
471 |
return(-1); |
return(-1); |
472 |
} |
} |
473 |
%Stables = (); |
%Stables = (); |
474 |
while (my @row = $result->fetchrow) |
while (my @row = $result->fetchrow) |
475 |
{ |
{ |
476 |
# printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3]; |
# printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3]; |
477 |
|
if (ref($onlytables) eq 'HASH') { |
478 |
|
next unless (exists $onlytables->{$row[1]}); |
479 |
|
$onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]}); |
480 |
|
} |
481 |
push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3]; |
push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3]; |
482 |
} |
} |
483 |
|
|
484 |
|
print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug); |
485 |
|
|
486 |
my $ok = 0; |
my $ok = 0; |
487 |
my $syncid = ''; |
my $syncid = -1; |
488 |
while(<$inpf>) |
while(<$inpf>) |
489 |
{ |
{ |
490 |
$_ =~ s/\n//; |
$_ =~ s/\n//; |
491 |
my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3); |
my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3); |
492 |
|
die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt); |
493 |
if ($cmt ne '--') |
if ($cmt ne '--') |
494 |
{ |
{ |
495 |
printf STDERR "Invalid format\n" unless ($quiet); |
printf STDERR "Invalid format\n" unless ($quiet); |
496 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
497 |
return(-2); |
return(-2); |
498 |
} |
} |
499 |
if ($cmd eq 'DELETE') |
if ($cmd eq 'DELETE') |
500 |
{ |
{ |
501 |
if ($syncid eq '') |
if ($syncid == -1) |
502 |
{ |
{ |
503 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
504 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
505 |
return(-2); |
return(-2); |
506 |
} |
} |
507 |
$result = DoDelete($conn, $inpf, $prm); |
$result = DoDelete($sconn, $inpf, $prm); |
508 |
if ($result) |
if ($result) |
509 |
{ |
{ |
510 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
511 |
return($result); |
return($result); |
512 |
} |
} |
513 |
} |
} |
514 |
elsif ($cmd eq 'INSERT') |
elsif ($cmd eq 'INSERT') |
515 |
{ |
{ |
516 |
if ($syncid eq '') |
if ($syncid == -1) |
517 |
{ |
{ |
518 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
519 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
520 |
return(-2); |
return(-2); |
521 |
} |
} |
522 |
$result = DoInsert($conn, $inpf, $prm); |
$result = DoInsert($sconn, $inpf, $prm); |
523 |
if ($result) |
if ($result) |
524 |
{ |
{ |
525 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
526 |
return($result); |
return($result); |
527 |
} |
} |
528 |
} |
} |
529 |
elsif ($cmd eq 'UPDATE') |
elsif ($cmd eq 'UPDATE') |
530 |
{ |
{ |
531 |
if ($syncid eq '') |
if ($syncid == -1) |
532 |
{ |
{ |
533 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
534 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
535 |
return(-2); |
return(-2); |
536 |
} |
} |
537 |
$result = DoUpdate($conn, $inpf, $prm); |
$result = DoUpdate($sconn, $inpf, $prm); |
538 |
if ($result) |
if ($result) |
539 |
{ |
{ |
540 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
541 |
return($result); |
return($result); |
542 |
} |
} |
543 |
} |
} |
544 |
elsif ($cmd eq 'SYNCID') |
elsif ($cmd eq 'SYNCID') |
545 |
{ |
{ |
546 |
if ($syncid ne '') |
if ($syncid != -1) |
547 |
{ |
{ |
548 |
printf STDERR "Second Sync ID ?!\n" unless ($quiet); |
printf STDERR "Second Sync ID ?!\n" unless ($quiet); |
549 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
550 |
return(-2); |
return(-2); |
551 |
} |
} |
552 |
if ($prm !~ /^\d+$/) |
if ($prm !~ /^\d+$/) |
553 |
{ |
{ |
554 |
printf STDERR "Invalid Sync ID $prm\n" unless ($quiet); |
printf STDERR "Invalid Sync ID $prm\n" unless ($quiet); |
555 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
556 |
return(-2); |
return(-2); |
557 |
} |
} |
558 |
$syncid = $prm; |
$syncid = $prm; |
559 |
|
|
560 |
printf STDERR "Sync ID $syncid\n" unless ($quiet); |
printf STDERR "Sync ID $syncid\n" unless ($quiet); |
561 |
|
|
562 |
$result = $conn->exec("select syncid, synctime from " . |
$result = $sconn->exec("select syncid, synctime from " . |
563 |
"_RSERV_SLAVE_SYNC_ where syncid = " . |
"_RSERV_SLAVE_SYNC_ where syncid = " . |
564 |
"(select max(syncid) from _RSERV_SLAVE_SYNC_)"); |
"(select max(syncid) from _RSERV_SLAVE_SYNC_)"); |
565 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
566 |
{ |
{ |
567 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
568 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
569 |
return(-1); |
return(-1); |
570 |
} |
} |
571 |
my @row = $result->fetchrow; |
my @row = $result->fetchrow; |
572 |
if (! defined $row[0]) |
if (! defined $row[0]) |
573 |
{ |
{ |
574 |
$result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ". |
$result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ". |
575 |
"(syncid, synctime) values ($syncid, now())"); |
"(syncid, synctime) values ($syncid, now())"); |
576 |
} |
} |
577 |
elsif ($row[0] >= $prm) |
elsif ($row[0] >= $prm) |
578 |
{ |
{ |
579 |
printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet); |
printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet); |
580 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
581 |
return(0); |
return(0); |
582 |
} |
} |
583 |
else |
else |
584 |
{ |
{ |
585 |
$result = $conn->exec("update _RSERV_SLAVE_SYNC_" . |
$result = $sconn->exec("update _RSERV_SLAVE_SYNC_" . |
586 |
" set syncid = $syncid, synctime = now()"); |
" set syncid = $syncid, synctime = now()"); |
587 |
} |
} |
588 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
589 |
{ |
{ |
590 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
591 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
592 |
return(-1); |
return(-1); |
593 |
} |
} |
594 |
} |
} |
600 |
elsif ($cmd eq 'ERROR') |
elsif ($cmd eq 'ERROR') |
601 |
{ |
{ |
602 |
printf STDERR "ERROR signaled\n" unless ($quiet); |
printf STDERR "ERROR signaled\n" unless ($quiet); |
603 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
604 |
return(-2); |
return(-2); |
605 |
} |
} |
606 |
else |
else |
607 |
{ |
{ |
608 |
printf STDERR "Unknown command $cmd\n" unless ($quiet); |
printf STDERR "Unknown command $cmd\n" unless ($quiet); |
609 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
610 |
return(-2); |
return(-2); |
611 |
} |
} |
612 |
} |
} |
614 |
if (! $ok) |
if (! $ok) |
615 |
{ |
{ |
616 |
printf STDERR "No OK flag in input\n" unless ($quiet); |
printf STDERR "No OK flag in input\n" unless ($quiet); |
617 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
618 |
return(-2); |
return(-2); |
619 |
} |
} |
620 |
|
|
621 |
$result = $conn->exec("COMMIT"); |
$result = $sconn->exec("COMMIT"); |
622 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
623 |
{ |
{ |
624 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
625 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
626 |
return(-1); |
return(-1); |
627 |
} |
} |
628 |
|
|
631 |
|
|
632 |
sub DoDelete |
sub DoDelete |
633 |
{ |
{ |
634 |
my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
635 |
|
|
636 |
# only delete tables that the slave wants |
# only delete tables that the slave wants |
637 |
if (! defined($Stables{$tabname})) { |
if (! defined($Stables{$tabname})) { |
665 |
|
|
666 |
printf "$sql\n" if $debug; |
printf "$sql\n" if $debug; |
667 |
|
|
668 |
my $result = $conn->exec($sql); |
my $result = $sconn->exec($sql); |
669 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
670 |
{ |
{ |
671 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
672 |
return(-1); |
return(-1); |
673 |
} |
} |
674 |
} |
} |
685 |
|
|
686 |
sub DoUpdate |
sub DoUpdate |
687 |
{ |
{ |
688 |
my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
689 |
|
|
690 |
# only update the tables that the slave wants |
# only update the tables that the slave wants |
691 |
if (! defined($Stables{$tabname})) { |
if (! defined($Stables{$tabname})) { |
707 |
my $sql = "select attnum, attname from pg_attribute" . |
my $sql = "select attnum, attname from pg_attribute" . |
708 |
" where attrelid = $Stables{$tabname}->[0] AND attnum > 0"; |
" where attrelid = $Stables{$tabname}->[0] AND attnum > 0"; |
709 |
|
|
710 |
my $result = $conn->exec($sql); |
my $result = $sconn->exec($sql); |
711 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
712 |
{ |
{ |
713 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
714 |
return(-1); |
return(-1); |
715 |
} |
} |
716 |
|
|
785 |
|
|
786 |
printf "$sql\n" if $debug; |
printf "$sql\n" if $debug; |
787 |
|
|
788 |
$result = $conn->exec($sql); |
$result = $sconn->exec($sql); |
789 |
|
|
790 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
791 |
{ |
{ |
792 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
793 |
return(-1); |
return(-1); |
794 |
} |
} |
795 |
next if $result->cmdTuples == 1; # updated |
next if $result->cmdTuples == 1; # updated |
806 |
|
|
807 |
if ($CBufLen >= $CBufMax) |
if ($CBufLen >= $CBufMax) |
808 |
{ |
{ |
809 |
$result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
$result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); |
810 |
return($result) if $result; |
return($result) if $result; |
811 |
@CopyBuf = (); |
@CopyBuf = (); |
812 |
$CBufLen = 0; |
$CBufLen = 0; |
822 |
if ($CBufLen) |
if ($CBufLen) |
823 |
{ |
{ |
824 |
print "@CopyBuf\n" if $debug; |
print "@CopyBuf\n" if $debug; |
825 |
$result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
$result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); |
826 |
return($result) if $result; |
return($result) if $result; |
827 |
} |
} |
828 |
|
|
831 |
|
|
832 |
sub DoInsert |
sub DoInsert |
833 |
{ |
{ |
834 |
my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
835 |
|
|
836 |
# only insert rows into tables that the slave wants |
# only insert rows into tables that the slave wants |
837 |
if (! defined($Stables{$tabname})) { |
if (! defined($Stables{$tabname})) { |
873 |
|
|
874 |
if ($CBufLen >= $CBufMax) |
if ($CBufLen >= $CBufMax) |
875 |
{ |
{ |
876 |
my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); |
877 |
return($result) if $result; |
return($result) if $result; |
878 |
@CopyBuf = (); |
@CopyBuf = (); |
879 |
$CBufLen = 0; |
$CBufLen = 0; |
889 |
if ($CBufLen) |
if ($CBufLen) |
890 |
{ |
{ |
891 |
print "@CopyBuf\n" if $debug; |
print "@CopyBuf\n" if $debug; |
892 |
my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); |
893 |
return($result) if $result; |
return($result) if $result; |
894 |
} |
} |
895 |
|
|
899 |
|
|
900 |
sub DoCopy |
sub DoCopy |
901 |
{ |
{ |
902 |
my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]); |
my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]); |
903 |
|
|
904 |
my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') . |
my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') . |
905 |
"FROM STDIN"; |
"FROM STDIN"; |
906 |
my $result = $conn->exec($sql); |
my $result = $sconn->exec($sql); |
907 |
if ($result->resultStatus ne PGRES_COPY_IN) |
if ($result->resultStatus ne PGRES_COPY_IN) |
908 |
{ |
{ |
909 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
910 |
return(-1); |
return(-1); |
911 |
} |
} |
912 |
|
|
913 |
foreach my $str (@{$CBuf}) |
foreach my $str (@{$CBuf}) |
914 |
{ |
{ |
915 |
$conn->putline($str); |
$sconn->putline($str); |
916 |
} |
} |
917 |
|
|
918 |
$conn->putline("\\.\n"); |
$sconn->putline("\\.\n"); |
919 |
|
|
920 |
if ($conn->endcopy) |
if ($sconn->endcopy) |
921 |
{ |
{ |
922 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
923 |
return(-1); |
return(-1); |
924 |
} |
} |
925 |
|
|
932 |
# |
# |
933 |
sub GetSyncID |
sub GetSyncID |
934 |
{ |
{ |
935 |
my ($conn) = @_; # (@_[0]); |
my ($sconn) = @_; # (@_[0]); |
936 |
|
|
937 |
my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_"); |
my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_"); |
938 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
939 |
{ |
{ |
940 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
941 |
return(-1); |
return(-1); |
942 |
} |
} |
943 |
my @row = $result->fetchrow; |
my @row = $result->fetchrow; |
950 |
# |
# |
951 |
sub SyncSyncID |
sub SyncSyncID |
952 |
{ |
{ |
953 |
my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]); |
my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]); |
954 |
|
|
955 |
my $result = $conn->exec("BEGIN"); |
my $result = $mconn->exec("BEGIN"); |
956 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
957 |
{ |
{ |
958 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
959 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
960 |
return(-1); |
return(-1); |
961 |
} |
} |
962 |
|
|
963 |
$result = $conn->exec("select synctime, status from _RSERV_SYNC_" . |
$result = $mconn->exec("select synctime, status from _RSERV_SYNC_" . |
964 |
" where server = $server AND syncid = $syncid" . |
" where server = $sserver AND syncid = $syncid" . |
965 |
" for update"); |
" for update"); |
966 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
967 |
{ |
{ |
968 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
969 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
970 |
return(-1); |
return(-1); |
971 |
} |
} |
972 |
my @row = $result->fetchrow; |
my @row = $result->fetchrow; |
973 |
if (! defined $row[0]) |
if (! defined $row[0]) |
974 |
{ |
{ |
975 |
printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet); |
printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet); |
976 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
977 |
return(0); |
return(0); |
978 |
} |
} |
979 |
if ($row[1] > 0) |
if ($row[1] > 0) |
980 |
{ |
{ |
981 |
printf STDERR "SyncID $syncid for server ". |
printf STDERR "SyncID $syncid for server ". |
982 |
"$server already updated\n" unless ($quiet); |
"$sserver already updated\n" unless ($quiet); |
983 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
984 |
return(0); |
return(0); |
985 |
} |
} |
986 |
$result = $conn->exec("update _RSERV_SYNC_" . |
$result = $mconn->exec("update _RSERV_SYNC_" . |
987 |
" set synctime = now(), status = 1" . |
" set synctime = now(), status = 1" . |
988 |
" where server = $server AND syncid = $syncid"); |
" where server = $sserver AND syncid = $syncid"); |
989 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
990 |
{ |
{ |
991 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
992 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
993 |
return(-1); |
return(-1); |
994 |
} |
} |
995 |
$result = $conn->exec("delete from _RSERV_SYNC_" . |
$result = $mconn->exec("delete from _RSERV_SYNC_" . |
996 |
" where server = $server AND syncid < $syncid"); |
" where server = $sserver AND syncid < $syncid"); |
997 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
998 |
{ |
{ |
999 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
1000 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
1001 |
return(-1); |
return(-1); |
1002 |
} |
} |
1003 |
|
|
1004 |
$result = $conn->exec("COMMIT"); |
$result = $mconn->exec("COMMIT"); |
1005 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
1006 |
{ |
{ |
1007 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
1008 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
1009 |
return(-1); |
return(-1); |
1010 |
} |
} |
1011 |
|
|
1012 |
return(1); |
return(1); |
1013 |
} |
} |
1014 |
|
|
1015 |
|
# stuff moved from perl scripts for better re-use |
1016 |
|
|
1017 |
|
sub Rollback { |
1018 |
|
my $conn = shift @_; |
1019 |
|
|
1020 |
|
print STDERR $conn->errorMessage; |
1021 |
|
$conn->exec("ROLLBACK"); |
1022 |
|
} |
1023 |
|
|
1024 |
|
sub RollbackAndQuit { |
1025 |
|
my $conn = shift @_; |
1026 |
|
|
1027 |
|
Rollback($conn); |
1028 |
|
exit (-1); |
1029 |
|
} |
1030 |
|
|
1031 |
|
sub Connect { |
1032 |
|
my $info = shift @_; |
1033 |
|
|
1034 |
|
print("Connecting to $info\n") if ($debug || $verbose); |
1035 |
|
my $conn = Pg::connectdb($info); |
1036 |
|
if ($conn->status != PGRES_CONNECTION_OK) { |
1037 |
|
print STDERR "Failed opening $info\n"; |
1038 |
|
exit 1; |
1039 |
|
} |
1040 |
|
return $conn; |
1041 |
|
} |
1042 |
|
|
1043 |
|
sub Exec { |
1044 |
|
my $conn = shift @_; |
1045 |
|
my $sql = shift @_; |
1046 |
|
|
1047 |
|
my $result = $conn->exec($sql); |
1048 |
|
print STDERR "$sql\n" if ($debug); |
1049 |
|
RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); |
1050 |
|
} |
1051 |
|
|
1052 |
|
sub Exec2 { |
1053 |
|
my $mconn = shift @_; |
1054 |
|
my $sconn = shift @_; |
1055 |
|
my $sql = shift @_; |
1056 |
|
|
1057 |
|
my $result = $mconn->exec($sql); |
1058 |
|
RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK); |
1059 |
|
$result = $sconn->exec($sql); |
1060 |
|
RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK); |
1061 |
|
} |
1062 |
|
|
1063 |
|
sub MkInfo { |
1064 |
|
my $db = shift || die "need database name!"; |
1065 |
|
my $host = shift; |
1066 |
|
my $port = shift; |
1067 |
|
my $user = shift; |
1068 |
|
my $password = shift; |
1069 |
|
|
1070 |
|
my $info = "dbname=$db"; |
1071 |
|
$info = "$info host=$host" if (defined($host)); |
1072 |
|
$info = "$info port=$port" if (defined($port)); |
1073 |
|
$info = "$info user=$user" if (defined($user)); |
1074 |
|
$info = "$info password=$password" if (defined($password)); |
1075 |
|
|
1076 |
|
return $info; |
1077 |
|
} |
1078 |
|
|
1079 |
1; |
1; |