/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.8 by dpavlin, Tue Oct 28 20:16:39 2003 UTC revision 1.14 by dpavlin, Sun Nov 2 11:29:09 2003 UTC
# Line 6  package RServ; Line 6  package RServ;
6    
7  require Exporter;  require Exporter;
8  @ISA = qw(Exporter);  @ISA = qw(Exporter);
9  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10            Rollback RollbackAndQuit Connect Exec Exec2 MkInfo
11            $debug $quiet $verbose
12            );
13  @EXPORT_OK = qw();  @EXPORT_OK = qw();
14  use strict;  use strict;
15  use Pg;  use Pg;
16    
17  my $debug = 0;  my $debug = 0;
18  my $quiet = 1;  my $quiet = 1;
19    my $verbose = 0;
20    
21    $debug = 1;
22    $quiet = 0;
23    $verbose = 1;
24    
25  my %Mtables = ();  my %Mtables = ();
26  my %Stables = ();  my %Stables = ();
27    
28  sub GetSlaveId  sub GetServerId
29  {  {
30      my ($mconn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);      my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);
31    
32        print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);
33    
34      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
35                            " host='$slaveHost' AND dbase='$slaveDB'");                            " host='$Host' AND dbase='$DB'");
36    
37      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
38      {      {
# Line 32  sub GetSlaveId Line 42  sub GetSlaveId
42            
43      if ($result->cmdTuples && $result->cmdTuples > 1)      if ($result->cmdTuples && $result->cmdTuples > 1)
44      {      {
45          printf STDERR "Duplicate slave definitions.\n" unless ($quiet);          printf STDERR "Duplicate host definitions.\n" unless ($quiet);
46          return(-2);          return(-2);
47      }      }
48    
49      my @row = $result->fetchrow;      my @row = $result->fetchrow;
50        
51        print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
52    
53      return $row[0];      return $row[0];
54  }  }
55    
56  sub PrepareSnapshot  sub PrepareSnapshot
57  {  {
58      my ($mconn, $sconn, $outf, $sserver, $onlytables) = @_;      my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;
59    
60    print STDERR "## d: $debug v: $verbose q: $quiet\n";
61    
62        if ($mserver == $sserver) {
63            print STDERR "master and slave numbers are same [$mserver] !\n";
64            return(-1);
65        }
66    
67        print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
68    
69      # first, we must know for wich tables the slave subscribed      # first, we must know for wich tables the slave subscribed
70      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
# Line 58  sub PrepareSnapshot Line 79  sub PrepareSnapshot
79          $Stables{$row[0]} = 1;          $Stables{$row[0]} = 1;
80      }      }
81            
82        print "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);
83    
84      $result = $mconn->exec("BEGIN");      $result = $mconn->exec("BEGIN");
85      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
86      {      {
# Line 95  sub PrepareSnapshot Line 118  sub PrepareSnapshot
118                  }                  }
119          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
120      }      }
121        
122        print "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
123    
124      # Read last succeeded sync      # Read last succeeded sync
125      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
126          " where server = $sserver AND syncid = (select max(syncid) from" .          " where server = $sserver AND syncid = (select max(syncid) from" .
# Line 112  sub PrepareSnapshot Line 137  sub PrepareSnapshot
137      }      }
138            
139      my @lastsync = $result->fetchrow;      my @lastsync = $result->fetchrow;
140            print "lastsync: ",join(",",@lastsync),"\n" if ($debug);
141    
142        # exclude data which originated from master server
143        my $sel_server = " and l.server = $mserver ";
144    
145      my $sinfo = "";      my $sinfo = "";
146      if (@lastsync && $lastsync[3] ne '')        # sync info      if (@lastsync && $lastsync[3] ne '')        # sync info
147      {      {
# Line 125  sub PrepareSnapshot Line 154  sub PrepareSnapshot
154            
155      # DELETED rows      # DELETED rows
156      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
157          " where l.delete = 1 $sinfo order by l.reloid";          " where l.delete = 1 $sinfo $sel_server order by l.reloid";
158            
159      printf "$sql\n" if $debug;      printf "DELETED: $sql\n" if $debug;
160            
161      $result = $mconn->exec($sql);      $result = $mconn->exec($sql);
162      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
# Line 180  sub PrepareSnapshot Line 209  sub PrepareSnapshot
209    
210          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
211            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
212            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
213              $sel_server;
214                    
215          printf "$sql\n" if $debug;          printf "UPDATED: $sql\n" if $debug;
216                    
217          $result = $mconn->exec($sql);          $result = $mconn->exec($sql);
218          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
# Line 228  sub PrepareSnapshot Line 258  sub PrepareSnapshot
258    
259          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
260            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
261            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
262              $sel_server;
263                    
264          printf "$sql\n" if $debug;          printf "INSERTED: $sql\n" if $debug;
265                    
266          $result = $mconn->exec($sql);          $result = $mconn->exec($sql);
267          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
# Line 450  sub ApplySnapshot Line 481  sub ApplySnapshot
481          push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];          push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
482      }      }
483    
484        print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);
485    
486      my $ok = 0;      my $ok = 0;
487      my $syncid = '';      my $syncid = -1;
488      while(<$inpf>)      while(<$inpf>)
489      {      {
490          $_ =~ s/\n//;          $_ =~ s/\n//;
491          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);
492            die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
493          if ($cmt ne '--')          if ($cmt ne '--')
494          {          {
495              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
# Line 464  sub ApplySnapshot Line 498  sub ApplySnapshot
498          }          }
499          if ($cmd eq 'DELETE')          if ($cmd eq 'DELETE')
500          {          {
501              if ($syncid eq '')              if ($syncid == -1)
502              {              {
503                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
504                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 479  sub ApplySnapshot Line 513  sub ApplySnapshot
513          }          }
514          elsif ($cmd eq 'INSERT')          elsif ($cmd eq 'INSERT')
515          {          {
516              if ($syncid eq '')              if ($syncid == -1)
517              {              {
518                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
519                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 494  sub ApplySnapshot Line 528  sub ApplySnapshot
528          }          }
529          elsif ($cmd eq 'UPDATE')          elsif ($cmd eq 'UPDATE')
530          {          {
531              if ($syncid eq '')              if ($syncid == -1)
532              {              {
533                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
534                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 509  sub ApplySnapshot Line 543  sub ApplySnapshot
543          }          }
544          elsif ($cmd eq 'SYNCID')          elsif ($cmd eq 'SYNCID')
545          {          {
546              if ($syncid ne '')              if ($syncid != -1)
547              {              {
548                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);
549                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
# Line 978  sub SyncSyncID Line 1012  sub SyncSyncID
1012      return(1);      return(1);
1013  }  }
1014    
1015    # stuff moved from perl scripts for better re-use
1016    
1017    sub Rollback {
1018        my $conn = shift @_;
1019    
1020        print STDERR $conn->errorMessage;
1021        $conn->exec("ROLLBACK");
1022    }
1023    
1024    sub RollbackAndQuit {
1025        my $conn = shift @_;
1026    
1027        Rollback($conn);
1028        exit (-1);
1029    }
1030    
1031    sub Connect {
1032            my $info = shift @_;
1033    
1034            print("Connecting to $info\n") if ($debug || $verbose);
1035            my $conn = Pg::connectdb($info);
1036            if ($conn->status != PGRES_CONNECTION_OK) {
1037                print STDERR "Failed opening $info\n";
1038                exit 1;
1039            }
1040            return $conn;
1041    }
1042    
1043    sub Exec {
1044            my $conn = shift @_;
1045            my $sql = shift @_;
1046    
1047            my $result = $conn->exec($sql);
1048            print STDERR "$sql\n" if ($debug);
1049            RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1050    }
1051    
1052    sub Exec2 {
1053            my $mconn = shift @_;
1054            my $sconn = shift @_;
1055            my $sql = shift @_;
1056    
1057            my $result = $mconn->exec($sql);
1058            RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1059            $result = $sconn->exec($sql);
1060            RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1061    }
1062    
1063    sub MkInfo {
1064            my $db = shift || die "need database name!";
1065            my $host = shift;
1066            my $port = shift;
1067            my $user = shift;
1068            my $password = shift;
1069    
1070            my $info = "dbname=$db";
1071            $info = "$info host=$host" if (defined($host));
1072            $info = "$info port=$port" if (defined($port));
1073            $info = "$info user=$user" if (defined($user));
1074            $info = "$info password=$password" if (defined($password));
1075    
1076            return $info;
1077    }
1078    
1079  1;  1;

Legend:
Removed from v.1.8  
changed lines
  Added in v.1.14

  ViewVC Help
Powered by ViewVC 1.1.26