/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.15 by dpavlin, Sun Nov 2 13:21:12 2003 UTC revision 1.18 by dpavlin, Mon Nov 3 21:30:39 2003 UTC
# Line 7  package RServ; Line 7  package RServ;
7  require Exporter;  require Exporter;
8  @ISA = qw(Exporter);  @ISA = qw(Exporter);
9  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10          Rollback RollbackAndQuit Connect Exec Exec2 MkInfo          Rollback RollbackAndQuit Connect Exec Exec2 MkInfo ExecDebug
11          $debug $quiet $verbose          $debug $quiet $verbose
12          );          );
13  @EXPORT_OK = qw();  @EXPORT_OK = qw();
# Line 48  sub GetServerId Line 48  sub GetServerId
48    
49      my @row = $result->fetchrow;      my @row = $result->fetchrow;
50    
51      print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);      print STDERR "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
52    
53      return $row[0];      return $row[0];
54  }  }
55    
56  sub PrepareSnapshot  sub PrepareSnapshot
57  {  {
58      my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;      my ($mconn, $sconn, $outf, $mserver, $sserver, $multimaster, $onlytables) = @_;
59    
60      if ($mserver == $sserver) {      if ($mserver == $sserver) {
61          print STDERR "master and slave numbers are same [$mserver] !\n";          print STDERR "master and slave numbers are same [$mserver] !\n";
62          return(-1);          return(-1);
63      }      }
64    
65      print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);      print STDERR "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
66    
67        # dump master server ID into snapshot file (to prevent replication
68        # of colums from master back to slave)
69        print $outf "-- SERVER $mserver\n";
70    
71      # first, we must know for wich tables the slave subscribed      # first, we must know for wich tables the slave subscribed
72      my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);      my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);
# Line 73  sub PrepareSnapshot Line 77  sub PrepareSnapshot
77          $Stables{$row[0]} = 1;          $Stables{$row[0]} = 1;
78      }      }
79            
80      print "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);      print STDERR "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);
81    
82      Exec($mconn,"BEGIN");      Exec($mconn,"BEGIN");
83      Exec($mconn,"set transaction isolation level serializable");      Exec($mconn,"set transaction isolation level serializable");
# Line 90  sub PrepareSnapshot Line 94  sub PrepareSnapshot
94    
95      while (@row = $result->fetchrow)      while (@row = $result->fetchrow)
96      {      {
97          #       printf "$row[0], $row[1], $row[2]\n";                  printf "$row[0], $row[1], $row[2]\n" if ($debug);
98                  if (ref($onlytables) eq 'HASH') {                  if (ref($onlytables) eq 'HASH') {
99                          next unless (exists $onlytables->{$row[1]});                          next unless (exists $onlytables->{$row[1]});
100                          $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});                          $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
# Line 98  sub PrepareSnapshot Line 102  sub PrepareSnapshot
102          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
103      }      }
104    
105      print "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);      print STDERR "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
106      if (! %Mtables) {      if (! %Mtables) {
107          print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";          print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";
108          RollbackAndQuit($mconn);          RollbackAndQuit($mconn);
# Line 113  sub PrepareSnapshot Line 117  sub PrepareSnapshot
117      };      };
118            
119      $result = Exec($mconn,$sql);      $result = Exec($mconn,$sql);
120        
121      my @lastsync = $result->fetchrow;      my @lastsync = $result->fetchrow;
122      print "lastsync: ",join(",",@lastsync),"\n" if ($debug);      print STDERR "lastsync: ",join(",",@lastsync),"\n" if ($debug);
123    
124      # exclude data which originated from master server      # exclude data which originated from master server
125      my $sel_server = " and l.server = $mserver ";      my $sel_server = " and l.server = $mserver ";
# Line 127  sub PrepareSnapshot Line 131  sub PrepareSnapshot
131          $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';          $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
132          $sinfo .= ")";          $sinfo .= ")";
133      }      }
134        
135      my $havedeal = 0;      my $havedeal = 0;
136            
137      # DELETED rows      # DELETED rows
# Line 137  sub PrepareSnapshot Line 141  sub PrepareSnapshot
141      printf "DELETED: $sql\n" if $debug;      printf "DELETED: $sql\n" if $debug;
142            
143      $result = $mconn->exec($sql);      $result = $mconn->exec($sql);
144      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK) {
     {  
145          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
146          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
147          return(-1);          return(-1);
148      }      }
149            
150      my $lastoid = -1;      my $lastoid = -1;
151      while (@row = $result->fetchrow)      while (@row = $result->fetchrow) {
     {  
152          next unless exists $Mtables{$row[0]};          next unless exists $Mtables{$row[0]};
153          next unless exists $Stables{$Mtables{$row[0]}[0]};          next unless exists $Stables{$Mtables{$row[0]}[0]};
154    
155          if ($lastoid != $row[0])          if ($lastoid != $row[0]) {
156          {              if ($lastoid == -1) {
             if ($lastoid == -1)  
             {  
157                  my $syncid = GetSYNCID($mconn, $outf);                  my $syncid = GetSYNCID($mconn, $outf);
158                  return($syncid) if $syncid < 0;                  return($syncid) if $syncid < 0;
159                  $havedeal = 1;                  $havedeal = 1;
160              }              } else {
             else  
             {  
161                  printf $outf "\\.\n";                  printf $outf "\\.\n";
162              }              }
163              printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";              printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
164              $lastoid = $row[0];              $lastoid = $row[0];
165          }          }
166          if (! defined $row[1])          if (! defined $row[1]) {
         {  
167              print STDERR "NULL key\n" unless ($quiet);              print STDERR "NULL key\n" unless ($quiet);
168              $mconn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
169              return(-2);              return(-2);
# Line 265  sub PrepareSnapshot Line 262  sub PrepareSnapshot
262                  printf $outf "  " if $i;                  printf $outf "  " if $i;
263                  printf "        " if $i && $debug;                  printf "        " if $i && $debug;
264                  printf $outf "%s", OutputValue($row[$i]);                  printf $outf "%s", OutputValue($row[$i]);
265                  printf "%s", OutputValue($row[$i]) if $debug;;                  printf "%s", OutputValue($row[$i]) if $debug;
266              }              }
267              printf $outf "\n";              printf $outf "\n";
268              printf "\n" if $debug;              printf "\n" if $debug;
# Line 277  sub PrepareSnapshot Line 274  sub PrepareSnapshot
274            
275      unless ($havedeal)      unless ($havedeal)
276      {      {
277            print STDERR "hon't have deal, rollback...\n" if ($debug);
278          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
279          return(0);          return(0);
280      }      }
# Line 299  sub PrepareSnapshot Line 297  sub PrepareSnapshot
297          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
298          return(-1);          return(-1);
299      }      }
300    
301      printf $outf "-- OK\n";      printf $outf "-- OK\n";
302      printf "-- OK\n" if $debug;      printf "-- OK\n" if $debug;
303            
# Line 332  sub GetSYNCID Line 331  sub GetSYNCID
331          $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
332          return(-1);          return(-1);
333      }      }
334        
335      my @row = $result->fetchrow;      my @row = $result->fetchrow;
336            
337      printf $outf "-- SYNCID $row[0]\n";      printf $outf "-- SYNCID $row[0]\n";
# Line 417  sub CleanLog Line 416  sub CleanLog
416    
417  sub ApplySnapshot  sub ApplySnapshot
418  {  {
419      my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);      my ($sconn, $inpf, $multimaster, $onlytables) = @_; # (@_[0], @_[1]);
420        
421        my $serverId;
422    
423      my $result = $sconn->exec("BEGIN");      my $result = $sconn->exec("BEGIN");
424      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
425          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
426          $sconn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
427          return(-1);          return(-1);
428      }      }
429            
430      $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");      $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
431      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
432          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
433          $sconn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
434          return(-1);          return(-1);
435      }      }
436            
437      # MAP name --> oid, keyname, keynum      # MAP name --> oid, keyname, keynum
438      my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .      my $sql = qq{
439          " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .          select pgc.oid, pgc.relname, pga.attname, rt.key
440          " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .          from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga
441          " AND pga.attnum = rt.key";          where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
442                    AND pga.attnum = rt.key
443        };
444            
445      $result = $sconn->exec($sql);      $result = $sconn->exec($sql);
446      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK) {
     {  
447          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
448          $sconn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
449          return(-1);          return(-1);
450      }      }
451      %Stables = ();      %Stables = ();
452      while (my @row = $result->fetchrow)      while (my @row = $result->fetchrow) {
     {  
453          #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];          #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
454                  if (ref($onlytables) eq 'HASH') {                  if (ref($onlytables) eq 'HASH') {
455                          next unless (exists $onlytables->{$row[1]});                          next unless (exists $onlytables->{$row[1]});
# Line 463  sub ApplySnapshot Line 462  sub ApplySnapshot
462    
463      my $ok = 0;      my $ok = 0;
464      my $syncid = -1;      my $syncid = -1;
465      while(<$inpf>)      while(<$inpf>) {
     {  
466          $_ =~ s/\n//;          $_ =~ s/\n//;
467          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);          my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);
468          die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);          die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
469          if ($cmt ne '--')          if ($cmt ne '--') {
         {  
470              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
471              $sconn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
472              return(-2);              return(-2);
473          }          }
474          if ($cmd eq 'DELETE')          if ($cmd eq 'DELETE') {
475          {              if ($syncid == -1) {
             if ($syncid == -1)  
             {  
476                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
477                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
478                  return(-2);                  return(-2);
479              }              }
480              $result = DoDelete($sconn, $inpf, $prm);              $result = DoDelete($sconn, $inpf, $prm);
481              if ($result)              if ($result) {
             {  
482                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
483                  return($result);                  return($result);
484              }              }
485          }          } elsif ($cmd eq 'INSERT') {
486          elsif ($cmd eq 'INSERT')              if ($syncid == -1) {
         {  
             if ($syncid == -1)  
             {  
487                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
488                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
489                  return(-2);                  return(-2);
490              }              }
491              $result = DoInsert($sconn, $inpf, $prm);              $result = DoInsert($sconn, $inpf, $prm);
492              if ($result)              if ($result) {
             {  
493                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
494                  return($result);                  return($result);
495              }              }
496          }          } elsif ($cmd eq 'UPDATE') {
497          elsif ($cmd eq 'UPDATE')              if ($syncid == -1) {
         {  
             if ($syncid == -1)  
             {  
498                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
499                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
500                  return(-2);                  return(-2);
501              }              }
502              $result = DoUpdate($sconn, $inpf, $prm);              $result = DoUpdate($sconn, $inpf, $prm);
503              if ($result)              if ($result) {
             {  
504                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
505                  return($result);                  return($result);
506              }              }
507          }          } elsif ($cmd eq 'SYNCID') {
508          elsif ($cmd eq 'SYNCID')              if ($syncid != -1) {
         {  
             if ($syncid != -1)  
             {  
509                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);
510                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
511                  return(-2);                  return(-2);
512              }              }
513              if ($prm !~ /^\d+$/)              if ($prm !~ /^\d+$/) {
             {  
514                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
515                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
516                  return(-2);                  return(-2);
# Line 537  sub ApplySnapshot Line 519  sub ApplySnapshot
519                            
520              printf STDERR "Sync ID $syncid\n" unless ($quiet);              printf STDERR "Sync ID $syncid\n" unless ($quiet);
521                            
522              $result = $sconn->exec("select syncid, synctime from " .              $result = $sconn->exec(qq{
523                                    "_RSERV_SLAVE_SYNC_ where syncid = " .                  select syncid, synctime
524                                    "(select max(syncid) from _RSERV_SLAVE_SYNC_)");                  from _RSERV_SLAVE_SYNC_
525              if ($result->resultStatus ne PGRES_TUPLES_OK)                  where syncid =
526              {                          (select max(syncid) from _RSERV_SLAVE_SYNC_)
527                  print STDERR $sconn->errorMessage unless ($quiet);              });
528                if ($result->resultStatus ne PGRES_TUPLES_OK) {
529                    print STDERR "can't get current syncid from _rserv_slave_sync_: ",$sconn->errorMessage unless ($quiet);
530                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
531                  return(-1);                  return(-1);
532              }              }
533    
534              my @row = $result->fetchrow;              my @row = $result->fetchrow;
535              if (! defined $row[0])              print STDERR "Slave Sync ID ",($row[0] || "null"),"\n" if ($debug);
536              {              if (! defined $row[0]) {
537                  $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".                  $result = Exec($sconn,qq{
538                                        "(syncid, synctime) values ($syncid, now())");                          insert into _RSERV_SLAVE_SYNC_ (syncid, synctime)
539              }                          values ($syncid, now())
540              elsif ($row[0] >= $prm)                  });
541              {              } elsif ($row[0] >= $prm) {
542                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
543                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
544                  return(0);                  return(0);
545                } else {
546                    $result = Exec($sconn,qq{
547                            update _RSERV_SLAVE_SYNC_ set syncid = $syncid, synctime = now()
548                    });
549              }              }
550              else              if ($result->resultStatus ne PGRES_COMMAND_OK) {
             {  
                 $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .  
                                       " set syncid = $syncid, synctime = now()");  
             }  
             if ($result->resultStatus ne PGRES_COMMAND_OK)  
             {  
551                  print STDERR $sconn->errorMessage unless ($quiet);                  print STDERR $sconn->errorMessage unless ($quiet);
552                  $sconn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
553                  return(-1);                  return(-1);
554              }              }
555          }          } elsif ($cmd eq 'OK') {
         elsif ($cmd eq 'OK')  
         {  
556              $ok = 1;              $ok = 1;
557                if ($multimaster) {
558                # now, update server in _rserv_log_ based on transaction xid
559                    ExecFatch($sconn,"select count(*) from _rserv_log_");
560                    ExecDebug($sconn,"select * from _rserv_log_");
561                    my $keys_sql = qq{
562                            update _rserv_log_ set server=$serverId
563                            where logid = (select _rserv_xid_())
564                    };
565                                    
566                    Exec($sconn,$keys_sql);
567                }
568              last;              last;
569          }          } elsif ($cmd eq 'ERROR') {
         elsif ($cmd eq 'ERROR')  
         {  
570              printf STDERR "ERROR signaled\n" unless ($quiet);              printf STDERR "ERROR signaled\n" unless ($quiet);
571              $sconn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
572              return(-2);              return(-2);
573          }          } elsif ($cmd eq 'SERVER') {
574          else              if ($prm !~ /^\d+$/) {
575          {                  printf STDERR "Invalid Server ID $prm\n" unless ($quiet);
576                    $sconn->exec("ROLLBACK");
577                    return(-2);
578                }
579                $serverId = $prm;
580                print STDERR "Server ID $serverId\n" unless ($quiet);
581            } else {
582              printf STDERR "Unknown command $cmd\n" unless ($quiet);              printf STDERR "Unknown command $cmd\n" unless ($quiet);
583              $sconn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
584              return(-2);              return(-2);
585          }          }
586      }      }
587            
588      if (! $ok)      if (! $ok) {
     {  
589          printf STDERR "No OK flag in input\n" unless ($quiet);          printf STDERR "No OK flag in input\n" unless ($quiet);
590          $sconn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
591          return(-2);          return(-2);
592      }      }
593            
594      $result = $sconn->exec("COMMIT");      $result = $sconn->exec("COMMIT");
595      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
596          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
597          $sconn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
598          return(-1);          return(-1);
# Line 613  sub DoDelete Line 607  sub DoDelete
607    
608      # only delete tables that the slave wants      # only delete tables that the slave wants
609      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
610          print "Not configured to delete rows from table $tabname\n" unless $quiet;          print STDERR "Not configured to delete rows from table $tabname\n" unless $quiet;
611          while (<$inpf>) {          while (<$inpf>) {
612              my $istring = $_;              my $istring = $_;
613              $istring =~ s/\n//;              $istring =~ s/\n//;
# Line 667  sub DoUpdate Line 661  sub DoUpdate
661    
662      # only update the tables that the slave wants      # only update the tables that the slave wants
663      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
664          print "Not configured to update rows from table $tabname\n" unless $quiet;          print STDERR "Not configured to update rows from table $tabname\n" unless $quiet;
665          while (<$inpf>) {          while (<$inpf>) {
666              my $istring = $_;              my $istring = $_;
667              $istring =~ s/\n//;              $istring =~ s/\n//;
# Line 693  sub DoUpdate Line 687  sub DoUpdate
687      }      }
688            
689      my @anames = ();      my @anames = ();
690      while (my @row = $result->fetchrow)      while (my @row = $result->fetchrow) {
     {  
691          $anames[$row[0]] = $row[1];          $anames[$row[0]] = $row[1];
692      }      }
693            
694      my $istring;      my $istring;
695      my $ok = 0;      my $ok = 0;
696      while(<$inpf>)      while(<$inpf>) {
697      {          if ($_ !~ /\n$/) {
         if ($_ !~ /\n$/)  
         {  
698              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
699              return(-2);              return(-2);
700          }          }
701          $istring = $_;          $istring = $_;
702          $istring =~ s/\n//;          $istring =~ s/\n//;
703          if ($istring eq '\.')          if ($istring eq '\.') {
         {  
704              $ok = 1;              $ok = 1;
705              last;              last;
706          }          }
707          my @vals = split(/      /, $istring);          my @vals = split(/      /, $istring);
708          if ($oidkey)          if ($oidkey) {
709          {              if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0) {
             if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)  
             {  
710                  printf STDERR "Invalid OID\n" unless ($quiet);                  printf STDERR "Invalid OID\n" unless ($quiet);
711                  return(-2);                  return(-2);
712              }              }
713              $oidkey = $vals[0];              $oidkey = $vals[0];
714          }          } else {
         else  
         {  
715              unshift @vals, '';              unshift @vals, '';
716          }          }
717                    
718          $sql = "update \"$tabname\" set ";          $sql = "update \"$tabname\" set ";
719          my $ocnt = 0;          my $ocnt = 0;
720          for (my $i = 1; $i <= $#anames; $i++)          for (my $i = 1; $i <= $#anames; $i++) {
721          {              if ($vals[$i] eq '\N') {
722              if ($vals[$i] eq '\N')                  if ($i == $Stables{$tabname}->[2]) {
             {  
                 if ($i == $Stables{$tabname}->[2])  
                 {  
723                      printf STDERR "NULL key\n" unless ($quiet);                      printf STDERR "NULL key\n" unless ($quiet);
724                      return(-2);                      return(-2);
725                  }                  }
726                  $vals[$i] = 'null';                  $vals[$i] = 'null';
727              }              } else {
             else  
             {  
728                  $vals[$i] = "'" . $vals[$i] . "'";                  $vals[$i] = "'" . $vals[$i] . "'";
729                  next if $i == $Stables{$tabname}->[2];                  next if $i == $Stables{$tabname}->[2];
730              }              }
731              $ocnt++;              $ocnt++;
732              $sql .= ', ' if $ocnt > 1;              $sql .= ', ' if $ocnt > 1;
733              $sql .= "\"$anames[$i]\" = $vals[$i]";              $sql .= "\"$anames[$i]\" = $vals[$i]";
734          }          } if ($oidkey) {
         if ($oidkey)  
         {  
735              $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";              $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
736          }          } else {
         else  
         {  
737              $sql .= " where \"$Stables{$tabname}->[1]\" = ".              $sql .= " where \"$Stables{$tabname}->[1]\" = ".
738                  $vals[$Stables{$tabname}->[2]];                  $vals[$Stables{$tabname}->[2]];
739          }          }
# Line 765  sub DoUpdate Line 742  sub DoUpdate
742                    
743          $result = $sconn->exec($sql);          $result = $sconn->exec($sql);
744                    
745          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK) {
         {  
746              print STDERR $sconn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
747              return(-1);              return(-1);
748          }          }
749          next if $result->cmdTuples == 1;        # updated          next if $result->cmdTuples == 1;        # updated
750                    
751          if ($result->cmdTuples > 1)          if ($result->cmdTuples > 1) {
         {  
752              printf STDERR "Duplicate keys\n" unless ($quiet);              printf STDERR "Duplicate keys\n" unless ($quiet);
753              return(-2);              return(-2);
754          }          }
# Line 782  sub DoUpdate Line 757  sub DoUpdate
757          push @CopyBuf, "$istring\n";          push @CopyBuf, "$istring\n";
758          $CBufLen += length($istring);          $CBufLen += length($istring);
759                    
760          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax) {
         {  
761              $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);              $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
762              return($result) if $result;              return($result) if $result;
763              @CopyBuf = ();              @CopyBuf = ();
# Line 791  sub DoUpdate Line 765  sub DoUpdate
765          }          }
766      }      }
767            
768      if (! $ok)      if (! $ok) {
     {  
769          printf STDERR "No end of input in UPDATE section\n" unless ($quiet);          printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
770          return(-2);          return(-2);
771      }      }
772            
773      if ($CBufLen)      if ($CBufLen) {
774      {          print STDERR "@CopyBuf\n" if $debug;
         print "@CopyBuf\n" if $debug;  
775          $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);          $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
776          return($result) if $result;          return($result) if $result;
777      }      }
# Line 813  sub DoInsert Line 785  sub DoInsert
785    
786      # only insert rows into tables that the slave wants      # only insert rows into tables that the slave wants
787      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
788          print "Not configured to insert rows from table $tabname\n" unless $quiet;          print STDERR "Not configured to insert rows from table $tabname\n" unless $quiet;
789          while (<$inpf>) {          while (<$inpf>) {
790              my $istring = $_;              my $istring = $_;
791              $istring =~ s/\n//;              $istring =~ s/\n//;
# Line 830  sub DoInsert Line 802  sub DoInsert
802            
803      my $istring;      my $istring;
804      my $ok = 0;      my $ok = 0;
805      while(<$inpf>)      while(<$inpf>) {
806      {          if ($_ !~ /\n$/) {
         if ($_ !~ /\n$/)  
         {  
807              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
808              return(-2);              return(-2);
809          }          }
810          $istring = $_;          $istring = $_;
811          $istring =~ s/\n//;          $istring =~ s/\n//;
812          if ($istring eq '\.')          if ($istring eq '\.') {
         {  
813              $ok = 1;              $ok = 1;
814              last;              last;
815          }          }
# Line 849  sub DoInsert Line 818  sub DoInsert
818          push @CopyBuf, "$istring\n";          push @CopyBuf, "$istring\n";
819          $CBufLen += length($istring);          $CBufLen += length($istring);
820                    
821          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax) {
         {  
822              my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);              my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
823              return($result) if $result;              return($result) if $result;
824              @CopyBuf = ();              @CopyBuf = ();
# Line 858  sub DoInsert Line 826  sub DoInsert
826          }          }
827      }      }
828            
829      if (! $ok)      if (! $ok) {
     {  
830          printf STDERR "No end of input in INSERT section\n" unless ($quiet);          printf STDERR "No end of input in INSERT section\n" unless ($quiet);
831          return(-2);          return(-2);
832      }      }
833            
834      if ($CBufLen)      if ($CBufLen) {
835      {          print STDERR "@CopyBuf\n" if $debug;
         print "@CopyBuf\n" if $debug;  
836          my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);          my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
837          return($result) if $result;          return($result) if $result;
838      }      }
# Line 875  sub DoInsert Line 841  sub DoInsert
841  }  }
842    
843    
844  sub DoCopy  sub DoCopy {
 {  
845      my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);      my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
846            
847      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
848          "FROM STDIN";          "FROM STDIN";
849      my $result = $sconn->exec($sql);      my $result = $sconn->exec($sql);
850      if ($result->resultStatus ne PGRES_COPY_IN)      if ($result->resultStatus ne PGRES_COPY_IN) {
     {  
851          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
852          return(-1);          return(-1);
853      }      }
854            
855      foreach my $str (@{$CBuf})      foreach my $str (@{$CBuf}) {
     {  
856          $sconn->putline($str);          $sconn->putline($str);
857      }      }
858            
859      $sconn->putline("\\.\n");      $sconn->putline("\\.\n");
860            
861      if ($sconn->endcopy)      if ($sconn->endcopy) {
     {  
862          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
863          return(-1);          return(-1);
864      }      }
# Line 908  sub DoCopy Line 870  sub DoCopy
870  #  #
871  # Returns last SyncID applied on Slave  # Returns last SyncID applied on Slave
872  #  #
873  sub GetSyncID  sub GetSyncID {
 {  
874      my ($sconn) = @_; # (@_[0]);      my ($sconn) = @_; # (@_[0]);
875            
876      my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");      my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
877      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK) {
     {  
878          print STDERR $sconn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
879          return(-1);          return(-1);
880      }      }
881      my @row = $result->fetchrow;      my @row = $result->fetchrow;
882        print STDERR "GetSyncID: ",($row[0] || 'null'),"\n" if ($debug);
883      return(undef) unless defined $row[0];       # null      return(undef) unless defined $row[0];       # null
884      return($row[0]);      return($row[0]);
885  }  }
# Line 926  sub GetSyncID Line 887  sub GetSyncID
887  #  #
888  # Updates _RSERV_SYNC_ on Master with Slave SyncID  # Updates _RSERV_SYNC_ on Master with Slave SyncID
889  #  #
890  sub SyncSyncID  sub SyncSyncID {
 {  
891      my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
892            
893      my $result = $mconn->exec("BEGIN");      my $result = $mconn->exec("BEGIN");
894      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
895          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
896          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
897          return(-1);          return(-1);
# Line 941  sub SyncSyncID Line 900  sub SyncSyncID
900      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
901                            " where server = $sserver AND syncid = $syncid" .                            " where server = $sserver AND syncid = $syncid" .
902                            " for update");                            " for update");
903      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK) {
     {  
904          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
905          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
906          return(-1);          return(-1);
907      }      }
908      my @row = $result->fetchrow;      my @row = $result->fetchrow;
909      if (! defined $row[0])      if (! defined $row[0]) {
     {  
910          printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);          printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
911          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
912          return(0);          return(0);
913      }      }
914      if ($row[1] > 0)      if ($row[1] > 0) {
     {  
915          printf STDERR "SyncID $syncid for server ".          printf STDERR "SyncID $syncid for server ".
916              "$sserver already updated\n" unless ($quiet);              "$sserver already updated\n" unless ($quiet);
917          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
# Line 964  sub SyncSyncID Line 920  sub SyncSyncID
920      $result = $mconn->exec("update _RSERV_SYNC_" .      $result = $mconn->exec("update _RSERV_SYNC_" .
921                            " set synctime = now(), status = 1" .                            " set synctime = now(), status = 1" .
922                            " where server = $sserver AND syncid = $syncid");                            " where server = $sserver AND syncid = $syncid");
923      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
924          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
925          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
926          return(-1);          return(-1);
927      }      }
928      $result = $mconn->exec("delete from _RSERV_SYNC_" .      $result = $mconn->exec("delete from _RSERV_SYNC_" .
929                            " where server = $sserver AND syncid < $syncid");                            " where server = $sserver AND syncid < $syncid");
930      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
931          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
932          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
933          return(-1);          return(-1);
934      }      }
935            
936      $result = $mconn->exec("COMMIT");      $result = $mconn->exec("COMMIT");
937      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
     {  
938          print STDERR $mconn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
939          $mconn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
940          return(-1);          return(-1);
# Line 1012  sub Connect { Line 965  sub Connect {
965          print("Connecting to $info\n") if ($debug || $verbose);          print("Connecting to $info\n") if ($debug || $verbose);
966          my $conn = Pg::connectdb($info);          my $conn = Pg::connectdb($info);
967          if ($conn->status != PGRES_CONNECTION_OK) {          if ($conn->status != PGRES_CONNECTION_OK) {
968              print STDERR "Failed opening $info\n";              die "Failed opening $info";
             exit 1;  
969          }          }
970          return $conn;          return $conn;
971  }  }
# Line 1031  sub Exec { Line 983  sub Exec {
983          }          }
984          my $result = $conn->exec($sql);          my $result = $conn->exec($sql);
985          if ($result->resultStatus eq PGRES_COMMAND_OK) {          if ($result->resultStatus eq PGRES_COMMAND_OK) {
986                  return;                  return $result;
987          } elsif ($result->resultStatus eq PGRES_TUPLES_OK) {          } elsif ($result->resultStatus eq PGRES_TUPLES_OK) {
988                  print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);                  print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
989                  return $result;                  return $result;
# Line 1057  sub Exec2 { Line 1009  sub Exec2 {
1009          # XXX TODO: return results?!          # XXX TODO: return results?!
1010  }  }
1011    
1012    # exec sql query and return one row from it
1013    sub ExecFatch {
1014            my $conn = shift || die "ExecFatch need conn!";
1015            my $sql = shift || die "ExecFatch need SQL!";
1016    
1017            if ($debug) {
1018                    # re-format SQL in one line (for nicer output)
1019                    $sql =~ s/[\s\n\r]+/ /gs;
1020                    print STDERR "Exec: $sql\n";
1021            }
1022    
1023            my $result = $conn->exec($sql);
1024            RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);
1025    
1026            print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
1027    
1028            my @row = $result->fetchrow;
1029            print STDERR "DATA: ",join(",",@row),"\n" if ($debug);
1030            return @row;
1031    }
1032    
1033    # exec sql query and dump all rows retured to STDERR (great for debugging)
1034    sub ExecDebug {
1035            return if (! $debug);
1036    
1037            my $conn = shift || die "ExecDebug need conn!";
1038            my $sql = shift || die "ExecDebug need SQL!";
1039    
1040            if ($debug) {
1041                    # re-format SQL in one line (for nicer output)
1042                    $sql =~ s/[\s\n\r]+/ /gs;
1043                    print STDERR "Exec: $sql\n";
1044            }
1045    
1046            my $result = $conn->exec($sql);
1047            RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);
1048    
1049            print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
1050    
1051            while (my @row = $result->fetchrow) {
1052                    print STDERR "DATA: ",join(",",@row),"\n" if ($debug);
1053            }
1054            return $result->ntuples;
1055    }
1056  sub MkInfo {  sub MkInfo {
1057          my $db = shift || die "need database name!";          my $db = shift || die "need database name!";
1058          my $host = shift;          my $host = shift;

Legend:
Removed from v.1.15  
changed lines
  Added in v.1.18

  ViewVC Help
Powered by ViewVC 1.1.26