--- share/RServ.pm 2003/10/26 23:43:54 1.5 +++ share/RServ.pm 2003/10/28 19:55:25 1.6 @@ -19,14 +19,14 @@ sub GetSlaveId { - my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]); + my ($mconn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]); - my $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". + my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". " host='$slaveHost' AND dbase='$slaveDB'"); if ($result->resultStatus ne PGRES_TUPLES_OK) { - print STDERR $conn->errorMessage unless ($quiet); + print STDERR $mconn->errorMessage unless ($quiet); return(-1); } @@ -43,13 +43,13 @@ sub PrepareSnapshot { - my ($conn, $sconn, $outf, $server, $onlytables) = @_; + my ($mconn, $sconn, $outf, $server, $onlytables) = @_; # first, we must know for wich tables the slave subscribed my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_"); if ($result->resultStatus ne PGRES_TUPLES_OK) { - print STDERR $conn->errorMessage unless ($quiet); + print STDERR $mconn->errorMessage unless ($quiet); return(-1); } @@ -58,31 +58,31 @@ $Stables{$row[0]} = 1; } - $result = $conn->exec("BEGIN"); + $result = $mconn->exec("BEGIN"); if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } - $result = $conn->exec("set transaction isolation level serializable"); + $result = $mconn->exec("set transaction isolation level serializable"); if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } # MAP oid --> tabname, keyname, key_type - $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" . + $result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" . " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" . ", pg_type pgt". " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" . " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid"); if ($result->resultStatus ne PGRES_TUPLES_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } @@ -103,11 +103,11 @@ printf "$sql\n" if $debug; - $result = $conn->exec($sql); + $result = $mconn->exec($sql); if ($result->resultStatus ne PGRES_TUPLES_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } @@ -129,11 +129,11 @@ printf "$sql\n" if $debug; - $result = $conn->exec($sql); + $result = $mconn->exec($sql); if ($result->resultStatus ne PGRES_TUPLES_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } @@ -147,7 +147,7 @@ { if ($lastoid eq '') { - my $syncid = GetSYNCID($conn, $outf); + my $syncid = GetSYNCID($mconn, $outf); return($syncid) if $syncid < 0; $havedeal = 1; } @@ -161,7 +161,7 @@ if (! defined $row[1]) { print STDERR "NULL key\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $mconn->exec("ROLLBACK"); return(-2); } printf $outf "%s\n", OutputValue($row[1]); @@ -184,18 +184,18 @@ printf "$sql\n" if $debug; - $result = $conn->exec($sql); + $result = $mconn->exec($sql); if ($result->resultStatus ne PGRES_TUPLES_OK) { printf $outf "-- ERROR\n" if $havedeal; - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } next if $result->ntuples <= 0; if (! $havedeal) { - my $syncid = GetSYNCID($conn, $outf); + my $syncid = GetSYNCID($mconn, $outf); return($syncid) if $syncid < 0; $havedeal = 1; } @@ -232,18 +232,18 @@ printf "$sql\n" if $debug; - $result = $conn->exec($sql); + $result = $mconn->exec($sql); if ($result->resultStatus ne PGRES_TUPLES_OK) { printf $outf "-- ERROR\n" if $havedeal; - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } next if $result->ntuples <= 0; if (! $havedeal) { - my $syncid = GetSYNCID($conn, $outf); + my $syncid = GetSYNCID($mconn, $outf); return($syncid) if $syncid < 0; $havedeal = 1; } @@ -268,26 +268,26 @@ unless ($havedeal) { - $conn->exec("ROLLBACK"); + $mconn->exec("ROLLBACK"); return(0); } # Remember this snapshot info - $result = $conn->exec("select _rserv_sync_($server)"); + $result = $mconn->exec("select _rserv_sync_($server)"); if ($result->resultStatus ne PGRES_TUPLES_OK) { printf $outf "-- ERROR\n"; - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } - $result = $conn->exec("COMMIT"); + $result = $mconn->exec("COMMIT"); if ($result->resultStatus ne PGRES_COMMAND_OK) { printf $outf "-- ERROR\n"; - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } printf $outf "-- OK\n"; @@ -408,21 +408,21 @@ sub ApplySnapshot { - my ($conn, $inpf, $onlytables) = @_; # (@_[0], @_[1]); + my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]); - my $result = $conn->exec("BEGIN"); + my $result = $sconn->exec("BEGIN"); if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $sconn->errorMessage unless ($quiet); + $sconn->exec("ROLLBACK"); return(-1); } - $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED"); + $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED"); if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $sconn->errorMessage unless ($quiet); + $sconn->exec("ROLLBACK"); return(-1); } @@ -432,11 +432,11 @@ " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" . " AND pga.attnum = rt.key"; - $result = $conn->exec($sql); + $result = $sconn->exec($sql); if ($result->resultStatus ne PGRES_TUPLES_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $sconn->errorMessage unless ($quiet); + $sconn->exec("ROLLBACK"); return(-1); } %Stables = (); @@ -459,7 +459,7 @@ if ($cmt ne '--') { printf STDERR "Invalid format\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return(-2); } if ($cmd eq 'DELETE') @@ -467,13 +467,13 @@ if ($syncid eq '') { printf STDERR "Sync ID unspecified\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return(-2); } - $result = DoDelete($conn, $inpf, $prm); + $result = DoDelete($sconn, $inpf, $prm); if ($result) { - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return($result); } } @@ -482,13 +482,13 @@ if ($syncid eq '') { printf STDERR "Sync ID unspecified\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return(-2); } - $result = DoInsert($conn, $inpf, $prm); + $result = DoInsert($sconn, $inpf, $prm); if ($result) { - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return($result); } } @@ -497,13 +497,13 @@ if ($syncid eq '') { printf STDERR "Sync ID unspecified\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return(-2); } - $result = DoUpdate($conn, $inpf, $prm); + $result = DoUpdate($sconn, $inpf, $prm); if ($result) { - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return($result); } } @@ -512,49 +512,49 @@ if ($syncid ne '') { printf STDERR "Second Sync ID ?!\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return(-2); } if ($prm !~ /^\d+$/) { printf STDERR "Invalid Sync ID $prm\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return(-2); } $syncid = $prm; printf STDERR "Sync ID $syncid\n" unless ($quiet); - $result = $conn->exec("select syncid, synctime from " . + $result = $sconn->exec("select syncid, synctime from " . "_RSERV_SLAVE_SYNC_ where syncid = " . "(select max(syncid) from _RSERV_SLAVE_SYNC_)"); if ($result->resultStatus ne PGRES_TUPLES_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $sconn->errorMessage unless ($quiet); + $sconn->exec("ROLLBACK"); return(-1); } my @row = $result->fetchrow; if (! defined $row[0]) { - $result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ". + $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ". "(syncid, synctime) values ($syncid, now())"); } elsif ($row[0] >= $prm) { printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return(0); } else { - $result = $conn->exec("update _RSERV_SLAVE_SYNC_" . + $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" . " set syncid = $syncid, synctime = now()"); } if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $sconn->errorMessage unless ($quiet); + $sconn->exec("ROLLBACK"); return(-1); } } @@ -566,13 +566,13 @@ elsif ($cmd eq 'ERROR') { printf STDERR "ERROR signaled\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return(-2); } else { printf STDERR "Unknown command $cmd\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return(-2); } } @@ -580,15 +580,15 @@ if (! $ok) { printf STDERR "No OK flag in input\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $sconn->exec("ROLLBACK"); return(-2); } - $result = $conn->exec("COMMIT"); + $result = $sconn->exec("COMMIT"); if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $sconn->errorMessage unless ($quiet); + $sconn->exec("ROLLBACK"); return(-1); } @@ -597,7 +597,7 @@ sub DoDelete { - my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); + my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); # only delete tables that the slave wants if (! defined($Stables{$tabname})) { @@ -631,10 +631,10 @@ printf "$sql\n" if $debug; - my $result = $conn->exec($sql); + my $result = $sconn->exec($sql); if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); + print STDERR $sconn->errorMessage unless ($quiet); return(-1); } } @@ -651,7 +651,7 @@ sub DoUpdate { - my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); + my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); # only update the tables that the slave wants if (! defined($Stables{$tabname})) { @@ -673,10 +673,10 @@ my $sql = "select attnum, attname from pg_attribute" . " where attrelid = $Stables{$tabname}->[0] AND attnum > 0"; - my $result = $conn->exec($sql); + my $result = $sconn->exec($sql); if ($result->resultStatus ne PGRES_TUPLES_OK) { - print STDERR $conn->errorMessage unless ($quiet); + print STDERR $sconn->errorMessage unless ($quiet); return(-1); } @@ -751,11 +751,11 @@ printf "$sql\n" if $debug; - $result = $conn->exec($sql); + $result = $sconn->exec($sql); if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); + print STDERR $sconn->errorMessage unless ($quiet); return(-1); } next if $result->cmdTuples == 1; # updated @@ -772,7 +772,7 @@ if ($CBufLen >= $CBufMax) { - $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); + $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); return($result) if $result; @CopyBuf = (); $CBufLen = 0; @@ -788,7 +788,7 @@ if ($CBufLen) { print "@CopyBuf\n" if $debug; - $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); + $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); return($result) if $result; } @@ -797,7 +797,7 @@ sub DoInsert { - my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); + my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); # only insert rows into tables that the slave wants if (! defined($Stables{$tabname})) { @@ -839,7 +839,7 @@ if ($CBufLen >= $CBufMax) { - my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); + my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); return($result) if $result; @CopyBuf = (); $CBufLen = 0; @@ -855,7 +855,7 @@ if ($CBufLen) { print "@CopyBuf\n" if $debug; - my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); + my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); return($result) if $result; } @@ -865,27 +865,27 @@ sub DoCopy { - my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]); + my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]); my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') . "FROM STDIN"; - my $result = $conn->exec($sql); + my $result = $sconn->exec($sql); if ($result->resultStatus ne PGRES_COPY_IN) { - print STDERR $conn->errorMessage unless ($quiet); + print STDERR $sconn->errorMessage unless ($quiet); return(-1); } foreach my $str (@{$CBuf}) { - $conn->putline($str); + $sconn->putline($str); } - $conn->putline("\\.\n"); + $sconn->putline("\\.\n"); - if ($conn->endcopy) + if ($sconn->endcopy) { - print STDERR $conn->errorMessage unless ($quiet); + print STDERR $sconn->errorMessage unless ($quiet); return(-1); } @@ -898,12 +898,12 @@ # sub GetSyncID { - my ($conn) = @_; # (@_[0]); + my ($sconn) = @_; # (@_[0]); - my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_"); + my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_"); if ($result->resultStatus ne PGRES_TUPLES_OK) { - print STDERR $conn->errorMessage unless ($quiet); + print STDERR $sconn->errorMessage unless ($quiet); return(-1); } my @row = $result->fetchrow; @@ -916,62 +916,62 @@ # sub SyncSyncID { - my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]); + my ($mconn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]); - my $result = $conn->exec("BEGIN"); + my $result = $mconn->exec("BEGIN"); if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } - $result = $conn->exec("select synctime, status from _RSERV_SYNC_" . + $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" . " where server = $server AND syncid = $syncid" . " for update"); if ($result->resultStatus ne PGRES_TUPLES_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } my @row = $result->fetchrow; if (! defined $row[0]) { printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $mconn->exec("ROLLBACK"); return(0); } if ($row[1] > 0) { printf STDERR "SyncID $syncid for server ". "$server already updated\n" unless ($quiet); - $conn->exec("ROLLBACK"); + $mconn->exec("ROLLBACK"); return(0); } - $result = $conn->exec("update _RSERV_SYNC_" . + $result = $mconn->exec("update _RSERV_SYNC_" . " set synctime = now(), status = 1" . " where server = $server AND syncid = $syncid"); if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } - $result = $conn->exec("delete from _RSERV_SYNC_" . + $result = $mconn->exec("delete from _RSERV_SYNC_" . " where server = $server AND syncid < $syncid"); if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); } - $result = $conn->exec("COMMIT"); + $result = $mconn->exec("COMMIT"); if ($result->resultStatus ne PGRES_COMMAND_OK) { - print STDERR $conn->errorMessage unless ($quiet); - $conn->exec("ROLLBACK"); + print STDERR $mconn->errorMessage unless ($quiet); + $mconn->exec("ROLLBACK"); return(-1); }