--- share/RServ.pm 2003/10/28 19:55:25 1.6 +++ share/RServ.pm 2003/10/31 00:07:37 1.11 @@ -6,23 +6,29 @@ require Exporter; @ISA = qw(Exporter); -@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId); +@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId + Rollback RollbackAndQuit Connect Exec Exec2 + $debug $quiet $verbose + ); @EXPORT_OK = qw(); use strict; use Pg; my $debug = 0; my $quiet = 1; +my $verbose = 0; my %Mtables = (); my %Stables = (); -sub GetSlaveId +sub GetServerId { - my ($mconn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]); + my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]); + + print STDERR "GetServerId: host $Host, database $DB\n" if ($debug); my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". - " host='$slaveHost' AND dbase='$slaveDB'"); + " host='$Host' AND dbase='$DB'"); if ($result->resultStatus ne PGRES_TUPLES_OK) { @@ -32,18 +38,27 @@ if ($result->cmdTuples && $result->cmdTuples > 1) { - printf STDERR "Duplicate slave definitions.\n" unless ($quiet); + printf STDERR "Duplicate host definitions.\n" unless ($quiet); return(-2); } my @row = $result->fetchrow; - + + print "GetServerId($DB,$Host) == $row[0]\n" if ($debug); + return $row[0]; } sub PrepareSnapshot { - my ($mconn, $sconn, $outf, $server, $onlytables) = @_; + my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_; + + if ($mserver == $sserver) { + print STDERR "master and slave numbers are same [$mserver] !\n"; + return(-1); + } + + print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug); # first, we must know for wich tables the slave subscribed my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_"); @@ -98,8 +113,8 @@ # Read last succeeded sync my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" . - " where server = $server AND syncid = (select max(syncid) from" . - " _RSERV_SYNC_ where server = $server AND status > 0)"; + " where server = $sserver AND syncid = (select max(syncid) from" . + " _RSERV_SYNC_ where server = $sserver AND status > 0)"; printf "$sql\n" if $debug; @@ -112,7 +127,10 @@ } my @lastsync = $result->fetchrow; - + + # exclude data which originated from master server + my $sel_server = " and l.server = $mserver "; + my $sinfo = ""; if (@lastsync && $lastsync[3] ne '') # sync info { @@ -125,9 +143,9 @@ # DELETED rows $sql = "select l.reloid, l.key from _RSERV_LOG_ l" . - " where l.delete = 1 $sinfo order by l.reloid"; + " where l.delete = 1 $sinfo $sel_server order by l.reloid"; - printf "$sql\n" if $debug; + printf "DELETED: $sql\n" if $debug; $result = $mconn->exec($sql); if ($result->resultStatus ne PGRES_TUPLES_OK) @@ -137,7 +155,7 @@ return(-1); } - my $lastoid = ''; + my $lastoid = -1; while (@row = $result->fetchrow) { next unless exists $Mtables{$row[0]}; @@ -145,7 +163,7 @@ if ($lastoid != $row[0]) { - if ($lastoid eq '') + if ($lastoid == -1) { my $syncid = GetSYNCID($mconn, $outf); return($syncid) if $syncid < 0; @@ -166,7 +184,7 @@ } printf $outf "%s\n", OutputValue($row[1]); } - printf $outf "\\.\n" if $lastoid ne ''; + printf $outf "\\.\n" if ($lastoid != -1); # UPDATED rows @@ -180,9 +198,10 @@ $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1". - " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}"; + " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". + $sel_server; - printf "$sql\n" if $debug; + printf "UPDATED: $sql\n" if $debug; $result = $mconn->exec($sql); if ($result->resultStatus ne PGRES_TUPLES_OK) @@ -228,9 +247,10 @@ $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ". "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1". - " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}"; + " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}". + $sel_server; - printf "$sql\n" if $debug; + printf "INSERTED: $sql\n" if $debug; $result = $mconn->exec($sql); if ($result->resultStatus ne PGRES_TUPLES_OK) @@ -273,7 +293,7 @@ } # Remember this snapshot info - $result = $mconn->exec("select _rserv_sync_($server)"); + $result = $mconn->exec("select _rserv_sync_($sserver)"); if ($result->resultStatus ne PGRES_TUPLES_OK) { printf $outf "-- ERROR\n"; @@ -451,11 +471,12 @@ } my $ok = 0; - my $syncid = ''; + my $syncid = -1; while(<$inpf>) { $_ =~ s/\n//; my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3); + die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt); if ($cmt ne '--') { printf STDERR "Invalid format\n" unless ($quiet); @@ -464,7 +485,7 @@ } if ($cmd eq 'DELETE') { - if ($syncid eq '') + if ($syncid == -1) { printf STDERR "Sync ID unspecified\n" unless ($quiet); $sconn->exec("ROLLBACK"); @@ -479,7 +500,7 @@ } elsif ($cmd eq 'INSERT') { - if ($syncid eq '') + if ($syncid == -1) { printf STDERR "Sync ID unspecified\n" unless ($quiet); $sconn->exec("ROLLBACK"); @@ -494,7 +515,7 @@ } elsif ($cmd eq 'UPDATE') { - if ($syncid eq '') + if ($syncid == -1) { printf STDERR "Sync ID unspecified\n" unless ($quiet); $sconn->exec("ROLLBACK"); @@ -509,7 +530,7 @@ } elsif ($cmd eq 'SYNCID') { - if ($syncid ne '') + if ($syncid != -1) { printf STDERR "Second Sync ID ?!\n" unless ($quiet); $sconn->exec("ROLLBACK"); @@ -916,7 +937,7 @@ # sub SyncSyncID { - my ($mconn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]); + my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]); my $result = $mconn->exec("BEGIN"); if ($result->resultStatus ne PGRES_COMMAND_OK) @@ -927,7 +948,7 @@ } $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" . - " where server = $server AND syncid = $syncid" . + " where server = $sserver AND syncid = $syncid" . " for update"); if ($result->resultStatus ne PGRES_TUPLES_OK) { @@ -938,20 +959,20 @@ my @row = $result->fetchrow; if (! defined $row[0]) { - printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet); + printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet); $mconn->exec("ROLLBACK"); return(0); } if ($row[1] > 0) { printf STDERR "SyncID $syncid for server ". - "$server already updated\n" unless ($quiet); + "$sserver already updated\n" unless ($quiet); $mconn->exec("ROLLBACK"); return(0); } $result = $mconn->exec("update _RSERV_SYNC_" . " set synctime = now(), status = 1" . - " where server = $server AND syncid = $syncid"); + " where server = $sserver AND syncid = $syncid"); if ($result->resultStatus ne PGRES_COMMAND_OK) { print STDERR $mconn->errorMessage unless ($quiet); @@ -959,7 +980,7 @@ return(-1); } $result = $mconn->exec("delete from _RSERV_SYNC_" . - " where server = $server AND syncid < $syncid"); + " where server = $sserver AND syncid < $syncid"); if ($result->resultStatus ne PGRES_COMMAND_OK) { print STDERR $mconn->errorMessage unless ($quiet); @@ -978,4 +999,52 @@ return(1); } +# stuff moved from perl scripts for better re-use + +sub Rollback { + my $conn = shift @_; + + print STDERR $conn->errorMessage; + $conn->exec("ROLLBACK"); +} + +sub RollbackAndQuit { + my $conn = shift @_; + + Rollback($conn); + exit (-1); +} + +sub Connect { + my $info = shift @_; + + print("Connecting to $info\n") if ($debug || $verbose); + my $conn = Pg::connectdb($info); + if ($conn->status != PGRES_CONNECTION_OK) { + print STDERR "Failed opening $info\n"; + exit 1; + } + return $conn; +} + +sub Exec { + my $conn = shift @_; + my $sql = shift @_; + + my $result = $conn->exec($sql); + print STDERR "$sql\n" if ($debug); + RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); +} + +sub Exec2 { + my $mconn = shift @_; + my $sconn = shift @_; + my $sql = shift @_; + + my $result = $mconn->exec($sql); + RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK); + $result = $sconn->exec($sql); + RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK); +} + 1;