/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.6 by dpavlin, Tue Oct 28 19:55:25 2003 UTC revision 1.11 by dpavlin, Fri Oct 31 00:07:37 2003 UTC
# Line 6  package RServ; Line 6  package RServ;
6    
7  require Exporter;  require Exporter;
8  @ISA = qw(Exporter);  @ISA = qw(Exporter);
9  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10            Rollback RollbackAndQuit Connect Exec Exec2
11            $debug $quiet $verbose
12            );
13  @EXPORT_OK = qw();  @EXPORT_OK = qw();
14  use strict;  use strict;
15  use Pg;  use Pg;
16    
17  my $debug = 0;  my $debug = 0;
18  my $quiet = 1;  my $quiet = 1;
19    my $verbose = 0;
20    
21  my %Mtables = ();  my %Mtables = ();
22  my %Stables = ();  my %Stables = ();
23    
24  sub GetSlaveId  sub GetServerId
25  {  {
26      my ($mconn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);      my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);
27    
28        print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);
29    
30      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
31                            " host='$slaveHost' AND dbase='$slaveDB'");                            " host='$Host' AND dbase='$DB'");
32    
33      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
34      {      {
# Line 32  sub GetSlaveId Line 38  sub GetSlaveId
38            
39      if ($result->cmdTuples && $result->cmdTuples > 1)      if ($result->cmdTuples && $result->cmdTuples > 1)
40      {      {
41          printf STDERR "Duplicate slave definitions.\n" unless ($quiet);          printf STDERR "Duplicate host definitions.\n" unless ($quiet);
42          return(-2);          return(-2);
43      }      }
44    
45      my @row = $result->fetchrow;      my @row = $result->fetchrow;
46        
47        print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
48    
49      return $row[0];      return $row[0];
50  }  }
51    
52  sub PrepareSnapshot  sub PrepareSnapshot
53  {  {
54      my ($mconn, $sconn, $outf, $server, $onlytables) = @_;      my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;
55    
56        if ($mserver == $sserver) {
57            print STDERR "master and slave numbers are same [$mserver] !\n";
58            return(-1);
59        }
60    
61        print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
62    
63      # first, we must know for wich tables the slave subscribed      # first, we must know for wich tables the slave subscribed
64      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
# Line 98  sub PrepareSnapshot Line 113  sub PrepareSnapshot
113            
114      # Read last succeeded sync      # Read last succeeded sync
115      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
116          " where server = $server AND syncid = (select max(syncid) from" .          " where server = $sserver AND syncid = (select max(syncid) from" .
117          " _RSERV_SYNC_ where server = $server AND status > 0)";          " _RSERV_SYNC_ where server = $sserver AND status > 0)";
118            
119      printf "$sql\n" if $debug;      printf "$sql\n" if $debug;
120    
# Line 112  sub PrepareSnapshot Line 127  sub PrepareSnapshot
127      }      }
128            
129      my @lastsync = $result->fetchrow;      my @lastsync = $result->fetchrow;
130        
131        # exclude data which originated from master server
132        my $sel_server = " and l.server = $mserver ";
133    
134      my $sinfo = "";      my $sinfo = "";
135      if (@lastsync && $lastsync[3] ne '')        # sync info      if (@lastsync && $lastsync[3] ne '')        # sync info
136      {      {
# Line 125  sub PrepareSnapshot Line 143  sub PrepareSnapshot
143            
144      # DELETED rows      # DELETED rows
145      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
146          " where l.delete = 1 $sinfo order by l.reloid";          " where l.delete = 1 $sinfo $sel_server order by l.reloid";
147            
148      printf "$sql\n" if $debug;      printf "DELETED: $sql\n" if $debug;
149            
150      $result = $mconn->exec($sql);      $result = $mconn->exec($sql);
151      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
# Line 137  sub PrepareSnapshot Line 155  sub PrepareSnapshot
155          return(-1);          return(-1);
156      }      }
157            
158      my $lastoid = '';      my $lastoid = -1;
159      while (@row = $result->fetchrow)      while (@row = $result->fetchrow)
160      {      {
161          next unless exists $Mtables{$row[0]};          next unless exists $Mtables{$row[0]};
# Line 145  sub PrepareSnapshot Line 163  sub PrepareSnapshot
163    
164          if ($lastoid != $row[0])          if ($lastoid != $row[0])
165          {          {
166              if ($lastoid eq '')              if ($lastoid == -1)
167              {              {
168                  my $syncid = GetSYNCID($mconn, $outf);                  my $syncid = GetSYNCID($mconn, $outf);
169                  return($syncid) if $syncid < 0;                  return($syncid) if $syncid < 0;
# Line 166  sub PrepareSnapshot Line 184  sub PrepareSnapshot
184          }          }
185          printf $outf "%s\n", OutputValue($row[1]);          printf $outf "%s\n", OutputValue($row[1]);
186      }      }
187      printf $outf "\\.\n" if $lastoid ne '';      printf $outf "\\.\n" if ($lastoid != -1);
188            
189      # UPDATED rows      # UPDATED rows
190            
# Line 180  sub PrepareSnapshot Line 198  sub PrepareSnapshot
198    
199          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
200            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
201            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
202              $sel_server;
203                    
204          printf "$sql\n" if $debug;          printf "UPDATED: $sql\n" if $debug;
205                    
206          $result = $mconn->exec($sql);          $result = $mconn->exec($sql);
207          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
# Line 228  sub PrepareSnapshot Line 247  sub PrepareSnapshot
247    
248          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
249            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
250            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
251              $sel_server;
252                    
253          printf "$sql\n" if $debug;          printf "INSERTED: $sql\n" if $debug;
254                    
255          $result = $mconn->exec($sql);          $result = $mconn->exec($sql);
256          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
# Line 273  sub PrepareSnapshot Line 293  sub PrepareSnapshot
293      }      }
294            
295      # Remember this snapshot info      # Remember this snapshot info
296      $result = $mconn->exec("select _rserv_sync_($server)");      $result = $mconn->exec("select _rserv_sync_($sserver)");
297      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
298      {      {
299          printf $outf "-- ERROR\n";          printf $outf "-- ERROR\n";
# Line 451  sub ApplySnapshot Line 471  sub ApplySnapshot
471      }      }
472    
473      my $ok = 0;      my $ok = 0;
474      my $syncid = '';      my $syncid = -1;
475      while(<$inpf>)      while(<$inpf>)
476      {      {
477          $_ =~ s/\n//;          $_ =~ s/\n//;
478          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);
479            die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
480          if ($cmt ne '--')          if ($cmt ne '--')
481          {          {
482              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
# Line 464  sub ApplySnapshot Line 485  sub ApplySnapshot
485          }          }
486          if ($cmd eq 'DELETE')          if ($cmd eq 'DELETE')
487          {          {
488              if ($syncid eq '')              if ($syncid == -1)
489              {              {
490                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
491                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 479  sub ApplySnapshot Line 500  sub ApplySnapshot
500          }          }
501          elsif ($cmd eq 'INSERT')          elsif ($cmd eq 'INSERT')
502          {          {
503              if ($syncid eq '')              if ($syncid == -1)
504              {              {
505                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
506                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 494  sub ApplySnapshot Line 515  sub ApplySnapshot
515          }          }
516          elsif ($cmd eq 'UPDATE')          elsif ($cmd eq 'UPDATE')
517          {          {
518              if ($syncid eq '')              if ($syncid == -1)
519              {              {
520                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
521                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 509  sub ApplySnapshot Line 530  sub ApplySnapshot
530          }          }
531          elsif ($cmd eq 'SYNCID')          elsif ($cmd eq 'SYNCID')
532          {          {
533              if ($syncid ne '')              if ($syncid != -1)
534              {              {
535                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);
536                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 916  sub GetSyncID Line 937  sub GetSyncID
937  #  #
938  sub SyncSyncID  sub SyncSyncID
939  {  {
940      my ($mconn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
941            
942      my $result = $mconn->exec("BEGIN");      my $result = $mconn->exec("BEGIN");
943      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
# Line 927  sub SyncSyncID Line 948  sub SyncSyncID
948      }      }
949            
950      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
951                            " where server = $server AND syncid = $syncid" .                            " where server = $sserver AND syncid = $syncid" .
952                            " for update");                            " for update");
953      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
954      {      {
# Line 938  sub SyncSyncID Line 959  sub SyncSyncID
959      my @row = $result->fetchrow;      my @row = $result->fetchrow;
960      if (! defined $row[0])      if (! defined $row[0])
961      {      {
962          printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);          printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
963          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
964          return(0);          return(0);
965      }      }
966      if ($row[1] > 0)      if ($row[1] > 0)
967      {      {
968          printf STDERR "SyncID $syncid for server ".          printf STDERR "SyncID $syncid for server ".
969              "$server already updated\n" unless ($quiet);              "$sserver already updated\n" unless ($quiet);
970          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
971          return(0);          return(0);
972      }      }
973      $result = $mconn->exec("update _RSERV_SYNC_" .      $result = $mconn->exec("update _RSERV_SYNC_" .
974                            " set synctime = now(), status = 1" .                            " set synctime = now(), status = 1" .
975                            " where server = $server AND syncid = $syncid");                            " where server = $sserver AND syncid = $syncid");
976      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
977      {      {
978          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
# Line 959  sub SyncSyncID Line 980  sub SyncSyncID
980          return(-1);          return(-1);
981      }      }
982      $result = $mconn->exec("delete from _RSERV_SYNC_" .      $result = $mconn->exec("delete from _RSERV_SYNC_" .
983                            " where server = $server AND syncid < $syncid");                            " where server = $sserver AND syncid < $syncid");
984      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
985      {      {
986          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
# Line 978  sub SyncSyncID Line 999  sub SyncSyncID
999      return(1);      return(1);
1000  }  }
1001    
1002    # stuff moved from perl scripts for better re-use
1003    
1004    sub Rollback {
1005        my $conn = shift @_;
1006    
1007        print STDERR $conn->errorMessage;
1008        $conn->exec("ROLLBACK");
1009    }
1010    
1011    sub RollbackAndQuit {
1012        my $conn = shift @_;
1013    
1014        Rollback($conn);
1015        exit (-1);
1016    }
1017    
1018    sub Connect {
1019            my $info = shift @_;
1020    
1021            print("Connecting to $info\n") if ($debug || $verbose);
1022            my $conn = Pg::connectdb($info);
1023            if ($conn->status != PGRES_CONNECTION_OK) {
1024                print STDERR "Failed opening $info\n";
1025                exit 1;
1026            }
1027            return $conn;
1028    }
1029    
1030    sub Exec {
1031            my $conn = shift @_;
1032            my $sql = shift @_;
1033    
1034            my $result = $conn->exec($sql);
1035            print STDERR "$sql\n" if ($debug);
1036            RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1037    }
1038    
1039    sub Exec2 {
1040            my $mconn = shift @_;
1041            my $sconn = shift @_;
1042            my $sql = shift @_;
1043    
1044            my $result = $mconn->exec($sql);
1045            RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1046            $result = $sconn->exec($sql);
1047            RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1048    }
1049    
1050  1;  1;

Legend:
Removed from v.1.6  
changed lines
  Added in v.1.11

  ViewVC Help
Powered by ViewVC 1.1.26