/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.5 by dpavlin, Sun Oct 26 23:43:54 2003 UTC revision 1.11 by dpavlin, Fri Oct 31 00:07:37 2003 UTC
# Line 6  package RServ; Line 6  package RServ;
6    
7  require Exporter;  require Exporter;
8  @ISA = qw(Exporter);  @ISA = qw(Exporter);
9  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10            Rollback RollbackAndQuit Connect Exec Exec2
11            $debug $quiet $verbose
12            );
13  @EXPORT_OK = qw();  @EXPORT_OK = qw();
14  use strict;  use strict;
15  use Pg;  use Pg;
16    
17  my $debug = 0;  my $debug = 0;
18  my $quiet = 1;  my $quiet = 1;
19    my $verbose = 0;
20    
21  my %Mtables = ();  my %Mtables = ();
22  my %Stables = ();  my %Stables = ();
23    
24  sub GetSlaveId  sub GetServerId
25  {  {
26      my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);      my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);
27    
28      my $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".      print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);
29                            " host='$slaveHost' AND dbase='$slaveDB'");  
30        my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
31                              " host='$Host' AND dbase='$DB'");
32    
33      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
34      {      {
35          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
36          return(-1);          return(-1);
37      }      }
38            
39      if ($result->cmdTuples && $result->cmdTuples > 1)      if ($result->cmdTuples && $result->cmdTuples > 1)
40      {      {
41          printf STDERR "Duplicate slave definitions.\n" unless ($quiet);          printf STDERR "Duplicate host definitions.\n" unless ($quiet);
42          return(-2);          return(-2);
43      }      }
44    
45      my @row = $result->fetchrow;      my @row = $result->fetchrow;
46        
47        print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
48    
49      return $row[0];      return $row[0];
50  }  }
51    
52  sub PrepareSnapshot  sub PrepareSnapshot
53  {  {
54      my ($conn, $sconn, $outf, $server, $onlytables) = @_;      my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;
55    
56        if ($mserver == $sserver) {
57            print STDERR "master and slave numbers are same [$mserver] !\n";
58            return(-1);
59        }
60    
61        print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
62    
63      # first, we must know for wich tables the slave subscribed      # first, we must know for wich tables the slave subscribed
64      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
65      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
66      {      {
67          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
68          return(-1);          return(-1);
69      }      }
70            
# Line 58  sub PrepareSnapshot Line 73  sub PrepareSnapshot
73          $Stables{$row[0]} = 1;          $Stables{$row[0]} = 1;
74      }      }
75            
76      $result = $conn->exec("BEGIN");      $result = $mconn->exec("BEGIN");
77      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
78      {      {
79          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
80          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
81          return(-1);          return(-1);
82      }      }
83      $result = $conn->exec("set transaction isolation level serializable");      $result = $mconn->exec("set transaction isolation level serializable");
84      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
85      {      {
86          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
87          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
88          return(-1);          return(-1);
89      }      }
90            
91      # MAP oid --> tabname, keyname, key_type      # MAP oid --> tabname, keyname, key_type
92      $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .      $result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
93                            " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .                            " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
94                            ", pg_type pgt".                            ", pg_type pgt".
95                            " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .                            " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
96                            " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");                            " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
97      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
98      {      {
99          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
100          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
101          return(-1);          return(-1);
102      }      }
103            
# Line 98  sub PrepareSnapshot Line 113  sub PrepareSnapshot
113            
114      # Read last succeeded sync      # Read last succeeded sync
115      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
116          " where server = $server AND syncid = (select max(syncid) from" .          " where server = $sserver AND syncid = (select max(syncid) from" .
117          " _RSERV_SYNC_ where server = $server AND status > 0)";          " _RSERV_SYNC_ where server = $sserver AND status > 0)";
118            
119      printf "$sql\n" if $debug;      printf "$sql\n" if $debug;
120    
121      $result = $conn->exec($sql);      $result = $mconn->exec($sql);
122      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
123      {      {
124          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
125          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
126          return(-1);          return(-1);
127      }      }
128            
129      my @lastsync = $result->fetchrow;      my @lastsync = $result->fetchrow;
130        
131        # exclude data which originated from master server
132        my $sel_server = " and l.server = $mserver ";
133    
134      my $sinfo = "";      my $sinfo = "";
135      if (@lastsync && $lastsync[3] ne '')        # sync info      if (@lastsync && $lastsync[3] ne '')        # sync info
136      {      {
# Line 125  sub PrepareSnapshot Line 143  sub PrepareSnapshot
143            
144      # DELETED rows      # DELETED rows
145      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
146          " where l.delete = 1 $sinfo order by l.reloid";          " where l.delete = 1 $sinfo $sel_server order by l.reloid";
147            
148      printf "$sql\n" if $debug;      printf "DELETED: $sql\n" if $debug;
149            
150      $result = $conn->exec($sql);      $result = $mconn->exec($sql);
151      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
152      {      {
153          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
154          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
155          return(-1);          return(-1);
156      }      }
157            
158      my $lastoid = '';      my $lastoid = -1;
159      while (@row = $result->fetchrow)      while (@row = $result->fetchrow)
160      {      {
161          next unless exists $Mtables{$row[0]};          next unless exists $Mtables{$row[0]};
# Line 145  sub PrepareSnapshot Line 163  sub PrepareSnapshot
163    
164          if ($lastoid != $row[0])          if ($lastoid != $row[0])
165          {          {
166              if ($lastoid eq '')              if ($lastoid == -1)
167              {              {
168                  my $syncid = GetSYNCID($conn, $outf);                  my $syncid = GetSYNCID($mconn, $outf);
169                  return($syncid) if $syncid < 0;                  return($syncid) if $syncid < 0;
170                  $havedeal = 1;                  $havedeal = 1;
171              }              }
# Line 161  sub PrepareSnapshot Line 179  sub PrepareSnapshot
179          if (! defined $row[1])          if (! defined $row[1])
180          {          {
181              print STDERR "NULL key\n" unless ($quiet);              print STDERR "NULL key\n" unless ($quiet);
182              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
183              return(-2);              return(-2);
184          }          }
185          printf $outf "%s\n", OutputValue($row[1]);          printf $outf "%s\n", OutputValue($row[1]);
186      }      }
187      printf $outf "\\.\n" if $lastoid ne '';      printf $outf "\\.\n" if ($lastoid != -1);
188            
189      # UPDATED rows      # UPDATED rows
190            
# Line 180  sub PrepareSnapshot Line 198  sub PrepareSnapshot
198    
199          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
200            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
201            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
202              $sel_server;
203                    
204          printf "$sql\n" if $debug;          printf "UPDATED: $sql\n" if $debug;
205                    
206          $result = $conn->exec($sql);          $result = $mconn->exec($sql);
207          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
208          {          {
209              printf $outf "-- ERROR\n" if $havedeal;              printf $outf "-- ERROR\n" if $havedeal;
210              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $mconn->errorMessage unless ($quiet);
211              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
212              return(-1);              return(-1);
213          }          }
214          next if $result->ntuples <= 0;          next if $result->ntuples <= 0;
215          if (! $havedeal)          if (! $havedeal)
216          {          {
217              my $syncid = GetSYNCID($conn, $outf);              my $syncid = GetSYNCID($mconn, $outf);
218              return($syncid) if $syncid < 0;              return($syncid) if $syncid < 0;
219              $havedeal = 1;              $havedeal = 1;
220          }          }
# Line 228  sub PrepareSnapshot Line 247  sub PrepareSnapshot
247    
248          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
249            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
250            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
251              $sel_server;
252                    
253          printf "$sql\n" if $debug;          printf "INSERTED: $sql\n" if $debug;
254                    
255          $result = $conn->exec($sql);          $result = $mconn->exec($sql);
256          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
257          {          {
258              printf $outf "-- ERROR\n" if $havedeal;              printf $outf "-- ERROR\n" if $havedeal;
259              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $mconn->errorMessage unless ($quiet);
260              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
261              return(-1);              return(-1);
262          }          }
263          next if $result->ntuples <= 0;          next if $result->ntuples <= 0;
264          if (! $havedeal)          if (! $havedeal)
265          {          {
266              my $syncid = GetSYNCID($conn, $outf);              my $syncid = GetSYNCID($mconn, $outf);
267              return($syncid) if $syncid < 0;              return($syncid) if $syncid < 0;
268              $havedeal = 1;              $havedeal = 1;
269          }          }
# Line 268  sub PrepareSnapshot Line 288  sub PrepareSnapshot
288            
289      unless ($havedeal)      unless ($havedeal)
290      {      {
291          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
292          return(0);          return(0);
293      }      }
294            
295      # Remember this snapshot info      # Remember this snapshot info
296      $result = $conn->exec("select _rserv_sync_($server)");      $result = $mconn->exec("select _rserv_sync_($sserver)");
297      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
298      {      {
299          printf $outf "-- ERROR\n";          printf $outf "-- ERROR\n";
300          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
301          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
302          return(-1);          return(-1);
303      }      }
304            
305      $result = $conn->exec("COMMIT");      $result = $mconn->exec("COMMIT");
306      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
307      {      {
308          printf $outf "-- ERROR\n";          printf $outf "-- ERROR\n";
309          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
310          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
311          return(-1);          return(-1);
312      }      }
313      printf $outf "-- OK\n";      printf $outf "-- OK\n";
# Line 408  sub CleanLog Line 428  sub CleanLog
428    
429  sub ApplySnapshot  sub ApplySnapshot
430  {  {
431      my ($conn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);      my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);
432            
433      my $result = $conn->exec("BEGIN");      my $result = $sconn->exec("BEGIN");
434      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
435      {      {
436          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
437          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
438          return(-1);          return(-1);
439      }      }
440            
441      $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");      $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
442      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
443      {      {
444          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
445          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
446          return(-1);          return(-1);
447      }      }
448            
# Line 432  sub ApplySnapshot Line 452  sub ApplySnapshot
452          " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .          " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
453          " AND pga.attnum = rt.key";          " AND pga.attnum = rt.key";
454            
455      $result = $conn->exec($sql);      $result = $sconn->exec($sql);
456      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
457      {      {
458          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
459          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
460          return(-1);          return(-1);
461      }      }
462      %Stables = ();      %Stables = ();
# Line 451  sub ApplySnapshot Line 471  sub ApplySnapshot
471      }      }
472    
473      my $ok = 0;      my $ok = 0;
474      my $syncid = '';      my $syncid = -1;
475      while(<$inpf>)      while(<$inpf>)
476      {      {
477          $_ =~ s/\n//;          $_ =~ s/\n//;
478          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);
479            die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
480          if ($cmt ne '--')          if ($cmt ne '--')
481          {          {
482              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
483              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
484              return(-2);              return(-2);
485          }          }
486          if ($cmd eq 'DELETE')          if ($cmd eq 'DELETE')
487          {          {
488              if ($syncid eq '')              if ($syncid == -1)
489              {              {
490                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
491                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
492                  return(-2);                  return(-2);
493              }              }
494              $result = DoDelete($conn, $inpf, $prm);              $result = DoDelete($sconn, $inpf, $prm);
495              if ($result)              if ($result)
496              {              {
497                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
498                  return($result);                  return($result);
499              }              }
500          }          }
501          elsif ($cmd eq 'INSERT')          elsif ($cmd eq 'INSERT')
502          {          {
503              if ($syncid eq '')              if ($syncid == -1)
504              {              {
505                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
506                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
507                  return(-2);                  return(-2);
508              }              }
509              $result = DoInsert($conn, $inpf, $prm);              $result = DoInsert($sconn, $inpf, $prm);
510              if ($result)              if ($result)
511              {              {
512                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
513                  return($result);                  return($result);
514              }              }
515          }          }
516          elsif ($cmd eq 'UPDATE')          elsif ($cmd eq 'UPDATE')
517          {          {
518              if ($syncid eq '')              if ($syncid == -1)
519              {              {
520                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
521                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
522                  return(-2);                  return(-2);
523              }              }
524              $result = DoUpdate($conn, $inpf, $prm);              $result = DoUpdate($sconn, $inpf, $prm);
525              if ($result)              if ($result)
526              {              {
527                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
528                  return($result);                  return($result);
529              }              }
530          }          }
531          elsif ($cmd eq 'SYNCID')          elsif ($cmd eq 'SYNCID')
532          {          {
533              if ($syncid ne '')              if ($syncid != -1)
534              {              {
535                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);
536                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
537                  return(-2);                  return(-2);
538              }              }
539              if ($prm !~ /^\d+$/)              if ($prm !~ /^\d+$/)
540              {              {
541                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
542                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
543                  return(-2);                  return(-2);
544              }              }
545              $syncid = $prm;              $syncid = $prm;
546                            
547              printf STDERR "Sync ID $syncid\n" unless ($quiet);              printf STDERR "Sync ID $syncid\n" unless ($quiet);
548                            
549              $result = $conn->exec("select syncid, synctime from " .              $result = $sconn->exec("select syncid, synctime from " .
550                                    "_RSERV_SLAVE_SYNC_ where syncid = " .                                    "_RSERV_SLAVE_SYNC_ where syncid = " .
551                                    "(select max(syncid) from _RSERV_SLAVE_SYNC_)");                                    "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
552              if ($result->resultStatus ne PGRES_TUPLES_OK)              if ($result->resultStatus ne PGRES_TUPLES_OK)
553              {              {
554                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR $sconn->errorMessage unless ($quiet);
555                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
556                  return(-1);                  return(-1);
557              }              }
558              my @row = $result->fetchrow;              my @row = $result->fetchrow;
559              if (! defined $row[0])              if (! defined $row[0])
560              {              {
561                  $result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ".                  $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
562                                        "(syncid, synctime) values ($syncid, now())");                                        "(syncid, synctime) values ($syncid, now())");
563              }              }
564              elsif ($row[0] >= $prm)              elsif ($row[0] >= $prm)
565              {              {
566                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
567                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
568                  return(0);                  return(0);
569              }              }
570              else              else
571              {              {
572                  $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .                  $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
573                                        " set syncid = $syncid, synctime = now()");                                        " set syncid = $syncid, synctime = now()");
574              }              }
575              if ($result->resultStatus ne PGRES_COMMAND_OK)              if ($result->resultStatus ne PGRES_COMMAND_OK)
576              {              {
577                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR $sconn->errorMessage unless ($quiet);
578                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
579                  return(-1);                  return(-1);
580              }              }
581          }          }
# Line 566  sub ApplySnapshot Line 587  sub ApplySnapshot
587          elsif ($cmd eq 'ERROR')          elsif ($cmd eq 'ERROR')
588          {          {
589              printf STDERR "ERROR signaled\n" unless ($quiet);              printf STDERR "ERROR signaled\n" unless ($quiet);
590              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
591              return(-2);              return(-2);
592          }          }
593          else          else
594          {          {
595              printf STDERR "Unknown command $cmd\n" unless ($quiet);              printf STDERR "Unknown command $cmd\n" unless ($quiet);
596              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
597              return(-2);              return(-2);
598          }          }
599      }      }
# Line 580  sub ApplySnapshot Line 601  sub ApplySnapshot
601      if (! $ok)      if (! $ok)
602      {      {
603          printf STDERR "No OK flag in input\n" unless ($quiet);          printf STDERR "No OK flag in input\n" unless ($quiet);
604          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
605          return(-2);          return(-2);
606      }      }
607            
608      $result = $conn->exec("COMMIT");      $result = $sconn->exec("COMMIT");
609      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
610      {      {
611          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
612          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
613          return(-1);          return(-1);
614      }      }
615            
# Line 597  sub ApplySnapshot Line 618  sub ApplySnapshot
618    
619  sub DoDelete  sub DoDelete
620  {  {
621      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
622    
623      # only delete tables that the slave wants      # only delete tables that the slave wants
624      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 631  sub DoDelete Line 652  sub DoDelete
652                    
653          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
654                    
655          my $result = $conn->exec($sql);          my $result = $sconn->exec($sql);
656          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
657          {          {
658              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
659              return(-1);              return(-1);
660          }          }
661      }      }
# Line 651  sub DoDelete Line 672  sub DoDelete
672    
673  sub DoUpdate  sub DoUpdate
674  {  {
675      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
676    
677      # only update the tables that the slave wants      # only update the tables that the slave wants
678      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 673  sub DoUpdate Line 694  sub DoUpdate
694      my $sql = "select attnum, attname from pg_attribute" .      my $sql = "select attnum, attname from pg_attribute" .
695          " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";          " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
696            
697      my $result = $conn->exec($sql);      my $result = $sconn->exec($sql);
698      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
699      {      {
700          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
701          return(-1);          return(-1);
702      }      }
703            
# Line 751  sub DoUpdate Line 772  sub DoUpdate
772                    
773          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
774                    
775          $result = $conn->exec($sql);          $result = $sconn->exec($sql);
776                    
777          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
778          {          {
779              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
780              return(-1);              return(-1);
781          }          }
782          next if $result->cmdTuples == 1;        # updated          next if $result->cmdTuples == 1;        # updated
# Line 772  sub DoUpdate Line 793  sub DoUpdate
793                    
794          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax)
795          {          {
796              $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);              $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
797              return($result) if $result;              return($result) if $result;
798              @CopyBuf = ();              @CopyBuf = ();
799              $CBufLen = 0;              $CBufLen = 0;
# Line 788  sub DoUpdate Line 809  sub DoUpdate
809      if ($CBufLen)      if ($CBufLen)
810      {      {
811          print "@CopyBuf\n" if $debug;          print "@CopyBuf\n" if $debug;
812          $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);          $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
813          return($result) if $result;          return($result) if $result;
814      }      }
815    
# Line 797  sub DoUpdate Line 818  sub DoUpdate
818    
819  sub DoInsert  sub DoInsert
820  {  {
821      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
822    
823      # only insert rows into tables that the slave wants      # only insert rows into tables that the slave wants
824      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 839  sub DoInsert Line 860  sub DoInsert
860                    
861          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax)
862          {          {
863              my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);              my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
864              return($result) if $result;              return($result) if $result;
865              @CopyBuf = ();              @CopyBuf = ();
866              $CBufLen = 0;              $CBufLen = 0;
# Line 855  sub DoInsert Line 876  sub DoInsert
876      if ($CBufLen)      if ($CBufLen)
877      {      {
878          print "@CopyBuf\n" if $debug;          print "@CopyBuf\n" if $debug;
879          my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);          my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
880          return($result) if $result;          return($result) if $result;
881      }      }
882            
# Line 865  sub DoInsert Line 886  sub DoInsert
886    
887  sub DoCopy  sub DoCopy
888  {  {
889      my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);      my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
890            
891      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
892          "FROM STDIN";          "FROM STDIN";
893      my $result = $conn->exec($sql);      my $result = $sconn->exec($sql);
894      if ($result->resultStatus ne PGRES_COPY_IN)      if ($result->resultStatus ne PGRES_COPY_IN)
895      {      {
896          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
897          return(-1);          return(-1);
898      }      }
899            
900      foreach my $str (@{$CBuf})      foreach my $str (@{$CBuf})
901      {      {
902          $conn->putline($str);          $sconn->putline($str);
903      }      }
904            
905      $conn->putline("\\.\n");      $sconn->putline("\\.\n");
906            
907      if ($conn->endcopy)      if ($sconn->endcopy)
908      {      {
909          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
910          return(-1);          return(-1);
911      }      }
912            
# Line 898  sub DoCopy Line 919  sub DoCopy
919  #  #
920  sub GetSyncID  sub GetSyncID
921  {  {
922      my ($conn) = @_; # (@_[0]);      my ($sconn) = @_; # (@_[0]);
923            
924      my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");      my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
925      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
926      {      {
927          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
928          return(-1);          return(-1);
929      }      }
930      my @row = $result->fetchrow;      my @row = $result->fetchrow;
# Line 916  sub GetSyncID Line 937  sub GetSyncID
937  #  #
938  sub SyncSyncID  sub SyncSyncID
939  {  {
940      my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
941            
942      my $result = $conn->exec("BEGIN");      my $result = $mconn->exec("BEGIN");
943      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
944      {      {
945          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
946          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
947          return(-1);          return(-1);
948      }      }
949            
950      $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
951                            " where server = $server AND syncid = $syncid" .                            " where server = $sserver AND syncid = $syncid" .
952                            " for update");                            " for update");
953      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
954      {      {
955          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
956          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
957          return(-1);          return(-1);
958      }      }
959      my @row = $result->fetchrow;      my @row = $result->fetchrow;
960      if (! defined $row[0])      if (! defined $row[0])
961      {      {
962          printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);          printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
963          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
964          return(0);          return(0);
965      }      }
966      if ($row[1] > 0)      if ($row[1] > 0)
967      {      {
968          printf STDERR "SyncID $syncid for server ".          printf STDERR "SyncID $syncid for server ".
969              "$server already updated\n" unless ($quiet);              "$sserver already updated\n" unless ($quiet);
970          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
971          return(0);          return(0);
972      }      }
973      $result = $conn->exec("update _RSERV_SYNC_" .      $result = $mconn->exec("update _RSERV_SYNC_" .
974                            " set synctime = now(), status = 1" .                            " set synctime = now(), status = 1" .
975                            " where server = $server AND syncid = $syncid");                            " where server = $sserver AND syncid = $syncid");
976      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
977      {      {
978          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
979          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
980          return(-1);          return(-1);
981      }      }
982      $result = $conn->exec("delete from _RSERV_SYNC_" .      $result = $mconn->exec("delete from _RSERV_SYNC_" .
983                            " where server = $server AND syncid < $syncid");                            " where server = $sserver AND syncid < $syncid");
984      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
985      {      {
986          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
987          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
988          return(-1);          return(-1);
989      }      }
990            
991      $result = $conn->exec("COMMIT");      $result = $mconn->exec("COMMIT");
992      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
993      {      {
994          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
995          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
996          return(-1);          return(-1);
997      }      }
998            
999      return(1);      return(1);
1000  }  }
1001    
1002    # stuff moved from perl scripts for better re-use
1003    
1004    sub Rollback {
1005        my $conn = shift @_;
1006    
1007        print STDERR $conn->errorMessage;
1008        $conn->exec("ROLLBACK");
1009    }
1010    
1011    sub RollbackAndQuit {
1012        my $conn = shift @_;
1013    
1014        Rollback($conn);
1015        exit (-1);
1016    }
1017    
1018    sub Connect {
1019            my $info = shift @_;
1020    
1021            print("Connecting to $info\n") if ($debug || $verbose);
1022            my $conn = Pg::connectdb($info);
1023            if ($conn->status != PGRES_CONNECTION_OK) {
1024                print STDERR "Failed opening $info\n";
1025                exit 1;
1026            }
1027            return $conn;
1028    }
1029    
1030    sub Exec {
1031            my $conn = shift @_;
1032            my $sql = shift @_;
1033    
1034            my $result = $conn->exec($sql);
1035            print STDERR "$sql\n" if ($debug);
1036            RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1037    }
1038    
1039    sub Exec2 {
1040            my $mconn = shift @_;
1041            my $sconn = shift @_;
1042            my $sql = shift @_;
1043    
1044            my $result = $mconn->exec($sql);
1045            RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1046            $result = $sconn->exec($sql);
1047            RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1048    }
1049    
1050  1;  1;

Legend:
Removed from v.1.5  
changed lines
  Added in v.1.11

  ViewVC Help
Powered by ViewVC 1.1.26