/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.4 by dpavlin, Wed Aug 6 00:28:29 2003 UTC revision 1.9 by dpavlin, Wed Oct 29 18:13:22 2003 UTC
# Line 11  require Exporter; Line 11  require Exporter;
11  use strict;  use strict;
12  use Pg;  use Pg;
13    
14  my $debug = 0;  #my $debug = 0;
15  my $quiet = 1;  #my $quiet = 1;
16    
17    my $debug = 1;
18    my $quiet = 0;
19    
20  my %Mtables = ();  my %Mtables = ();
21  my %Stables = ();  my %Stables = ();
22    
23  sub GetSlaveId  sub GetSlaveId
24  {  {
25      my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);      my ($mconn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);
26    
27      my $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
28                            " host='$slaveHost' AND dbase='$slaveDB'");                            " host='$slaveHost' AND dbase='$slaveDB'");
29    
30      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
31      {      {
32          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
33          return(-1);          return(-1);
34      }      }
35            
# Line 43  sub GetSlaveId Line 46  sub GetSlaveId
46    
47  sub PrepareSnapshot  sub PrepareSnapshot
48  {  {
49      my ($conn, $sconn, $outf, $server) = @_; # (@_[0], @_[1], @_[2], $_[3]);      my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;
50    
51        if ($mserver == $sserver) {
52            print STDERR "master and slave numbers are same!\n";
53            return(-1);
54        }
55    
56        print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
57    
58      # first, we must know for wich tables the slave subscribed      # first, we must know for wich tables the slave subscribed
59      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
60      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
61      {      {
62          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
63          return(-1);          return(-1);
64      }      }
65            
# Line 58  sub PrepareSnapshot Line 68  sub PrepareSnapshot
68          $Stables{$row[0]} = 1;          $Stables{$row[0]} = 1;
69      }      }
70            
71      $result = $conn->exec("BEGIN");      $result = $mconn->exec("BEGIN");
72      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
73      {      {
74          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
75          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
76          return(-1);          return(-1);
77      }      }
78      $result = $conn->exec("set transaction isolation level serializable");      $result = $mconn->exec("set transaction isolation level serializable");
79      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
80      {      {
81          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
82          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
83          return(-1);          return(-1);
84      }      }
85            
86      # MAP oid --> tabname, keyname, key_type      # MAP oid --> tabname, keyname, key_type
87      $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .      $result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
88                            " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .                            " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
89                            ", pg_type pgt".                            ", pg_type pgt".
90                            " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .                            " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
91                            " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");                            " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
92      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
93      {      {
94          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
95          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
96          return(-1);          return(-1);
97      }      }
98            
99      while (@row = $result->fetchrow)      while (@row = $result->fetchrow)
100      {      {
101          #       printf "$row[0], $row[1], $row[2]\n";          #       printf "$row[0], $row[1], $row[2]\n";
102                    if (ref($onlytables) eq 'HASH') {
103                            next unless (exists $onlytables->{$row[1]});
104                            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
105                    }
106          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
107      }      }
108            
109      # Read last succeeded sync      # Read last succeeded sync
110      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .      my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
111          " where server = $server AND syncid = (select max(syncid) from" .          " where server = $sserver AND syncid = (select max(syncid) from" .
112          " _RSERV_SYNC_ where server = $server AND status > 0)";          " _RSERV_SYNC_ where server = $sserver AND status > 0)";
113            
114      printf "$sql\n" if $debug;      printf "$sql\n" if $debug;
115    
116      $result = $conn->exec($sql);      $result = $mconn->exec($sql);
117      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
118      {      {
119          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
120          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
121          return(-1);          return(-1);
122      }      }
123            
124      my @lastsync = $result->fetchrow;      my @lastsync = $result->fetchrow;
125        
126        # exclude data which originated from master server
127        my $sel_server = " and l.server = $mserver ";
128    
129      my $sinfo = "";      my $sinfo = "";
130      if (@lastsync && $lastsync[3] ne '')        # sync info      if (@lastsync && $lastsync[3] ne '')        # sync info
131      {      {
# Line 121  sub PrepareSnapshot Line 138  sub PrepareSnapshot
138            
139      # DELETED rows      # DELETED rows
140      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .      $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
141          " where l.delete = 1 $sinfo order by l.reloid";          " where l.delete = 1 $sinfo $sel_server order by l.reloid";
142            
143      printf "$sql\n" if $debug;      printf "DELETED: $sql\n" if $debug;
144            
145      $result = $conn->exec($sql);      $result = $mconn->exec($sql);
146      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
147      {      {
148          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
149          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
150          return(-1);          return(-1);
151      }      }
152            
153      my $lastoid = '';      my $lastoid = -1;
154      while (@row = $result->fetchrow)      while (@row = $result->fetchrow)
155      {      {
156          next unless exists $Mtables{$row[0]};          next unless exists $Mtables{$row[0]};
# Line 141  sub PrepareSnapshot Line 158  sub PrepareSnapshot
158    
159          if ($lastoid != $row[0])          if ($lastoid != $row[0])
160          {          {
161              if ($lastoid eq '')              if ($lastoid == -1)
162              {              {
163                  my $syncid = GetSYNCID($conn, $outf);                  my $syncid = GetSYNCID($mconn, $outf);
164                  return($syncid) if $syncid < 0;                  return($syncid) if $syncid < 0;
165                  $havedeal = 1;                  $havedeal = 1;
166              }              }
# Line 157  sub PrepareSnapshot Line 174  sub PrepareSnapshot
174          if (! defined $row[1])          if (! defined $row[1])
175          {          {
176              print STDERR "NULL key\n" unless ($quiet);              print STDERR "NULL key\n" unless ($quiet);
177              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
178              return(-2);              return(-2);
179          }          }
180          printf $outf "%s\n", OutputValue($row[1]);          printf $outf "%s\n", OutputValue($row[1]);
181      }      }
182      printf $outf "\\.\n" if $lastoid ne '';      printf $outf "\\.\n" if ($lastoid != -1);
183            
184      # UPDATED rows      # UPDATED rows
185            
# Line 176  sub PrepareSnapshot Line 193  sub PrepareSnapshot
193    
194          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
195            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
196            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
197              $sel_server;
198                    
199          printf "$sql\n" if $debug;          printf "UPDATED: $sql\n" if $debug;
200                    
201          $result = $conn->exec($sql);          $result = $mconn->exec($sql);
202          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
203          {          {
204              printf $outf "-- ERROR\n" if $havedeal;              printf $outf "-- ERROR\n" if $havedeal;
205              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $mconn->errorMessage unless ($quiet);
206              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
207              return(-1);              return(-1);
208          }          }
209          next if $result->ntuples <= 0;          next if $result->ntuples <= 0;
210          if (! $havedeal)          if (! $havedeal)
211          {          {
212              my $syncid = GetSYNCID($conn, $outf);              my $syncid = GetSYNCID($mconn, $outf);
213              return($syncid) if $syncid < 0;              return($syncid) if $syncid < 0;
214              $havedeal = 1;              $havedeal = 1;
215          }          }
# Line 224  sub PrepareSnapshot Line 242  sub PrepareSnapshot
242    
243          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
244            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
245            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
246              $sel_server;
247                    
248          printf "$sql\n" if $debug;          printf "INSERTED: $sql\n" if $debug;
249                    
250          $result = $conn->exec($sql);          $result = $mconn->exec($sql);
251          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
252          {          {
253              printf $outf "-- ERROR\n" if $havedeal;              printf $outf "-- ERROR\n" if $havedeal;
254              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $mconn->errorMessage unless ($quiet);
255              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
256              return(-1);              return(-1);
257          }          }
258          next if $result->ntuples <= 0;          next if $result->ntuples <= 0;
259          if (! $havedeal)          if (! $havedeal)
260          {          {
261              my $syncid = GetSYNCID($conn, $outf);              my $syncid = GetSYNCID($mconn, $outf);
262              return($syncid) if $syncid < 0;              return($syncid) if $syncid < 0;
263              $havedeal = 1;              $havedeal = 1;
264          }          }
# Line 264  sub PrepareSnapshot Line 283  sub PrepareSnapshot
283            
284      unless ($havedeal)      unless ($havedeal)
285      {      {
286          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
287          return(0);          return(0);
288      }      }
289            
290      # Remember this snapshot info      # Remember this snapshot info
291      $result = $conn->exec("select _rserv_sync_($server)");      $result = $mconn->exec("select _rserv_sync_($sserver)");
292      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
293      {      {
294          printf $outf "-- ERROR\n";          printf $outf "-- ERROR\n";
295          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
296          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
297          return(-1);          return(-1);
298      }      }
299            
300      $result = $conn->exec("COMMIT");      $result = $mconn->exec("COMMIT");
301      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
302      {      {
303          printf $outf "-- ERROR\n";          printf $outf "-- ERROR\n";
304          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
305          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
306          return(-1);          return(-1);
307      }      }
308      printf $outf "-- OK\n";      printf $outf "-- OK\n";
# Line 330  sub GetSYNCID Line 349  sub GetSYNCID
349    
350  sub CleanLog  sub CleanLog
351  {  {
352      my ($conn, $howold) = @_; # (@_[0], @_[1]);      my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
353            
354      my $result = $conn->exec("BEGIN");      my $result = $conn->exec("BEGIN");
355      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
# Line 372  sub CleanLog Line 391  sub CleanLog
391      my $alist = join(',', keys %active);      my $alist = join(',', keys %active);
392      my $sinfo = "logid < $maxid";      my $sinfo = "logid < $maxid";
393      $sinfo .= " AND logid not in ($alist)" if $alist ne '';      $sinfo .= " AND logid not in ($alist)" if $alist ne '';
394            #if (ref($onlytables) eq 'HASH') {
395        #   foreach my $onlytable (keys %{$onlytables}) {
396        #           $sinfo
397        #   }
398        #}
399      $sql = "delete from _RSERV_LOG_ where " .      $sql = "delete from _RSERV_LOG_ where " .
400          "logtime < now() - '$howold second'::interval AND $sinfo";          "logtime < now() - '$howold second'::interval AND $sinfo";
401            
# Line 400  sub CleanLog Line 423  sub CleanLog
423    
424  sub ApplySnapshot  sub ApplySnapshot
425  {  {
426      my ($conn, $inpf) = @_; # (@_[0], @_[1]);      my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);
427            
428      my $result = $conn->exec("BEGIN");      my $result = $sconn->exec("BEGIN");
429      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
430      {      {
431          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
432          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
433          return(-1);          return(-1);
434      }      }
435            
436      $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");      $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
437      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
438      {      {
439          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
440          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
441          return(-1);          return(-1);
442      }      }
443            
# Line 424  sub ApplySnapshot Line 447  sub ApplySnapshot
447          " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .          " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
448          " AND pga.attnum = rt.key";          " AND pga.attnum = rt.key";
449            
450      $result = $conn->exec($sql);      $result = $sconn->exec($sql);
451      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
452      {      {
453          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
454          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
455          return(-1);          return(-1);
456      }      }
457      %Stables = ();      %Stables = ();
458      while (my @row = $result->fetchrow)      while (my @row = $result->fetchrow)
459      {      {
460          #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];          #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
461                    if (ref($onlytables) eq 'HASH') {
462                            next unless (exists $onlytables->{$row[1]});
463                            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
464                    }
465          push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];          push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
466      }      }
467    
468      my $ok = 0;      my $ok = 0;
469      my $syncid = '';      my $syncid = -1;
470      while(<$inpf>)      while(<$inpf>)
471      {      {
472          $_ =~ s/\n//;          $_ =~ s/\n//;
# Line 447  sub ApplySnapshot Line 474  sub ApplySnapshot
474          if ($cmt ne '--')          if ($cmt ne '--')
475          {          {
476              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
477              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
478              return(-2);              return(-2);
479          }          }
480          if ($cmd eq 'DELETE')          if ($cmd eq 'DELETE')
481          {          {
482              if ($syncid eq '')              if ($syncid == -1)
483              {              {
484                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
485                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
486                  return(-2);                  return(-2);
487              }              }
488              $result = DoDelete($conn, $inpf, $prm);              $result = DoDelete($sconn, $inpf, $prm);
489              if ($result)              if ($result)
490              {              {
491                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
492                  return($result);                  return($result);
493              }              }
494          }          }
495          elsif ($cmd eq 'INSERT')          elsif ($cmd eq 'INSERT')
496          {          {
497              if ($syncid eq '')              if ($syncid == -1)
498              {              {
499                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
500                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
501                  return(-2);                  return(-2);
502              }              }
503              $result = DoInsert($conn, $inpf, $prm);              $result = DoInsert($sconn, $inpf, $prm);
504              if ($result)              if ($result)
505              {              {
506                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
507                  return($result);                  return($result);
508              }              }
509          }          }
510          elsif ($cmd eq 'UPDATE')          elsif ($cmd eq 'UPDATE')
511          {          {
512              if ($syncid eq '')              if ($syncid == -1)
513              {              {
514                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
515                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
516                  return(-2);                  return(-2);
517              }              }
518              $result = DoUpdate($conn, $inpf, $prm);              $result = DoUpdate($sconn, $inpf, $prm);
519              if ($result)              if ($result)
520              {              {
521                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
522                  return($result);                  return($result);
523              }              }
524          }          }
525          elsif ($cmd eq 'SYNCID')          elsif ($cmd eq 'SYNCID')
526          {          {
527              if ($syncid ne '')              if ($syncid != -1)
528              {              {
529                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);
530                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
531                  return(-2);                  return(-2);
532              }              }
533              if ($prm !~ /^\d+$/)              if ($prm !~ /^\d+$/)
534              {              {
535                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
536                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
537                  return(-2);                  return(-2);
538              }              }
539              $syncid = $prm;              $syncid = $prm;
540                            
541              printf STDERR "Sync ID $syncid\n" unless ($quiet);              printf STDERR "Sync ID $syncid\n" unless ($quiet);
542                            
543              $result = $conn->exec("select syncid, synctime from " .              $result = $sconn->exec("select syncid, synctime from " .
544                                    "_RSERV_SLAVE_SYNC_ where syncid = " .                                    "_RSERV_SLAVE_SYNC_ where syncid = " .
545                                    "(select max(syncid) from _RSERV_SLAVE_SYNC_)");                                    "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
546              if ($result->resultStatus ne PGRES_TUPLES_OK)              if ($result->resultStatus ne PGRES_TUPLES_OK)
547              {              {
548                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR $sconn->errorMessage unless ($quiet);
549                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
550                  return(-1);                  return(-1);
551              }              }
552              my @row = $result->fetchrow;              my @row = $result->fetchrow;
553              if (! defined $row[0])              if (! defined $row[0])
554              {              {
555                  $result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ".                  $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
556                                        "(syncid, synctime) values ($syncid, now())");                                        "(syncid, synctime) values ($syncid, now())");
557              }              }
558              elsif ($row[0] >= $prm)              elsif ($row[0] >= $prm)
559              {              {
560                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
561                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
562                  return(0);                  return(0);
563              }              }
564              else              else
565              {              {
566                  $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .                  $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
567                                        " set syncid = $syncid, synctime = now()");                                        " set syncid = $syncid, synctime = now()");
568              }              }
569              if ($result->resultStatus ne PGRES_COMMAND_OK)              if ($result->resultStatus ne PGRES_COMMAND_OK)
570              {              {
571                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR $sconn->errorMessage unless ($quiet);
572                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
573                  return(-1);                  return(-1);
574              }              }
575          }          }
# Line 554  sub ApplySnapshot Line 581  sub ApplySnapshot
581          elsif ($cmd eq 'ERROR')          elsif ($cmd eq 'ERROR')
582          {          {
583              printf STDERR "ERROR signaled\n" unless ($quiet);              printf STDERR "ERROR signaled\n" unless ($quiet);
584              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
585              return(-2);              return(-2);
586          }          }
587          else          else
588          {          {
589              printf STDERR "Unknown command $cmd\n" unless ($quiet);              printf STDERR "Unknown command $cmd\n" unless ($quiet);
590              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
591              return(-2);              return(-2);
592          }          }
593      }      }
# Line 568  sub ApplySnapshot Line 595  sub ApplySnapshot
595      if (! $ok)      if (! $ok)
596      {      {
597          printf STDERR "No OK flag in input\n" unless ($quiet);          printf STDERR "No OK flag in input\n" unless ($quiet);
598          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
599          return(-2);          return(-2);
600      }      }
601            
602      $result = $conn->exec("COMMIT");      $result = $sconn->exec("COMMIT");
603      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
604      {      {
605          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
606          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
607          return(-1);          return(-1);
608      }      }
609            
# Line 585  sub ApplySnapshot Line 612  sub ApplySnapshot
612    
613  sub DoDelete  sub DoDelete
614  {  {
615      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
616    
617      # only delete tables that the slave wants      # only delete tables that the slave wants
618      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 619  sub DoDelete Line 646  sub DoDelete
646                    
647          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
648                    
649          my $result = $conn->exec($sql);          my $result = $sconn->exec($sql);
650          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
651          {          {
652              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
653              return(-1);              return(-1);
654          }          }
655      }      }
# Line 639  sub DoDelete Line 666  sub DoDelete
666    
667  sub DoUpdate  sub DoUpdate
668  {  {
669      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
670    
671      # only update the tables that the slave wants      # only update the tables that the slave wants
672      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 661  sub DoUpdate Line 688  sub DoUpdate
688      my $sql = "select attnum, attname from pg_attribute" .      my $sql = "select attnum, attname from pg_attribute" .
689          " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";          " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
690            
691      my $result = $conn->exec($sql);      my $result = $sconn->exec($sql);
692      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
693      {      {
694          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
695          return(-1);          return(-1);
696      }      }
697            
# Line 739  sub DoUpdate Line 766  sub DoUpdate
766                    
767          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
768                    
769          $result = $conn->exec($sql);          $result = $sconn->exec($sql);
770                    
771          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
772          {          {
773              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
774              return(-1);              return(-1);
775          }          }
776          next if $result->cmdTuples == 1;        # updated          next if $result->cmdTuples == 1;        # updated
# Line 760  sub DoUpdate Line 787  sub DoUpdate
787                    
788          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax)
789          {          {
790              $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);              $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
791              return($result) if $result;              return($result) if $result;
792              @CopyBuf = ();              @CopyBuf = ();
793              $CBufLen = 0;              $CBufLen = 0;
# Line 776  sub DoUpdate Line 803  sub DoUpdate
803      if ($CBufLen)      if ($CBufLen)
804      {      {
805          print "@CopyBuf\n" if $debug;          print "@CopyBuf\n" if $debug;
806          $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);          $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
807          return($result) if $result;          return($result) if $result;
808      }      }
809    
# Line 785  sub DoUpdate Line 812  sub DoUpdate
812    
813  sub DoInsert  sub DoInsert
814  {  {
815      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
816    
817      # only insert rows into tables that the slave wants      # only insert rows into tables that the slave wants
818      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 827  sub DoInsert Line 854  sub DoInsert
854                    
855          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax)
856          {          {
857              my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);              my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
858              return($result) if $result;              return($result) if $result;
859              @CopyBuf = ();              @CopyBuf = ();
860              $CBufLen = 0;              $CBufLen = 0;
# Line 843  sub DoInsert Line 870  sub DoInsert
870      if ($CBufLen)      if ($CBufLen)
871      {      {
872          print "@CopyBuf\n" if $debug;          print "@CopyBuf\n" if $debug;
873          my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);          my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
874          return($result) if $result;          return($result) if $result;
875      }      }
876            
# Line 853  sub DoInsert Line 880  sub DoInsert
880    
881  sub DoCopy  sub DoCopy
882  {  {
883      my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);      my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
884            
885      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
886          "FROM STDIN";          "FROM STDIN";
887      my $result = $conn->exec($sql);      my $result = $sconn->exec($sql);
888      if ($result->resultStatus ne PGRES_COPY_IN)      if ($result->resultStatus ne PGRES_COPY_IN)
889      {      {
890          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
891          return(-1);          return(-1);
892      }      }
893            
894      foreach my $str (@{$CBuf})      foreach my $str (@{$CBuf})
895      {      {
896          $conn->putline($str);          $sconn->putline($str);
897      }      }
898            
899      $conn->putline("\\.\n");      $sconn->putline("\\.\n");
900            
901      if ($conn->endcopy)      if ($sconn->endcopy)
902      {      {
903          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
904          return(-1);          return(-1);
905      }      }
906            
# Line 886  sub DoCopy Line 913  sub DoCopy
913  #  #
914  sub GetSyncID  sub GetSyncID
915  {  {
916      my ($conn) = @_; # (@_[0]);      my ($sconn) = @_; # (@_[0]);
917            
918      my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");      my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
919      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
920      {      {
921          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
922          return(-1);          return(-1);
923      }      }
924      my @row = $result->fetchrow;      my @row = $result->fetchrow;
# Line 904  sub GetSyncID Line 931  sub GetSyncID
931  #  #
932  sub SyncSyncID  sub SyncSyncID
933  {  {
934      my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
935            
936      my $result = $conn->exec("BEGIN");      my $result = $mconn->exec("BEGIN");
937      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
938      {      {
939          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
940          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
941          return(-1);          return(-1);
942      }      }
943            
944      $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
945                            " where server = $server AND syncid = $syncid" .                            " where server = $sserver AND syncid = $syncid" .
946                            " for update");                            " for update");
947      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
948      {      {
949          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
950          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
951          return(-1);          return(-1);
952      }      }
953      my @row = $result->fetchrow;      my @row = $result->fetchrow;
954      if (! defined $row[0])      if (! defined $row[0])
955      {      {
956          printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);          printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
957          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
958          return(0);          return(0);
959      }      }
960      if ($row[1] > 0)      if ($row[1] > 0)
961      {      {
962          printf STDERR "SyncID $syncid for server ".          printf STDERR "SyncID $syncid for server ".
963              "$server already updated\n" unless ($quiet);              "$sserver already updated\n" unless ($quiet);
964          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
965          return(0);          return(0);
966      }      }
967      $result = $conn->exec("update _RSERV_SYNC_" .      $result = $mconn->exec("update _RSERV_SYNC_" .
968                            " set synctime = now(), status = 1" .                            " set synctime = now(), status = 1" .
969                            " where server = $server AND syncid = $syncid");                            " where server = $sserver AND syncid = $syncid");
970      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
971      {      {
972          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
973          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
974          return(-1);          return(-1);
975      }      }
976      $result = $conn->exec("delete from _RSERV_SYNC_" .      $result = $mconn->exec("delete from _RSERV_SYNC_" .
977                            " where server = $server AND syncid < $syncid");                            " where server = $sserver AND syncid < $syncid");
978      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
979      {      {
980          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
981          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
982          return(-1);          return(-1);
983      }      }
984            
985      $result = $conn->exec("COMMIT");      $result = $mconn->exec("COMMIT");
986      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
987      {      {
988          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
989          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
990          return(-1);          return(-1);
991      }      }
992            

Legend:
Removed from v.1.4  
changed lines
  Added in v.1.9

  ViewVC Help
Powered by ViewVC 1.1.26