/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.8 by dpavlin, Tue Oct 28 20:16:39 2003 UTC revision 1.13 by dpavlin, Sun Nov 2 10:31:45 2003 UTC
# Line 6  package RServ; Line 6  package RServ;
6    
7  require Exporter;  require Exporter;
8  @ISA = qw(Exporter);  @ISA = qw(Exporter);
9  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10            Rollback RollbackAndQuit Connect Exec Exec2 MkInfo
11            $debug $quiet $verbose
12            );
13  @EXPORT_OK = qw();  @EXPORT_OK = qw();
14  use strict;  use strict;
15  use Pg;  use Pg;
16    
17  my $debug = 0;  my $debug = 0;
18  my $quiet = 1;  my $quiet = 1;
19    my $verbose = 0;
20    
21    $debug = 1;
22    $quiet = 0;
23    $verbose = 1;
24    
25  my %Mtables = ();  my %Mtables = ();
26  my %Stables = ();  my %Stables = ();
27    
28  sub GetSlaveId  sub GetServerId
29  {  {
30      my ($mconn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);      my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);
31    
32        print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);
33    
34      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
35                            " host='$slaveHost' AND dbase='$slaveDB'");                            " host='$Host' AND dbase='$DB'");
36    
37      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
38      {      {
# Line 32  sub GetSlaveId Line 42  sub GetSlaveId
42            
43      if ($result->cmdTuples && $result->cmdTuples > 1)      if ($result->cmdTuples && $result->cmdTuples > 1)
44      {      {
45          printf STDERR "Duplicate slave definitions.\n" unless ($quiet);          printf STDERR "Duplicate host definitions.\n" unless ($quiet);
46          return(-2);          return(-2);
47      }      }
48    
49      my @row = $result->fetchrow;      my @row = $result->fetchrow;
50        
51        print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
52    
53      return $row[0];      return $row[0];
54  }  }
55    
56  sub PrepareSnapshot  sub PrepareSnapshot
57  {  {
58      my ($mconn, $sconn, $outf, $sserver, $onlytables) = @_;      my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;
59    
60    print STDERR "## d: $debug v: $verbose q: $quiet\n";
61    
62        if ($mserver == $sserver) {
63            print STDERR "master and slave numbers are same [$mserver] !\n";
64            return(-1);
65        }
66    
67        print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
68    
69      # first, we must know for wich tables the slave subscribed      # first, we must know for wich tables the slave subscribed
70      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
# Line 95  sub PrepareSnapshot Line 116  sub PrepareSnapshot
116                  }                  }
117          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
118      }      }
119        
120        print "Prepare snapshot for tables with oid: ",join(",",keys %Mtables),"\n" if ($debug);
121    
122      # Read last succeeded sync      # Read last succeeded sync
123      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
124          " where server = $sserver AND syncid = (select max(syncid) from" .          " where server = $sserver AND syncid = (select max(syncid) from" .
# Line 112  sub PrepareSnapshot Line 135  sub PrepareSnapshot
135      }      }
136            
137      my @lastsync = $result->fetchrow;      my @lastsync = $result->fetchrow;
138        
139        # exclude data which originated from master server
140        my $sel_server = " and l.server = $mserver ";
141    
142      my $sinfo = "";      my $sinfo = "";
143      if (@lastsync && $lastsync[3] ne '')        # sync info      if (@lastsync && $lastsync[3] ne '')        # sync info
144      {      {
# Line 125  sub PrepareSnapshot Line 151  sub PrepareSnapshot
151            
152      # DELETED rows      # DELETED rows
153      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
154          " where l.delete = 1 $sinfo order by l.reloid";          " where l.delete = 1 $sinfo $sel_server order by l.reloid";
155            
156      printf "$sql\n" if $debug;      printf "DELETED: $sql\n" if $debug;
157            
158      $result = $mconn->exec($sql);      $result = $mconn->exec($sql);
159      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
# Line 180  sub PrepareSnapshot Line 206  sub PrepareSnapshot
206    
207          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
208            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
209            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
210              $sel_server;
211                    
212          printf "$sql\n" if $debug;          printf "UPDATED: $sql\n" if $debug;
213                    
214          $result = $mconn->exec($sql);          $result = $mconn->exec($sql);
215          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
# Line 228  sub PrepareSnapshot Line 255  sub PrepareSnapshot
255    
256          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
257            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
258            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
259              $sel_server;
260                    
261          printf "$sql\n" if $debug;          printf "INSERTED: $sql\n" if $debug;
262                    
263          $result = $mconn->exec($sql);          $result = $mconn->exec($sql);
264          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
# Line 450  sub ApplySnapshot Line 478  sub ApplySnapshot
478          push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];          push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
479      }      }
480    
481        print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);
482    
483      my $ok = 0;      my $ok = 0;
484      my $syncid = '';      my $syncid = -1;
485      while(<$inpf>)      while(<$inpf>)
486      {      {
487          $_ =~ s/\n//;          $_ =~ s/\n//;
488          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);
489            die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
490          if ($cmt ne '--')          if ($cmt ne '--')
491          {          {
492              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
# Line 464  sub ApplySnapshot Line 495  sub ApplySnapshot
495          }          }
496          if ($cmd eq 'DELETE')          if ($cmd eq 'DELETE')
497          {          {
498              if ($syncid eq '')              if ($syncid == -1)
499              {              {
500                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
501                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 479  sub ApplySnapshot Line 510  sub ApplySnapshot
510          }          }
511          elsif ($cmd eq 'INSERT')          elsif ($cmd eq 'INSERT')
512          {          {
513              if ($syncid eq '')              if ($syncid == -1)
514              {              {
515                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
516                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 494  sub ApplySnapshot Line 525  sub ApplySnapshot
525          }          }
526          elsif ($cmd eq 'UPDATE')          elsif ($cmd eq 'UPDATE')
527          {          {
528              if ($syncid eq '')              if ($syncid == -1)
529              {              {
530                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
531                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 509  sub ApplySnapshot Line 540  sub ApplySnapshot
540          }          }
541          elsif ($cmd eq 'SYNCID')          elsif ($cmd eq 'SYNCID')
542          {          {
543              if ($syncid ne '')              if ($syncid != -1)
544              {              {
545                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);
546                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 978  sub SyncSyncID Line 1009  sub SyncSyncID
1009      return(1);      return(1);
1010  }  }
1011    
1012    # stuff moved from perl scripts for better re-use
1013    
1014    sub Rollback {
1015        my $conn = shift @_;
1016    
1017        print STDERR $conn->errorMessage;
1018        $conn->exec("ROLLBACK");
1019    }
1020    
1021    sub RollbackAndQuit {
1022        my $conn = shift @_;
1023    
1024        Rollback($conn);
1025        exit (-1);
1026    }
1027    
1028    sub Connect {
1029            my $info = shift @_;
1030    
1031            print("Connecting to $info\n") if ($debug || $verbose);
1032            my $conn = Pg::connectdb($info);
1033            if ($conn->status != PGRES_CONNECTION_OK) {
1034                print STDERR "Failed opening $info\n";
1035                exit 1;
1036            }
1037            return $conn;
1038    }
1039    
1040    sub Exec {
1041            my $conn = shift @_;
1042            my $sql = shift @_;
1043    
1044            my $result = $conn->exec($sql);
1045            print STDERR "$sql\n" if ($debug);
1046            RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1047    }
1048    
1049    sub Exec2 {
1050            my $mconn = shift @_;
1051            my $sconn = shift @_;
1052            my $sql = shift @_;
1053    
1054            my $result = $mconn->exec($sql);
1055            RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1056            $result = $sconn->exec($sql);
1057            RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1058    }
1059    
1060    sub MkInfo {
1061            my $db = shift || die "need database name!";
1062            my $host = shift;
1063            my $port = shift;
1064            my $user = shift;
1065            my $password = shift;
1066    
1067            my $info = "dbname=$db";
1068            $info = "$info host=$host" if (defined($host));
1069            $info = "$info port=$port" if (defined($port));
1070            $info = "$info user=$user" if (defined($user));
1071            $info = "$info password=$password" if (defined($password));
1072    
1073            return $info;
1074    }
1075    
1076  1;  1;

Legend:
Removed from v.1.8  
changed lines
  Added in v.1.13

  ViewVC Help
Powered by ViewVC 1.1.26