132 |
$sinfo .= ")"; |
$sinfo .= ")"; |
133 |
} |
} |
134 |
|
|
|
my @keys; # keys in this snapshot |
|
|
|
|
135 |
my $havedeal = 0; |
my $havedeal = 0; |
136 |
|
|
137 |
# DELETED rows |
# DELETED rows |
169 |
return(-2); |
return(-2); |
170 |
} |
} |
171 |
printf $outf "%s\n", OutputValue($row[1]); |
printf $outf "%s\n", OutputValue($row[1]); |
|
push @keys,OutputKey($row[2],$Mtables{$row[0]}[2]); |
|
172 |
} |
} |
173 |
printf $outf "\\.\n" if ($lastoid != -1); |
printf $outf "\\.\n" if ($lastoid != -1); |
174 |
|
|
182 |
|
|
183 |
my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : ''; |
my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : ''; |
184 |
|
|
185 |
$sql = sprintf "SELECT \"_$tabname\".\"${tabkey}\",$oidkey \"_$tabname\".* FROM \"$tabname\" ". |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
186 |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1". |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1". |
187 |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". |
188 |
$sel_server; |
$sel_server; |
208 |
printf "-- UPDATE $tabname\n" if $debug; |
printf "-- UPDATE $tabname\n" if $debug; |
209 |
while (@row = $result->fetchrow) |
while (@row = $result->fetchrow) |
210 |
{ |
{ |
|
push @keys,OutputKey(shift @row,$keytype); |
|
211 |
for (my $i = 0; $i <= $#row; $i++) |
for (my $i = 0; $i <= $#row; $i++) |
212 |
{ |
{ |
213 |
printf $outf " " if $i; |
printf $outf " " if $i; |
231 |
|
|
232 |
my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : ''; |
my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : ''; |
233 |
|
|
234 |
$sql = sprintf "SELECT \"_$tabname\".\"${tabkey}\", $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
$sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". |
235 |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1". |
"\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1". |
236 |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". |
" $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". |
237 |
$sel_server; |
$sel_server; |
257 |
printf "-- INSERT $tabname\n" if $debug; |
printf "-- INSERT $tabname\n" if $debug; |
258 |
while (@row = $result->fetchrow) |
while (@row = $result->fetchrow) |
259 |
{ |
{ |
|
push @keys,OutputKey(shift @row,$keytype); |
|
260 |
for (my $i = 0; $i <= $#row; $i++) |
for (my $i = 0; $i <= $#row; $i++) |
261 |
{ |
{ |
262 |
printf $outf " " if $i; |
printf $outf " " if $i; |
298 |
return(-1); |
return(-1); |
299 |
} |
} |
300 |
|
|
|
if ($multimaster) { |
|
|
# save keys |
|
|
# my $key_out = "-- KEYS ".($#keys+1)."\n(\"key\" like ".join(") or (\"key\" like ",@keys).")\n"; |
|
|
my $key_out = "-- KEYS ".($#keys+1)."\n".join(",",@keys)."\n"; |
|
|
print $outf $key_out; |
|
|
print $key_out if ($debug); |
|
|
} |
|
|
|
|
301 |
printf $outf "-- OK\n"; |
printf $outf "-- OK\n"; |
302 |
printf "-- OK\n" if $debug; |
printf "-- OK\n" if $debug; |
303 |
|
|
319 |
return($val); |
return($val); |
320 |
} |
} |
321 |
|
|
|
sub OutputKey { |
|
|
my $val = shift; |
|
|
my $cast = shift || ''; |
|
|
# $cast = "::$cast" if ($cast); |
|
|
$cast = "::text"; |
|
|
# $cast = ""; |
|
|
|
|
|
return "null" if (! defined($val)); |
|
|
|
|
|
print STDERR "Key: ${val}${cast}\n" if ($debug); |
|
|
|
|
|
if ($val =~ m/^\d+$/) { |
|
|
return "${val}${cast}"; |
|
|
} else { |
|
|
return "'$val'${cast}"; |
|
|
} |
|
|
} |
|
|
|
|
322 |
# Get syncid for new snapshot |
# Get syncid for new snapshot |
323 |
sub GetSYNCID |
sub GetSYNCID |
324 |
{ |
{ |
460 |
|
|
461 |
print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug); |
print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug); |
462 |
|
|
|
# save keys from snapshot because we want to update _rserv_log_ with |
|
|
# correct source server later... |
|
|
my @keys; |
|
|
my $keys_sql; |
|
|
|
|
463 |
my $ok = 0; |
my $ok = 0; |
464 |
my $syncid = -1; |
my $syncid = -1; |
465 |
while(<$inpf>) { |
while(<$inpf>) { |
554 |
} |
} |
555 |
} elsif ($cmd eq 'OK') { |
} elsif ($cmd eq 'OK') { |
556 |
$ok = 1; |
$ok = 1; |
557 |
|
if ($multimaster) { |
558 |
|
# now, update server in _rserv_log_ based on transaction xid |
559 |
|
ExecFatch($sconn,"select count(*) from _rserv_log_"); |
560 |
|
ExecDebug($sconn,"select * from _rserv_log_"); |
561 |
|
my $keys_sql = qq{ |
562 |
|
update _rserv_log_ set server=$serverId |
563 |
|
where logid = (select _rserv_xid_()) |
564 |
|
}; |
565 |
|
|
566 |
|
Exec($sconn,$keys_sql); |
567 |
|
} |
568 |
last; |
last; |
569 |
} elsif ($cmd eq 'ERROR') { |
} elsif ($cmd eq 'ERROR') { |
570 |
printf STDERR "ERROR signaled\n" unless ($quiet); |
printf STDERR "ERROR signaled\n" unless ($quiet); |
578 |
} |
} |
579 |
$serverId = $prm; |
$serverId = $prm; |
580 |
print STDERR "Server ID $serverId\n" unless ($quiet); |
print STDERR "Server ID $serverId\n" unless ($quiet); |
|
} elsif ($cmd eq 'KEYS') { |
|
|
if ($prm !~ /^\d+$/) { |
|
|
printf STDERR "Invalid numer of keys $prm\n" unless ($quiet); |
|
|
$sconn->exec("ROLLBACK"); |
|
|
return(-2); |
|
|
} |
|
|
my $keys = <$inpf>; |
|
|
chomp($keys); |
|
|
if ($multimaster) { |
|
|
ExecFatch($sconn,"select count(*) from _rserv_log_"); |
|
|
ExecDebug($sconn,"select * from _rserv_log_"); |
|
|
my ($logid) = ExecFatch($sconn,"select distinct logid from _rserv_log_ where key in ($keys)"); |
|
|
$keys_sql = qq{ |
|
|
update _rserv_log_ set server=$serverId |
|
|
where key in ($keys) |
|
|
}; |
|
|
$keys_sql = qq{ |
|
|
update _rserv_log_ set server=$serverId |
|
|
where logid = $logid |
|
|
}; |
|
|
|
|
|
print STDERR "$keys_sql\n" if ($debug); |
|
|
$result = $sconn->exec($keys_sql); |
|
|
ExecDebug($sconn,"explain analyze $keys_sql"); |
|
|
print STDERR "expected $prm updates, got ",$result->ntuples,"\n" if ($result->ntuples != $prm); |
|
|
# if ($result->resultStatus ne PGRES_COMMAND_OK || $result->ntuples != $prm) { |
|
|
if (0) { |
|
|
print STDERR "FATAL: Cannot update source server in _rserv_log_: ",$sconn->errorMessage,"\n"; |
|
|
$sconn->exec("ROLLBACK"); |
|
|
return(-1); |
|
|
} |
|
|
} |
|
581 |
} else { |
} else { |
582 |
printf STDERR "Unknown command $cmd\n" unless ($quiet); |
printf STDERR "Unknown command $cmd\n" unless ($quiet); |
583 |
$sconn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
585 |
} |
} |
586 |
} |
} |
587 |
|
|
|
ExecFatch($sconn,"select count(*) from _rserv_log_"); |
|
588 |
if (! $ok) { |
if (! $ok) { |
589 |
printf STDERR "No OK flag in input\n" unless ($quiet); |
printf STDERR "No OK flag in input\n" unless ($quiet); |
590 |
$sconn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
1009 |
# XXX TODO: return results?! |
# XXX TODO: return results?! |
1010 |
} |
} |
1011 |
|
|
1012 |
|
# exec sql query and return one row from it |
1013 |
sub ExecFatch { |
sub ExecFatch { |
1014 |
my $conn = shift || die "ExecFatch need conn!"; |
my $conn = shift || die "ExecFatch need conn!"; |
1015 |
my $sql = shift || die "ExecFatch need SQL!"; |
my $sql = shift || die "ExecFatch need SQL!"; |
1016 |
|
|
1017 |
print STDERR "ExecFatch: $sql\n" if ($debug); |
if ($debug) { |
1018 |
|
# re-format SQL in one line (for nicer output) |
1019 |
|
$sql =~ s/[\s\n\r]+/ /gs; |
1020 |
|
print STDERR "Exec: $sql\n"; |
1021 |
|
} |
1022 |
|
|
1023 |
my $result = $conn->exec($sql); |
my $result = $conn->exec($sql); |
1024 |
RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK); |
RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK); |
1030 |
return @row; |
return @row; |
1031 |
} |
} |
1032 |
|
|
1033 |
|
# exec sql query and dump all rows retured to STDERR (great for debugging) |
1034 |
sub ExecDebug { |
sub ExecDebug { |
1035 |
|
return if (! $debug); |
1036 |
|
|
1037 |
my $conn = shift || die "ExecDebug need conn!"; |
my $conn = shift || die "ExecDebug need conn!"; |
1038 |
my $sql = shift || die "ExecDebug need SQL!"; |
my $sql = shift || die "ExecDebug need SQL!"; |
1039 |
|
|
1040 |
print STDERR "ExecDebug: $sql\n" if ($debug); |
if ($debug) { |
1041 |
|
# re-format SQL in one line (for nicer output) |
1042 |
|
$sql =~ s/[\s\n\r]+/ /gs; |
1043 |
|
print STDERR "Exec: $sql\n"; |
1044 |
|
} |
1045 |
|
|
1046 |
my $result = $conn->exec($sql); |
my $result = $conn->exec($sql); |
1047 |
RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK); |
RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK); |