/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.15 by dpavlin, Sun Nov 2 13:21:12 2003 UTC revision 1.17 by dpavlin, Sun Nov 2 21:07:20 2003 UTC
# Line 7  package RServ; Line 7  package RServ;
7  require Exporter;  require Exporter;
8  @ISA = qw(Exporter);  @ISA = qw(Exporter);
9  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10          Rollback RollbackAndQuit Connect Exec Exec2 MkInfo          Rollback RollbackAndQuit Connect Exec Exec2 MkInfo ExecDebug
11          $debug $quiet $verbose          $debug $quiet $verbose
12          );          );
13  @EXPORT_OK = qw();  @EXPORT_OK = qw();
# Line 48  sub GetServerId Line 48  sub GetServerId
48    
49      my @row = $result->fetchrow;      my @row = $result->fetchrow;
50    
51      print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);      print STDERR "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
52    
53      return $row[0];      return $row[0];
54  }  }
55    
56  sub PrepareSnapshot  sub PrepareSnapshot
57  {  {
58      my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;      my ($mconn, $sconn, $outf, $mserver, $sserver, $multimaster, $onlytables) = @_;
59    
60      if ($mserver == $sserver) {      if ($mserver == $sserver) {
61          print STDERR "master and slave numbers are same [$mserver] !\n";          print STDERR "master and slave numbers are same [$mserver] !\n";
62          return(-1);          return(-1);
63      }      }
64    
65      print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);      print STDERR "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
66    
67        # dump master server ID into snapshot file (to prevent replication
68        # of colums from master back to slave)
69        print $outf "-- SERVER $mserver\n";
70    
71      # first, we must know for wich tables the slave subscribed      # first, we must know for wich tables the slave subscribed
72      my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);      my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);
# Line 73  sub PrepareSnapshot Line 77  sub PrepareSnapshot
77          $Stables{$row[0]} = 1;          $Stables{$row[0]} = 1;
78      }      }
79            
80      print "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);      print STDERR "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);
81    
82      Exec($mconn,"BEGIN");      Exec($mconn,"BEGIN");
83      Exec($mconn,"set transaction isolation level serializable");      Exec($mconn,"set transaction isolation level serializable");
# Line 90  sub PrepareSnapshot Line 94  sub PrepareSnapshot
94    
95      while (@row = $result->fetchrow)      while (@row = $result->fetchrow)
96      {      {
97          #       printf "$row[0], $row[1], $row[2]\n";                  printf "$row[0], $row[1], $row[2]\n" if ($debug);
98                  if (ref($onlytables) eq 'HASH') {                  if (ref($onlytables) eq 'HASH') {
99                          next unless (exists $onlytables->{$row[1]});                          next unless (exists $onlytables->{$row[1]});
100                          $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});                          $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
# Line 98  sub PrepareSnapshot Line 102  sub PrepareSnapshot
102          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
103      }      }
104    
105      print "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);      print STDERR "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
106      if (! %Mtables) {      if (! %Mtables) {
107          print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";          print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";
108          RollbackAndQuit($mconn);          RollbackAndQuit($mconn);
# Line 113  sub PrepareSnapshot Line 117  sub PrepareSnapshot
117      };      };
118            
119      $result = Exec($mconn,$sql);      $result = Exec($mconn,$sql);
120        
121      my @lastsync = $result->fetchrow;      my @lastsync = $result->fetchrow;
122      print "lastsync: ",join(",",@lastsync),"\n" if ($debug);      print STDERR "lastsync: ",join(",",@lastsync),"\n" if ($debug);
123    
124      # exclude data which originated from master server      # exclude data which originated from master server
125      my $sel_server = " and l.server = $mserver ";      my $sel_server = " and l.server = $mserver ";
# Line 127  sub PrepareSnapshot Line 131  sub PrepareSnapshot
131          $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';          $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
132          $sinfo .= ")";          $sinfo .= ")";
133      }      }
134        
135        my @keys;   # keys in this snapshot
136    
137      my $havedeal = 0;      my $havedeal = 0;
138            
139      # DELETED rows      # DELETED rows
# Line 137  sub PrepareSnapshot Line 143  sub PrepareSnapshot
143      printf "DELETED: $sql\n" if $debug;      printf "DELETED: $sql\n" if $debug;
144            
145      $result = $mconn->exec($sql);      $result = $mconn->exec($sql);
146      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK) {
     {  
147          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
148          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
149          return(-1);          return(-1);
150      }      }
151            
152      my $lastoid = -1;      my $lastoid = -1;
153      while (@row = $result->fetchrow)      while (@row = $result->fetchrow) {
     {  
154          next unless exists $Mtables{$row[0]};          next unless exists $Mtables{$row[0]};
155          next unless exists $Stables{$Mtables{$row[0]}[0]};          next unless exists $Stables{$Mtables{$row[0]}[0]};
156    
157          if ($lastoid != $row[0])          if ($lastoid != $row[0]) {
158          {              if ($lastoid == -1) {
             if ($lastoid == -1)  
             {  
159                  my $syncid = GetSYNCID($mconn, $outf);                  my $syncid = GetSYNCID($mconn, $outf);
160                  return($syncid) if $syncid < 0;                  return($syncid) if $syncid < 0;
161                  $havedeal = 1;                  $havedeal = 1;
162              }              } else {
             else  
             {  
163                  printf $outf "\\.\n";                  printf $outf "\\.\n";
164              }              }
165              printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";              printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
166              $lastoid = $row[0];              $lastoid = $row[0];
167          }          }
168          if (! defined $row[1])          if (! defined $row[1]) {
         {  
169              print STDERR "NULL key\n" unless ($quiet);              print STDERR "NULL key\n" unless ($quiet);
170              $mconn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
171              return(-2);              return(-2);
172          }          }
173          printf $outf "%s\n", OutputValue($row[1]);          printf $outf "%s\n", OutputValue($row[1]);
174            push @keys,OutputKey($row[2],$Mtables{$row[0]}[2]);
175      }      }
176      printf $outf "\\.\n" if ($lastoid != -1);      printf $outf "\\.\n" if ($lastoid != -1);
177            
# Line 185  sub PrepareSnapshot Line 185  sub PrepareSnapshot
185    
186          my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';          my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
187    
188          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT \"_$tabname\".\"${tabkey}\",$oidkey \"_$tabname\".* FROM \"$tabname\" ".
189            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
190            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
191            $sel_server;            $sel_server;
# Line 211  sub PrepareSnapshot Line 211  sub PrepareSnapshot
211          printf "-- UPDATE $tabname\n" if $debug;          printf "-- UPDATE $tabname\n" if $debug;
212          while (@row = $result->fetchrow)          while (@row = $result->fetchrow)
213          {          {
214                push @keys,OutputKey(shift @row,$keytype);
215              for (my $i = 0; $i <= $#row; $i++)              for (my $i = 0; $i <= $#row; $i++)
216              {              {
217                  printf $outf "  " if $i;                  printf $outf "  " if $i;
# Line 234  sub PrepareSnapshot Line 235  sub PrepareSnapshot
235    
236          my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';          my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
237    
238          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".          $sql = sprintf "SELECT \"_$tabname\".\"${tabkey}\", $oidkey \"_$tabname\".* FROM \"$tabname\" ".
239            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
240            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
241            $sel_server;            $sel_server;
# Line 260  sub PrepareSnapshot Line 261  sub PrepareSnapshot
261          printf "-- INSERT $tabname\n" if $debug;          printf "-- INSERT $tabname\n" if $debug;
262          while (@row = $result->fetchrow)          while (@row = $result->fetchrow)
263          {          {
264                push @keys,OutputKey(shift @row,$keytype);
265              for (my $i = 0; $i <= $#row; $i++)              for (my $i = 0; $i <= $#row; $i++)
266              {              {
267                  printf $outf "  " if $i;                  printf $outf "  " if $i;
268                  printf "        " if $i && $debug;                  printf "        " if $i && $debug;
269                  printf $outf "%s", OutputValue($row[$i]);                  printf $outf "%s", OutputValue($row[$i]);
270                  printf "%s", OutputValue($row[$i]) if $debug;;                  printf "%s", OutputValue($row[$i]) if $debug;
271              }              }
272              printf $outf "\n";              printf $outf "\n";
273              printf "\n" if $debug;              printf "\n" if $debug;
# Line 277  sub PrepareSnapshot Line 279  sub PrepareSnapshot
279            
280      unless ($havedeal)      unless ($havedeal)
281      {      {
282            print STDERR "hon't have deal, rollback...\n" if ($debug);
283          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
284          return(0);          return(0);
285      }      }
# Line 299  sub PrepareSnapshot Line 302  sub PrepareSnapshot
302          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
303          return(-1);          return(-1);
304      }      }
305    
306        if ($multimaster) {
307            # save keys
308    #       my $key_out = "-- KEYS ".($#keys+1)."\n(\"key\" like ".join(") or (\"key\" like ",@keys).")\n";
309            my $key_out = "-- KEYS ".($#keys+1)."\n".join(",",@keys)."\n";
310            print $outf $key_out;
311            print $key_out if ($debug);
312        }
313    
314      printf $outf "-- OK\n";      printf $outf "-- OK\n";
315      printf "-- OK\n" if $debug;      printf "-- OK\n" if $debug;
316            
# Line 320  sub OutputValue Line 332  sub OutputValue
332          return($val);          return($val);
333  }  }
334    
335    sub OutputKey {
336            my $val = shift;
337            my $cast = shift || '';
338    #       $cast = "::$cast" if ($cast);
339            $cast = "::text";
340    #       $cast = "";
341    
342            return "null" if (! defined($val));
343    
344            print STDERR "Key: ${val}${cast}\n" if ($debug);
345    
346            if ($val =~ m/^\d+$/) {
347                    return "${val}${cast}";
348            } else {
349                    return "'$val'${cast}";
350            }
351    }
352    
353  # Get syncid for new snapshot  # Get syncid for new snapshot
354  sub GetSYNCID  sub GetSYNCID
355  {  {
# Line 332  sub GetSYNCID Line 362  sub GetSYNCID
362          $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
363          return(-1);          return(-1);
364      }      }
365        
366      my @row = $result->fetchrow;      my @row = $result->fetchrow;
367            
368      printf $outf "-- SYNCID $row[0]\n";      printf $outf "-- SYNCID $row[0]\n";
# Line 417  sub CleanLog Line 447  sub CleanLog
447    
448  sub ApplySnapshot  sub ApplySnapshot
449  {  {
450      my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);      my ($sconn, $inpf, $multimaster, $onlytables) = @_; # (@_[0], @_[1]);
451        
452        my $serverId;
453    
454      my $result = $sconn->exec("BEGIN");      my $result = $sconn->exec("BEGIN");
455      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
456          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
457          $sconn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
458          return(-1);          return(-1);
459      }      }
460            
461      $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");      $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
462      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
463          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
464          $sconn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
465          return(-1);          return(-1);
466      }      }
467            
468      # MAP name --> oid, keyname, keynum      # MAP name --> oid, keyname, keynum
469      my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .      my $sql = qq{
470          " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .          select pgc.oid, pgc.relname, pga.attname, rt.key
471          " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .          from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga
472          " AND pga.attnum = rt.key";          where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
473                    AND pga.attnum = rt.key
474        };
475            
476      $result = $sconn->exec($sql);      $result = $sconn->exec($sql);
477      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK) {
     {  
478          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
479          $sconn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
480          return(-1);          return(-1);
481      }      }
482      %Stables = ();      %Stables = ();
483      while (my @row = $result->fetchrow)      while (my @row = $result->fetchrow) {
     {  
484          #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];          #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
485                  if (ref($onlytables) eq 'HASH') {                  if (ref($onlytables) eq 'HASH') {
486                          next unless (exists $onlytables->{$row[1]});                          next unless (exists $onlytables->{$row[1]});
# Line 461  sub ApplySnapshot Line 491  sub ApplySnapshot
491    
492      print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);      print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);
493    
494        # save keys from snapshot because we want to update _rserv_log_ with
495        # correct source server later...
496        my @keys;
497        my $keys_sql;
498    
499      my $ok = 0;      my $ok = 0;
500      my $syncid = -1;      my $syncid = -1;
501      while(<$inpf>)      while(<$inpf>) {
     {  
502          $_ =~ s/\n//;          $_ =~ s/\n//;
503          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);
504          die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);          die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
505          if ($cmt ne '--')          if ($cmt ne '--') {
         {  
506              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
507              $sconn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
508              return(-2);              return(-2);
509          }          }
510          if ($cmd eq 'DELETE')          if ($cmd eq 'DELETE') {
511          {              if ($syncid == -1) {
             if ($syncid == -1)  
             {  
512                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
513                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
514                  return(-2);                  return(-2);
515              }              }
516              $result = DoDelete($sconn, $inpf, $prm);              $result = DoDelete($sconn, $inpf, $prm);
517              if ($result)              if ($result) {
             {  
518                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
519                  return($result);                  return($result);
520              }              }
521          }          } elsif ($cmd eq 'INSERT') {
522          elsif ($cmd eq 'INSERT')              if ($syncid == -1) {
         {  
             if ($syncid == -1)  
             {  
523                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
524                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
525                  return(-2);                  return(-2);
526              }              }
527              $result = DoInsert($sconn, $inpf, $prm);              $result = DoInsert($sconn, $inpf, $prm);
528              if ($result)              if ($result) {
             {  
529                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
530                  return($result);                  return($result);
531              }              }
532          }          } elsif ($cmd eq 'UPDATE') {
533          elsif ($cmd eq 'UPDATE')              if ($syncid == -1) {
         {  
             if ($syncid == -1)  
             {  
534                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
535                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
536                  return(-2);                  return(-2);
537              }              }
538              $result = DoUpdate($sconn, $inpf, $prm);              $result = DoUpdate($sconn, $inpf, $prm);
539              if ($result)              if ($result) {
             {  
540                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
541                  return($result);                  return($result);
542              }              }
543          }          } elsif ($cmd eq 'SYNCID') {
544          elsif ($cmd eq 'SYNCID')              if ($syncid != -1) {
         {  
             if ($syncid != -1)  
             {  
545                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);
546                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
547                  return(-2);                  return(-2);
548              }              }
549              if ($prm !~ /^\d+$/)              if ($prm !~ /^\d+$/) {
             {  
550                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
551                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
552                  return(-2);                  return(-2);
# Line 537  sub ApplySnapshot Line 555  sub ApplySnapshot
555                            
556              printf STDERR "Sync ID $syncid\n" unless ($quiet);              printf STDERR "Sync ID $syncid\n" unless ($quiet);
557                            
558              $result = $sconn->exec("select syncid, synctime from " .              $result = $sconn->exec(qq{
559                                    "_RSERV_SLAVE_SYNC_ where syncid = " .                  select syncid, synctime
560                                    "(select max(syncid) from _RSERV_SLAVE_SYNC_)");                  from _RSERV_SLAVE_SYNC_
561              if ($result->resultStatus ne PGRES_TUPLES_OK)                  where syncid =
562              {                          (select max(syncid) from _RSERV_SLAVE_SYNC_)
563                  print STDERR $sconn->errorMessage unless ($quiet);              });
564                if ($result->resultStatus ne PGRES_TUPLES_OK) {
565                    print STDERR "can't get current syncid from _rserv_slave_sync_: ",$sconn->errorMessage unless ($quiet);
566                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
567                  return(-1);                  return(-1);
568              }              }
569    
570              my @row = $result->fetchrow;              my @row = $result->fetchrow;
571              if (! defined $row[0])              print STDERR "Slave Sync ID ",($row[0] || "null"),"\n" if ($debug);
572              {              if (! defined $row[0]) {
573                  $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".                  $result = Exec($sconn,qq{
574                                        "(syncid, synctime) values ($syncid, now())");                          insert into _RSERV_SLAVE_SYNC_ (syncid, synctime)
575              }                          values ($syncid, now())
576              elsif ($row[0] >= $prm)                  });
577              {              } elsif ($row[0] >= $prm) {
578                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
579                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
580                  return(0);                  return(0);
581                } else {
582                    $result = Exec($sconn,qq{
583                            update _RSERV_SLAVE_SYNC_ set syncid = $syncid, synctime = now()
584                    });
585              }              }
586              else              if ($result->resultStatus ne PGRES_COMMAND_OK) {
             {  
                 $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .  
                                       " set syncid = $syncid, synctime = now()");  
             }  
             if ($result->resultStatus ne PGRES_COMMAND_OK)  
             {  
587                  print STDERR $sconn->errorMessage unless ($quiet);                  print STDERR $sconn->errorMessage unless ($quiet);
588                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
589                  return(-1);                  return(-1);
590              }              }
591          }          } elsif ($cmd eq 'OK') {
         elsif ($cmd eq 'OK')  
         {  
592              $ok = 1;              $ok = 1;
593              last;              last;
594          }          } elsif ($cmd eq 'ERROR') {
         elsif ($cmd eq 'ERROR')  
         {  
595              printf STDERR "ERROR signaled\n" unless ($quiet);              printf STDERR "ERROR signaled\n" unless ($quiet);
596              $sconn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
597              return(-2);              return(-2);
598          }          } elsif ($cmd eq 'SERVER') {
599          else              if ($prm !~ /^\d+$/) {
600          {                  printf STDERR "Invalid Server ID $prm\n" unless ($quiet);
601                    $sconn->exec("ROLLBACK");
602                    return(-2);
603                }
604                $serverId = $prm;
605                print STDERR "Server ID $serverId\n" unless ($quiet);
606            } elsif ($cmd eq 'KEYS') {
607                if ($prm !~ /^\d+$/) {
608                    printf STDERR "Invalid numer of keys $prm\n" unless ($quiet);
609                    $sconn->exec("ROLLBACK");
610                    return(-2);
611                 }
612                 my $keys = <$inpf>;
613                 chomp($keys);
614                 if ($multimaster) {
615    ExecFatch($sconn,"select count(*) from _rserv_log_");
616    ExecDebug($sconn,"select * from _rserv_log_");
617    my ($logid) = ExecFatch($sconn,"select distinct logid from _rserv_log_ where key in ($keys)");
618                    $keys_sql = qq{
619                            update _rserv_log_ set server=$serverId
620                            where key in ($keys)
621                    };
622                    $keys_sql = qq{
623                            update _rserv_log_ set server=$serverId
624                            where logid = $logid
625                    };
626                                    
627                    print STDERR "$keys_sql\n" if ($debug);
628                    $result = $sconn->exec($keys_sql);
629    ExecDebug($sconn,"explain analyze $keys_sql");
630                    print STDERR "expected $prm updates, got ",$result->ntuples,"\n" if ($result->ntuples != $prm);
631    #               if ($result->resultStatus ne PGRES_COMMAND_OK || $result->ntuples != $prm) {
632                    if (0) {
633                            print STDERR "FATAL: Cannot update source server in _rserv_log_: ",$sconn->errorMessage,"\n";
634                            $sconn->exec("ROLLBACK");
635                            return(-1);
636                    }
637                }
638            } else {
639              printf STDERR "Unknown command $cmd\n" unless ($quiet);              printf STDERR "Unknown command $cmd\n" unless ($quiet);
640              $sconn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
641              return(-2);              return(-2);
642          }          }
643      }      }
644            
645      if (! $ok)  ExecFatch($sconn,"select count(*) from _rserv_log_");
646      {      if (! $ok) {
647          printf STDERR "No OK flag in input\n" unless ($quiet);          printf STDERR "No OK flag in input\n" unless ($quiet);
648          $sconn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
649          return(-2);          return(-2);
650      }      }
651            
652      $result = $sconn->exec("COMMIT");      $result = $sconn->exec("COMMIT");
653      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
654          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
655          $sconn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
656          return(-1);          return(-1);
# Line 613  sub DoDelete Line 665  sub DoDelete
665    
666      # only delete tables that the slave wants      # only delete tables that the slave wants
667      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
668          print "Not configured to delete rows from table $tabname\n" unless $quiet;          print STDERR "Not configured to delete rows from table $tabname\n" unless $quiet;
669          while (<$inpf>) {          while (<$inpf>) {
670              my $istring = $_;              my $istring = $_;
671              $istring =~ s/\n//;              $istring =~ s/\n//;
# Line 667  sub DoUpdate Line 719  sub DoUpdate
719    
720      # only update the tables that the slave wants      # only update the tables that the slave wants
721      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
722          print "Not configured to update rows from table $tabname\n" unless $quiet;          print STDERR "Not configured to update rows from table $tabname\n" unless $quiet;
723          while (<$inpf>) {          while (<$inpf>) {
724              my $istring = $_;              my $istring = $_;
725              $istring =~ s/\n//;              $istring =~ s/\n//;
# Line 693  sub DoUpdate Line 745  sub DoUpdate
745      }      }
746            
747      my @anames = ();      my @anames = ();
748      while (my @row = $result->fetchrow)      while (my @row = $result->fetchrow) {
     {  
749          $anames[$row[0]] = $row[1];          $anames[$row[0]] = $row[1];
750      }      }
751            
752      my $istring;      my $istring;
753      my $ok = 0;      my $ok = 0;
754      while(<$inpf>)      while(<$inpf>) {
755      {          if ($_ !~ /\n$/) {
         if ($_ !~ /\n$/)  
         {  
756              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
757              return(-2);              return(-2);
758          }          }
759          $istring = $_;          $istring = $_;
760          $istring =~ s/\n//;          $istring =~ s/\n//;
761          if ($istring eq '\.')          if ($istring eq '\.') {
         {  
762              $ok = 1;              $ok = 1;
763              last;              last;
764          }          }
765          my @vals = split(/      /, $istring);          my @vals = split(/      /, $istring);
766          if ($oidkey)          if ($oidkey) {
767          {              if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0) {
             if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)  
             {  
768                  printf STDERR "Invalid OID\n" unless ($quiet);                  printf STDERR "Invalid OID\n" unless ($quiet);
769                  return(-2);                  return(-2);
770              }              }
771              $oidkey = $vals[0];              $oidkey = $vals[0];
772          }          } else {
         else  
         {  
773              unshift @vals, '';              unshift @vals, '';
774          }          }
775                    
776          $sql = "update \"$tabname\" set ";          $sql = "update \"$tabname\" set ";
777          my $ocnt = 0;          my $ocnt = 0;
778          for (my $i = 1; $i <= $#anames; $i++)          for (my $i = 1; $i <= $#anames; $i++) {
779          {              if ($vals[$i] eq '\N') {
780              if ($vals[$i] eq '\N')                  if ($i == $Stables{$tabname}->[2]) {
             {  
                 if ($i == $Stables{$tabname}->[2])  
                 {  
781                      printf STDERR "NULL key\n" unless ($quiet);                      printf STDERR "NULL key\n" unless ($quiet);
782                      return(-2);                      return(-2);
783                  }                  }
784                  $vals[$i] = 'null';                  $vals[$i] = 'null';
785              }              } else {
             else  
             {  
786                  $vals[$i] = "'" . $vals[$i] . "'";                  $vals[$i] = "'" . $vals[$i] . "'";
787                  next if $i == $Stables{$tabname}->[2];                  next if $i == $Stables{$tabname}->[2];
788              }              }
789              $ocnt++;              $ocnt++;
790              $sql .= ', ' if $ocnt > 1;              $sql .= ', ' if $ocnt > 1;
791              $sql .= "\"$anames[$i]\" = $vals[$i]";              $sql .= "\"$anames[$i]\" = $vals[$i]";
792          }          } if ($oidkey) {
         if ($oidkey)  
         {  
793              $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";              $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
794          }          } else {
         else  
         {  
795              $sql .= " where \"$Stables{$tabname}->[1]\" = ".              $sql .= " where \"$Stables{$tabname}->[1]\" = ".
796                  $vals[$Stables{$tabname}->[2]];                  $vals[$Stables{$tabname}->[2]];
797          }          }
# Line 765  sub DoUpdate Line 800  sub DoUpdate
800                    
801          $result = $sconn->exec($sql);          $result = $sconn->exec($sql);
802                    
803          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK) {
         {  
804              print STDERR $sconn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
805              return(-1);              return(-1);
806          }          }
807          next if $result->cmdTuples == 1;        # updated          next if $result->cmdTuples == 1;        # updated
808                    
809          if ($result->cmdTuples > 1)          if ($result->cmdTuples > 1) {
         {  
810              printf STDERR "Duplicate keys\n" unless ($quiet);              printf STDERR "Duplicate keys\n" unless ($quiet);
811              return(-2);              return(-2);
812          }          }
# Line 782  sub DoUpdate Line 815  sub DoUpdate
815          push @CopyBuf, "$istring\n";          push @CopyBuf, "$istring\n";
816          $CBufLen += length($istring);          $CBufLen += length($istring);
817                    
818          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax) {
         {  
819              $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);              $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
820              return($result) if $result;              return($result) if $result;
821              @CopyBuf = ();              @CopyBuf = ();
# Line 791  sub DoUpdate Line 823  sub DoUpdate
823          }          }
824      }      }
825            
826      if (! $ok)      if (! $ok) {
     {  
827          printf STDERR "No end of input in UPDATE section\n" unless ($quiet);          printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
828          return(-2);          return(-2);
829      }      }
830            
831      if ($CBufLen)      if ($CBufLen) {
832      {          print STDERR "@CopyBuf\n" if $debug;
         print "@CopyBuf\n" if $debug;  
833          $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);          $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
834          return($result) if $result;          return($result) if $result;
835      }      }
# Line 813  sub DoInsert Line 843  sub DoInsert
843    
844      # only insert rows into tables that the slave wants      # only insert rows into tables that the slave wants
845      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
846          print "Not configured to insert rows from table $tabname\n" unless $quiet;          print STDERR "Not configured to insert rows from table $tabname\n" unless $quiet;
847          while (<$inpf>) {          while (<$inpf>) {
848              my $istring = $_;              my $istring = $_;
849              $istring =~ s/\n//;              $istring =~ s/\n//;
# Line 830  sub DoInsert Line 860  sub DoInsert
860            
861      my $istring;      my $istring;
862      my $ok = 0;      my $ok = 0;
863      while(<$inpf>)      while(<$inpf>) {
864      {          if ($_ !~ /\n$/) {
         if ($_ !~ /\n$/)  
         {  
865              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
866              return(-2);              return(-2);
867          }          }
868          $istring = $_;          $istring = $_;
869          $istring =~ s/\n//;          $istring =~ s/\n//;
870          if ($istring eq '\.')          if ($istring eq '\.') {
         {  
871              $ok = 1;              $ok = 1;
872              last;              last;
873          }          }
# Line 849  sub DoInsert Line 876  sub DoInsert
876          push @CopyBuf, "$istring\n";          push @CopyBuf, "$istring\n";
877          $CBufLen += length($istring);          $CBufLen += length($istring);
878                    
879          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax) {
         {  
880              my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);              my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
881              return($result) if $result;              return($result) if $result;
882              @CopyBuf = ();              @CopyBuf = ();
# Line 858  sub DoInsert Line 884  sub DoInsert
884          }          }
885      }      }
886            
887      if (! $ok)      if (! $ok) {
     {  
888          printf STDERR "No end of input in INSERT section\n" unless ($quiet);          printf STDERR "No end of input in INSERT section\n" unless ($quiet);
889          return(-2);          return(-2);
890      }      }
891            
892      if ($CBufLen)      if ($CBufLen) {
893      {          print STDERR "@CopyBuf\n" if $debug;
         print "@CopyBuf\n" if $debug;  
894          my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);          my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
895          return($result) if $result;          return($result) if $result;
896      }      }
# Line 875  sub DoInsert Line 899  sub DoInsert
899  }  }
900    
901    
902  sub DoCopy  sub DoCopy {
 {  
903      my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);      my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
904            
905      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
906          "FROM STDIN";          "FROM STDIN";
907      my $result = $sconn->exec($sql);      my $result = $sconn->exec($sql);
908      if ($result->resultStatus ne PGRES_COPY_IN)      if ($result->resultStatus ne PGRES_COPY_IN) {
     {  
909          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
910          return(-1);          return(-1);
911      }      }
912            
913      foreach my $str (@{$CBuf})      foreach my $str (@{$CBuf}) {
     {  
914          $sconn->putline($str);          $sconn->putline($str);
915      }      }
916            
917      $sconn->putline("\\.\n");      $sconn->putline("\\.\n");
918            
919      if ($sconn->endcopy)      if ($sconn->endcopy) {
     {  
920          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
921          return(-1);          return(-1);
922      }      }
# Line 908  sub DoCopy Line 928  sub DoCopy
928  #  #
929  # Returns last SyncID applied on Slave  # Returns last SyncID applied on Slave
930  #  #
931  sub GetSyncID  sub GetSyncID {
 {  
932      my ($sconn) = @_; # (@_[0]);      my ($sconn) = @_; # (@_[0]);
933            
934      my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");      my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
935      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK) {
     {  
936          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
937          return(-1);          return(-1);
938      }      }
939      my @row = $result->fetchrow;      my @row = $result->fetchrow;
940        print STDERR "GetSyncID: ",($row[0] || 'null'),"\n" if ($debug);
941      return(undef) unless defined $row[0];       # null      return(undef) unless defined $row[0];       # null
942      return($row[0]);      return($row[0]);
943  }  }
# Line 926  sub GetSyncID Line 945  sub GetSyncID
945  #  #
946  # Updates _RSERV_SYNC_ on Master with Slave SyncID  # Updates _RSERV_SYNC_ on Master with Slave SyncID
947  #  #
948  sub SyncSyncID  sub SyncSyncID {
 {  
949      my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
950            
951      my $result = $mconn->exec("BEGIN");      my $result = $mconn->exec("BEGIN");
952      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
953          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
954          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
955          return(-1);          return(-1);
# Line 941  sub SyncSyncID Line 958  sub SyncSyncID
958      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
959                            " where server = $sserver AND syncid = $syncid" .                            " where server = $sserver AND syncid = $syncid" .
960                            " for update");                            " for update");
961      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK) {
     {  
962          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
963          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
964          return(-1);          return(-1);
965      }      }
966      my @row = $result->fetchrow;      my @row = $result->fetchrow;
967      if (! defined $row[0])      if (! defined $row[0]) {
     {  
968          printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);          printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
969          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
970          return(0);          return(0);
971      }      }
972      if ($row[1] > 0)      if ($row[1] > 0) {
     {  
973          printf STDERR "SyncID $syncid for server ".          printf STDERR "SyncID $syncid for server ".
974              "$sserver already updated\n" unless ($quiet);              "$sserver already updated\n" unless ($quiet);
975          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
# Line 964  sub SyncSyncID Line 978  sub SyncSyncID
978      $result = $mconn->exec("update _RSERV_SYNC_" .      $result = $mconn->exec("update _RSERV_SYNC_" .
979                            " set synctime = now(), status = 1" .                            " set synctime = now(), status = 1" .
980                            " where server = $sserver AND syncid = $syncid");                            " where server = $sserver AND syncid = $syncid");
981      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
982          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
983          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
984          return(-1);          return(-1);
985      }      }
986      $result = $mconn->exec("delete from _RSERV_SYNC_" .      $result = $mconn->exec("delete from _RSERV_SYNC_" .
987                            " where server = $sserver AND syncid < $syncid");                            " where server = $sserver AND syncid < $syncid");
988      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
989          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
990          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
991          return(-1);          return(-1);
992      }      }
993            
994      $result = $mconn->exec("COMMIT");      $result = $mconn->exec("COMMIT");
995      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
996          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
997          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
998          return(-1);          return(-1);
# Line 1012  sub Connect { Line 1023  sub Connect {
1023          print("Connecting to $info\n") if ($debug || $verbose);          print("Connecting to $info\n") if ($debug || $verbose);
1024          my $conn = Pg::connectdb($info);          my $conn = Pg::connectdb($info);
1025          if ($conn->status != PGRES_CONNECTION_OK) {          if ($conn->status != PGRES_CONNECTION_OK) {
1026              print STDERR "Failed opening $info\n";              die "Failed opening $info";
             exit 1;  
1027          }          }
1028          return $conn;          return $conn;
1029  }  }
# Line 1031  sub Exec { Line 1041  sub Exec {
1041          }          }
1042          my $result = $conn->exec($sql);          my $result = $conn->exec($sql);
1043          if ($result->resultStatus eq PGRES_COMMAND_OK) {          if ($result->resultStatus eq PGRES_COMMAND_OK) {
1044                  return;                  return $result;
1045          } elsif ($result->resultStatus eq PGRES_TUPLES_OK) {          } elsif ($result->resultStatus eq PGRES_TUPLES_OK) {
1046                  print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);                  print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
1047                  return $result;                  return $result;
# Line 1057  sub Exec2 { Line 1067  sub Exec2 {
1067          # XXX TODO: return results?!          # XXX TODO: return results?!
1068  }  }
1069    
1070    sub ExecFatch {
1071            my $conn = shift || die "ExecFatch need conn!";
1072            my $sql = shift || die "ExecFatch need SQL!";
1073    
1074            print STDERR "ExecFatch: $sql\n" if ($debug);
1075    
1076            my $result = $conn->exec($sql);
1077            RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);
1078    
1079            print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
1080    
1081            my @row = $result->fetchrow;
1082            print STDERR "DATA: ",join(",",@row),"\n" if ($debug);
1083            return @row;
1084    }
1085    
1086    sub ExecDebug {
1087            my $conn = shift || die "ExecDebug need conn!";
1088            my $sql = shift || die "ExecDebug need SQL!";
1089    
1090            print STDERR "ExecDebug: $sql\n" if ($debug);
1091    
1092            my $result = $conn->exec($sql);
1093            RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);
1094    
1095            print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
1096    
1097            while (my @row = $result->fetchrow) {
1098                    print STDERR "DATA: ",join(",",@row),"\n" if ($debug);
1099            }
1100            return $result->ntuples;
1101    }
1102  sub MkInfo {  sub MkInfo {
1103          my $db = shift || die "need database name!";          my $db = shift || die "need database name!";
1104          my $host = shift;          my $host = shift;

Legend:
Removed from v.1.15  
changed lines
  Added in v.1.17

  ViewVC Help
Powered by ViewVC 1.1.26