/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.4 by dpavlin, Wed Aug 6 00:28:29 2003 UTC revision 1.14 by dpavlin, Sun Nov 2 11:29:09 2003 UTC
# Line 6  package RServ; Line 6  package RServ;
6    
7  require Exporter;  require Exporter;
8  @ISA = qw(Exporter);  @ISA = qw(Exporter);
9  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10            Rollback RollbackAndQuit Connect Exec Exec2 MkInfo
11            $debug $quiet $verbose
12            );
13  @EXPORT_OK = qw();  @EXPORT_OK = qw();
14  use strict;  use strict;
15  use Pg;  use Pg;
16    
17  my $debug = 0;  my $debug = 0;
18  my $quiet = 1;  my $quiet = 1;
19    my $verbose = 0;
20    
21    $debug = 1;
22    $quiet = 0;
23    $verbose = 1;
24    
25  my %Mtables = ();  my %Mtables = ();
26  my %Stables = ();  my %Stables = ();
27    
28  sub GetSlaveId  sub GetServerId
29  {  {
30      my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);      my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);
31    
32        print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);
33    
34      my $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
35                            " host='$slaveHost' AND dbase='$slaveDB'");                            " host='$Host' AND dbase='$DB'");
36    
37      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
38      {      {
39          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
40          return(-1);          return(-1);
41      }      }
42            
43      if ($result->cmdTuples && $result->cmdTuples > 1)      if ($result->cmdTuples && $result->cmdTuples > 1)
44      {      {
45          printf STDERR "Duplicate slave definitions.\n" unless ($quiet);          printf STDERR "Duplicate host definitions.\n" unless ($quiet);
46          return(-2);          return(-2);
47      }      }
48    
49      my @row = $result->fetchrow;      my @row = $result->fetchrow;
50        
51        print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
52    
53      return $row[0];      return $row[0];
54  }  }
55    
56  sub PrepareSnapshot  sub PrepareSnapshot
57  {  {
58      my ($conn, $sconn, $outf, $server) = @_; # (@_[0], @_[1], @_[2], $_[3]);      my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;
59    
60    print STDERR "## d: $debug v: $verbose q: $quiet\n";
61    
62        if ($mserver == $sserver) {
63            print STDERR "master and slave numbers are same [$mserver] !\n";
64            return(-1);
65        }
66    
67        print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
68    
69      # first, we must know for wich tables the slave subscribed      # first, we must know for wich tables the slave subscribed
70      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
71      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
72      {      {
73          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
74          return(-1);          return(-1);
75      }      }
76            
# Line 58  sub PrepareSnapshot Line 79  sub PrepareSnapshot
79          $Stables{$row[0]} = 1;          $Stables{$row[0]} = 1;
80      }      }
81            
82      $result = $conn->exec("BEGIN");      print "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);
83    
84        $result = $mconn->exec("BEGIN");
85      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
86      {      {
87          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
88          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
89          return(-1);          return(-1);
90      }      }
91      $result = $conn->exec("set transaction isolation level serializable");      $result = $mconn->exec("set transaction isolation level serializable");
92      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
93      {      {
94          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
95          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
96          return(-1);          return(-1);
97      }      }
98            
99      # MAP oid --> tabname, keyname, key_type      # MAP oid --> tabname, keyname, key_type
100      $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .      $result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
101                            " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .                            " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
102                            ", pg_type pgt".                            ", pg_type pgt".
103                            " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .                            " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
104                            " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");                            " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
105      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
106      {      {
107          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
108          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
109          return(-1);          return(-1);
110      }      }
111            
112      while (@row = $result->fetchrow)      while (@row = $result->fetchrow)
113      {      {
114          #       printf "$row[0], $row[1], $row[2]\n";          #       printf "$row[0], $row[1], $row[2]\n";
115                    if (ref($onlytables) eq 'HASH') {
116                            next unless (exists $onlytables->{$row[1]});
117                            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
118                    }
119          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
120      }      }
121        
122        print "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
123    
124      # Read last succeeded sync      # Read last succeeded sync
125      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
126          " where server = $server AND syncid = (select max(syncid) from" .          " where server = $sserver AND syncid = (select max(syncid) from" .
127          " _RSERV_SYNC_ where server = $server AND status > 0)";          " _RSERV_SYNC_ where server = $sserver AND status > 0)";
128            
129      printf "$sql\n" if $debug;      printf "$sql\n" if $debug;
130    
131      $result = $conn->exec($sql);      $result = $mconn->exec($sql);
132      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
133      {      {
134          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
135          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
136          return(-1);          return(-1);
137      }      }
138            
139      my @lastsync = $result->fetchrow;      my @lastsync = $result->fetchrow;
140            print "lastsync: ",join(",",@lastsync),"\n" if ($debug);
141    
142        # exclude data which originated from master server
143        my $sel_server = " and l.server = $mserver ";
144    
145      my $sinfo = "";      my $sinfo = "";
146      if (@lastsync && $lastsync[3] ne '')        # sync info      if (@lastsync && $lastsync[3] ne '')        # sync info
147      {      {
# Line 121  sub PrepareSnapshot Line 154  sub PrepareSnapshot
154            
155      # DELETED rows      # DELETED rows
156      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
157          " where l.delete = 1 $sinfo order by l.reloid";          " where l.delete = 1 $sinfo $sel_server order by l.reloid";
158            
159      printf "$sql\n" if $debug;      printf "DELETED: $sql\n" if $debug;
160            
161      $result = $conn->exec($sql);      $result = $mconn->exec($sql);
162      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
163      {      {
164          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
165          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
166          return(-1);          return(-1);
167      }      }
168            
169      my $lastoid = '';      my $lastoid = -1;
170      while (@row = $result->fetchrow)      while (@row = $result->fetchrow)
171      {      {
172          next unless exists $Mtables{$row[0]};          next unless exists $Mtables{$row[0]};
# Line 141  sub PrepareSnapshot Line 174  sub PrepareSnapshot
174    
175          if ($lastoid != $row[0])          if ($lastoid != $row[0])
176          {          {
177              if ($lastoid eq '')              if ($lastoid == -1)
178              {              {
179                  my $syncid = GetSYNCID($conn, $outf);                  my $syncid = GetSYNCID($mconn, $outf);
180                  return($syncid) if $syncid < 0;                  return($syncid) if $syncid < 0;
181                  $havedeal = 1;                  $havedeal = 1;
182              }              }
# Line 157  sub PrepareSnapshot Line 190  sub PrepareSnapshot
190          if (! defined $row[1])          if (! defined $row[1])
191          {          {
192              print STDERR "NULL key\n" unless ($quiet);              print STDERR "NULL key\n" unless ($quiet);
193              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
194              return(-2);              return(-2);
195          }          }
196          printf $outf "%s\n", OutputValue($row[1]);          printf $outf "%s\n", OutputValue($row[1]);
197      }      }
198      printf $outf "\\.\n" if $lastoid ne '';      printf $outf "\\.\n" if ($lastoid != -1);
199            
200      # UPDATED rows      # UPDATED rows
201            
# Line 176  sub PrepareSnapshot Line 209  sub PrepareSnapshot
209    
210          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
211            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
212            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
213              $sel_server;
214                    
215          printf "$sql\n" if $debug;          printf "UPDATED: $sql\n" if $debug;
216                    
217          $result = $conn->exec($sql);          $result = $mconn->exec($sql);
218          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
219          {          {
220              printf $outf "-- ERROR\n" if $havedeal;              printf $outf "-- ERROR\n" if $havedeal;
221              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $mconn->errorMessage unless ($quiet);
222              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
223              return(-1);              return(-1);
224          }          }
225          next if $result->ntuples <= 0;          next if $result->ntuples <= 0;
226          if (! $havedeal)          if (! $havedeal)
227          {          {
228              my $syncid = GetSYNCID($conn, $outf);              my $syncid = GetSYNCID($mconn, $outf);
229              return($syncid) if $syncid < 0;              return($syncid) if $syncid < 0;
230              $havedeal = 1;              $havedeal = 1;
231          }          }
# Line 224  sub PrepareSnapshot Line 258  sub PrepareSnapshot
258    
259          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
260            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
261            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
262              $sel_server;
263                    
264          printf "$sql\n" if $debug;          printf "INSERTED: $sql\n" if $debug;
265                    
266          $result = $conn->exec($sql);          $result = $mconn->exec($sql);
267          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
268          {          {
269              printf $outf "-- ERROR\n" if $havedeal;              printf $outf "-- ERROR\n" if $havedeal;
270              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $mconn->errorMessage unless ($quiet);
271              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
272              return(-1);              return(-1);
273          }          }
274          next if $result->ntuples <= 0;          next if $result->ntuples <= 0;
275          if (! $havedeal)          if (! $havedeal)
276          {          {
277              my $syncid = GetSYNCID($conn, $outf);              my $syncid = GetSYNCID($mconn, $outf);
278              return($syncid) if $syncid < 0;              return($syncid) if $syncid < 0;
279              $havedeal = 1;              $havedeal = 1;
280          }          }
# Line 264  sub PrepareSnapshot Line 299  sub PrepareSnapshot
299            
300      unless ($havedeal)      unless ($havedeal)
301      {      {
302          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
303          return(0);          return(0);
304      }      }
305            
306      # Remember this snapshot info      # Remember this snapshot info
307      $result = $conn->exec("select _rserv_sync_($server)");      $result = $mconn->exec("select _rserv_sync_($sserver)");
308      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
309      {      {
310          printf $outf "-- ERROR\n";          printf $outf "-- ERROR\n";
311          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
312          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
313          return(-1);          return(-1);
314      }      }
315            
316      $result = $conn->exec("COMMIT");      $result = $mconn->exec("COMMIT");
317      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
318      {      {
319          printf $outf "-- ERROR\n";          printf $outf "-- ERROR\n";
320          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
321          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
322          return(-1);          return(-1);
323      }      }
324      printf $outf "-- OK\n";      printf $outf "-- OK\n";
# Line 330  sub GetSYNCID Line 365  sub GetSYNCID
365    
366  sub CleanLog  sub CleanLog
367  {  {
368      my ($conn, $howold) = @_; # (@_[0], @_[1]);      my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
369            
370      my $result = $conn->exec("BEGIN");      my $result = $conn->exec("BEGIN");
371      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
# Line 372  sub CleanLog Line 407  sub CleanLog
407      my $alist = join(',', keys %active);      my $alist = join(',', keys %active);
408      my $sinfo = "logid < $maxid";      my $sinfo = "logid < $maxid";
409      $sinfo .= " AND logid not in ($alist)" if $alist ne '';      $sinfo .= " AND logid not in ($alist)" if $alist ne '';
410            #if (ref($onlytables) eq 'HASH') {
411        #   foreach my $onlytable (keys %{$onlytables}) {
412        #           $sinfo
413        #   }
414        #}
415      $sql = "delete from _RSERV_LOG_ where " .      $sql = "delete from _RSERV_LOG_ where " .
416          "logtime < now() - '$howold second'::interval AND $sinfo";          "logtime < now() - '$howold second'::interval AND $sinfo";
417            
# Line 400  sub CleanLog Line 439  sub CleanLog
439    
440  sub ApplySnapshot  sub ApplySnapshot
441  {  {
442      my ($conn, $inpf) = @_; # (@_[0], @_[1]);      my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);
443            
444      my $result = $conn->exec("BEGIN");      my $result = $sconn->exec("BEGIN");
445      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
446      {      {
447          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
448          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
449          return(-1);          return(-1);
450      }      }
451            
452      $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");      $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
453      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
454      {      {
455          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
456          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
457          return(-1);          return(-1);
458      }      }
459            
# Line 424  sub ApplySnapshot Line 463  sub ApplySnapshot
463          " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .          " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
464          " AND pga.attnum = rt.key";          " AND pga.attnum = rt.key";
465            
466      $result = $conn->exec($sql);      $result = $sconn->exec($sql);
467      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
468      {      {
469          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
470          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
471          return(-1);          return(-1);
472      }      }
473      %Stables = ();      %Stables = ();
474      while (my @row = $result->fetchrow)      while (my @row = $result->fetchrow)
475      {      {
476          #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];          #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
477                    if (ref($onlytables) eq 'HASH') {
478                            next unless (exists $onlytables->{$row[1]});
479                            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
480                    }
481          push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];          push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
482      }      }
483    
484        print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);
485    
486      my $ok = 0;      my $ok = 0;
487      my $syncid = '';      my $syncid = -1;
488      while(<$inpf>)      while(<$inpf>)
489      {      {
490          $_ =~ s/\n//;          $_ =~ s/\n//;
491          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);
492            die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
493          if ($cmt ne '--')          if ($cmt ne '--')
494          {          {
495              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
496              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
497              return(-2);              return(-2);
498          }          }
499          if ($cmd eq 'DELETE')          if ($cmd eq 'DELETE')
500          {          {
501              if ($syncid eq '')              if ($syncid == -1)
502              {              {
503                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
504                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
505                  return(-2);                  return(-2);
506              }              }
507              $result = DoDelete($conn, $inpf, $prm);              $result = DoDelete($sconn, $inpf, $prm);
508              if ($result)              if ($result)
509              {              {
510                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
511                  return($result);                  return($result);
512              }              }
513          }          }
514          elsif ($cmd eq 'INSERT')          elsif ($cmd eq 'INSERT')
515          {          {
516              if ($syncid eq '')              if ($syncid == -1)
517              {              {
518                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
519                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
520                  return(-2);                  return(-2);
521              }              }
522              $result = DoInsert($conn, $inpf, $prm);              $result = DoInsert($sconn, $inpf, $prm);
523              if ($result)              if ($result)
524              {              {
525                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
526                  return($result);                  return($result);
527              }              }
528          }          }
529          elsif ($cmd eq 'UPDATE')          elsif ($cmd eq 'UPDATE')
530          {          {
531              if ($syncid eq '')              if ($syncid == -1)
532              {              {
533                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
534                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
535                  return(-2);                  return(-2);
536              }              }
537              $result = DoUpdate($conn, $inpf, $prm);              $result = DoUpdate($sconn, $inpf, $prm);
538              if ($result)              if ($result)
539              {              {
540                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
541                  return($result);                  return($result);
542              }              }
543          }          }
544          elsif ($cmd eq 'SYNCID')          elsif ($cmd eq 'SYNCID')
545          {          {
546              if ($syncid ne '')              if ($syncid != -1)
547              {              {
548                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);
549                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
550                  return(-2);                  return(-2);
551              }              }
552              if ($prm !~ /^\d+$/)              if ($prm !~ /^\d+$/)
553              {              {
554                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
555                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
556                  return(-2);                  return(-2);
557              }              }
558              $syncid = $prm;              $syncid = $prm;
559                            
560              printf STDERR "Sync ID $syncid\n" unless ($quiet);              printf STDERR "Sync ID $syncid\n" unless ($quiet);
561                            
562              $result = $conn->exec("select syncid, synctime from " .              $result = $sconn->exec("select syncid, synctime from " .
563                                    "_RSERV_SLAVE_SYNC_ where syncid = " .                                    "_RSERV_SLAVE_SYNC_ where syncid = " .
564                                    "(select max(syncid) from _RSERV_SLAVE_SYNC_)");                                    "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
565              if ($result->resultStatus ne PGRES_TUPLES_OK)              if ($result->resultStatus ne PGRES_TUPLES_OK)
566              {              {
567                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR $sconn->errorMessage unless ($quiet);
568                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
569                  return(-1);                  return(-1);
570              }              }
571              my @row = $result->fetchrow;              my @row = $result->fetchrow;
572              if (! defined $row[0])              if (! defined $row[0])
573              {              {
574                  $result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ".                  $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
575                                        "(syncid, synctime) values ($syncid, now())");                                        "(syncid, synctime) values ($syncid, now())");
576              }              }
577              elsif ($row[0] >= $prm)              elsif ($row[0] >= $prm)
578              {              {
579                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
580                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
581                  return(0);                  return(0);
582              }              }
583              else              else
584              {              {
585                  $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .                  $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
586                                        " set syncid = $syncid, synctime = now()");                                        " set syncid = $syncid, synctime = now()");
587              }              }
588              if ($result->resultStatus ne PGRES_COMMAND_OK)              if ($result->resultStatus ne PGRES_COMMAND_OK)
589              {              {
590                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR $sconn->errorMessage unless ($quiet);
591                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
592                  return(-1);                  return(-1);
593              }              }
594          }          }
# Line 554  sub ApplySnapshot Line 600  sub ApplySnapshot
600          elsif ($cmd eq 'ERROR')          elsif ($cmd eq 'ERROR')
601          {          {
602              printf STDERR "ERROR signaled\n" unless ($quiet);              printf STDERR "ERROR signaled\n" unless ($quiet);
603              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
604              return(-2);              return(-2);
605          }          }
606          else          else
607          {          {
608              printf STDERR "Unknown command $cmd\n" unless ($quiet);              printf STDERR "Unknown command $cmd\n" unless ($quiet);
609              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
610              return(-2);              return(-2);
611          }          }
612      }      }
# Line 568  sub ApplySnapshot Line 614  sub ApplySnapshot
614      if (! $ok)      if (! $ok)
615      {      {
616          printf STDERR "No OK flag in input\n" unless ($quiet);          printf STDERR "No OK flag in input\n" unless ($quiet);
617          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
618          return(-2);          return(-2);
619      }      }
620            
621      $result = $conn->exec("COMMIT");      $result = $sconn->exec("COMMIT");
622      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
623      {      {
624          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
625          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
626          return(-1);          return(-1);
627      }      }
628            
# Line 585  sub ApplySnapshot Line 631  sub ApplySnapshot
631    
632  sub DoDelete  sub DoDelete
633  {  {
634      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
635    
636      # only delete tables that the slave wants      # only delete tables that the slave wants
637      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 619  sub DoDelete Line 665  sub DoDelete
665                    
666          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
667                    
668          my $result = $conn->exec($sql);          my $result = $sconn->exec($sql);
669          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
670          {          {
671              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
672              return(-1);              return(-1);
673          }          }
674      }      }
# Line 639  sub DoDelete Line 685  sub DoDelete
685    
686  sub DoUpdate  sub DoUpdate
687  {  {
688      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
689    
690      # only update the tables that the slave wants      # only update the tables that the slave wants
691      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 661  sub DoUpdate Line 707  sub DoUpdate
707      my $sql = "select attnum, attname from pg_attribute" .      my $sql = "select attnum, attname from pg_attribute" .
708          " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";          " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
709            
710      my $result = $conn->exec($sql);      my $result = $sconn->exec($sql);
711      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
712      {      {
713          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
714          return(-1);          return(-1);
715      }      }
716            
# Line 739  sub DoUpdate Line 785  sub DoUpdate
785                    
786          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
787                    
788          $result = $conn->exec($sql);          $result = $sconn->exec($sql);
789                    
790          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
791          {          {
792              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
793              return(-1);              return(-1);
794          }          }
795          next if $result->cmdTuples == 1;        # updated          next if $result->cmdTuples == 1;        # updated
# Line 760  sub DoUpdate Line 806  sub DoUpdate
806                    
807          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax)
808          {          {
809              $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);              $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
810              return($result) if $result;              return($result) if $result;
811              @CopyBuf = ();              @CopyBuf = ();
812              $CBufLen = 0;              $CBufLen = 0;
# Line 776  sub DoUpdate Line 822  sub DoUpdate
822      if ($CBufLen)      if ($CBufLen)
823      {      {
824          print "@CopyBuf\n" if $debug;          print "@CopyBuf\n" if $debug;
825          $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);          $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
826          return($result) if $result;          return($result) if $result;
827      }      }
828    
# Line 785  sub DoUpdate Line 831  sub DoUpdate
831    
832  sub DoInsert  sub DoInsert
833  {  {
834      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
835    
836      # only insert rows into tables that the slave wants      # only insert rows into tables that the slave wants
837      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 827  sub DoInsert Line 873  sub DoInsert
873                    
874          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax)
875          {          {
876              my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);              my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
877              return($result) if $result;              return($result) if $result;
878              @CopyBuf = ();              @CopyBuf = ();
879              $CBufLen = 0;              $CBufLen = 0;
# Line 843  sub DoInsert Line 889  sub DoInsert
889      if ($CBufLen)      if ($CBufLen)
890      {      {
891          print "@CopyBuf\n" if $debug;          print "@CopyBuf\n" if $debug;
892          my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);          my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
893          return($result) if $result;          return($result) if $result;
894      }      }
895            
# Line 853  sub DoInsert Line 899  sub DoInsert
899    
900  sub DoCopy  sub DoCopy
901  {  {
902      my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);      my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
903            
904      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
905          "FROM STDIN";          "FROM STDIN";
906      my $result = $conn->exec($sql);      my $result = $sconn->exec($sql);
907      if ($result->resultStatus ne PGRES_COPY_IN)      if ($result->resultStatus ne PGRES_COPY_IN)
908      {      {
909          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
910          return(-1);          return(-1);
911      }      }
912            
913      foreach my $str (@{$CBuf})      foreach my $str (@{$CBuf})
914      {      {
915          $conn->putline($str);          $sconn->putline($str);
916      }      }
917            
918      $conn->putline("\\.\n");      $sconn->putline("\\.\n");
919            
920      if ($conn->endcopy)      if ($sconn->endcopy)
921      {      {
922          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
923          return(-1);          return(-1);
924      }      }
925            
# Line 886  sub DoCopy Line 932  sub DoCopy
932  #  #
933  sub GetSyncID  sub GetSyncID
934  {  {
935      my ($conn) = @_; # (@_[0]);      my ($sconn) = @_; # (@_[0]);
936            
937      my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");      my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
938      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
939      {      {
940          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
941          return(-1);          return(-1);
942      }      }
943      my @row = $result->fetchrow;      my @row = $result->fetchrow;
# Line 904  sub GetSyncID Line 950  sub GetSyncID
950  #  #
951  sub SyncSyncID  sub SyncSyncID
952  {  {
953      my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
954            
955      my $result = $conn->exec("BEGIN");      my $result = $mconn->exec("BEGIN");
956      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
957      {      {
958          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
959          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
960          return(-1);          return(-1);
961      }      }
962            
963      $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
964                            " where server = $server AND syncid = $syncid" .                            " where server = $sserver AND syncid = $syncid" .
965                            " for update");                            " for update");
966      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
967      {      {
968          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
969          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
970          return(-1);          return(-1);
971      }      }
972      my @row = $result->fetchrow;      my @row = $result->fetchrow;
973      if (! defined $row[0])      if (! defined $row[0])
974      {      {
975          printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);          printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
976          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
977          return(0);          return(0);
978      }      }
979      if ($row[1] > 0)      if ($row[1] > 0)
980      {      {
981          printf STDERR "SyncID $syncid for server ".          printf STDERR "SyncID $syncid for server ".
982              "$server already updated\n" unless ($quiet);              "$sserver already updated\n" unless ($quiet);
983          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
984          return(0);          return(0);
985      }      }
986      $result = $conn->exec("update _RSERV_SYNC_" .      $result = $mconn->exec("update _RSERV_SYNC_" .
987                            " set synctime = now(), status = 1" .                            " set synctime = now(), status = 1" .
988                            " where server = $server AND syncid = $syncid");                            " where server = $sserver AND syncid = $syncid");
989      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
990      {      {
991          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
992          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
993          return(-1);          return(-1);
994      }      }
995      $result = $conn->exec("delete from _RSERV_SYNC_" .      $result = $mconn->exec("delete from _RSERV_SYNC_" .
996                            " where server = $server AND syncid < $syncid");                            " where server = $sserver AND syncid < $syncid");
997      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
998      {      {
999          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
1000          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
1001          return(-1);          return(-1);
1002      }      }
1003            
1004      $result = $conn->exec("COMMIT");      $result = $mconn->exec("COMMIT");
1005      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
1006      {      {
1007          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
1008          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
1009          return(-1);          return(-1);
1010      }      }
1011            
1012      return(1);      return(1);
1013  }  }
1014    
1015    # stuff moved from perl scripts for better re-use
1016    
1017    sub Rollback {
1018        my $conn = shift @_;
1019    
1020        print STDERR $conn->errorMessage;
1021        $conn->exec("ROLLBACK");
1022    }
1023    
1024    sub RollbackAndQuit {
1025        my $conn = shift @_;
1026    
1027        Rollback($conn);
1028        exit (-1);
1029    }
1030    
1031    sub Connect {
1032            my $info = shift @_;
1033    
1034            print("Connecting to $info\n") if ($debug || $verbose);
1035            my $conn = Pg::connectdb($info);
1036            if ($conn->status != PGRES_CONNECTION_OK) {
1037                print STDERR "Failed opening $info\n";
1038                exit 1;
1039            }
1040            return $conn;
1041    }
1042    
1043    sub Exec {
1044            my $conn = shift @_;
1045            my $sql = shift @_;
1046    
1047            my $result = $conn->exec($sql);
1048            print STDERR "$sql\n" if ($debug);
1049            RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1050    }
1051    
1052    sub Exec2 {
1053            my $mconn = shift @_;
1054            my $sconn = shift @_;
1055            my $sql = shift @_;
1056    
1057            my $result = $mconn->exec($sql);
1058            RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1059            $result = $sconn->exec($sql);
1060            RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1061    }
1062    
1063    sub MkInfo {
1064            my $db = shift || die "need database name!";
1065            my $host = shift;
1066            my $port = shift;
1067            my $user = shift;
1068            my $password = shift;
1069    
1070            my $info = "dbname=$db";
1071            $info = "$info host=$host" if (defined($host));
1072            $info = "$info port=$port" if (defined($port));
1073            $info = "$info user=$user" if (defined($user));
1074            $info = "$info password=$password" if (defined($password));
1075    
1076            return $info;
1077    }
1078    
1079  1;  1;

Legend:
Removed from v.1.4  
changed lines
  Added in v.1.14

  ViewVC Help
Powered by ViewVC 1.1.26