/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.5 by dpavlin, Sun Oct 26 23:43:54 2003 UTC revision 1.6 by dpavlin, Tue Oct 28 19:55:25 2003 UTC
# Line 19  my %Stables = (); Line 19  my %Stables = ();
19    
20  sub GetSlaveId  sub GetSlaveId
21  {  {
22      my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);      my ($mconn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);
23    
24      my $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
25                            " host='$slaveHost' AND dbase='$slaveDB'");                            " host='$slaveHost' AND dbase='$slaveDB'");
26    
27      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
28      {      {
29          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
30          return(-1);          return(-1);
31      }      }
32            
# Line 43  sub GetSlaveId Line 43  sub GetSlaveId
43    
44  sub PrepareSnapshot  sub PrepareSnapshot
45  {  {
46      my ($conn, $sconn, $outf, $server, $onlytables) = @_;      my ($mconn, $sconn, $outf, $server, $onlytables) = @_;
47    
48      # first, we must know for wich tables the slave subscribed      # first, we must know for wich tables the slave subscribed
49      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
50      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
51      {      {
52          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
53          return(-1);          return(-1);
54      }      }
55            
# Line 58  sub PrepareSnapshot Line 58  sub PrepareSnapshot
58          $Stables{$row[0]} = 1;          $Stables{$row[0]} = 1;
59      }      }
60            
61      $result = $conn->exec("BEGIN");      $result = $mconn->exec("BEGIN");
62      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
63      {      {
64          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
65          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
66          return(-1);          return(-1);
67      }      }
68      $result = $conn->exec("set transaction isolation level serializable");      $result = $mconn->exec("set transaction isolation level serializable");
69      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
70      {      {
71          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
72          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
73          return(-1);          return(-1);
74      }      }
75            
76      # MAP oid --> tabname, keyname, key_type      # MAP oid --> tabname, keyname, key_type
77      $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .      $result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
78                            " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .                            " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
79                            ", pg_type pgt".                            ", pg_type pgt".
80                            " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .                            " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
81                            " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");                            " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
82      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
83      {      {
84          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
85          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
86          return(-1);          return(-1);
87      }      }
88            
# Line 103  sub PrepareSnapshot Line 103  sub PrepareSnapshot
103            
104      printf "$sql\n" if $debug;      printf "$sql\n" if $debug;
105    
106      $result = $conn->exec($sql);      $result = $mconn->exec($sql);
107      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
108      {      {
109          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
110          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
111          return(-1);          return(-1);
112      }      }
113            
# Line 129  sub PrepareSnapshot Line 129  sub PrepareSnapshot
129            
130      printf "$sql\n" if $debug;      printf "$sql\n" if $debug;
131            
132      $result = $conn->exec($sql);      $result = $mconn->exec($sql);
133      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
134      {      {
135          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
136          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
137          return(-1);          return(-1);
138      }      }
139            
# Line 147  sub PrepareSnapshot Line 147  sub PrepareSnapshot
147          {          {
148              if ($lastoid eq '')              if ($lastoid eq '')
149              {              {
150                  my $syncid = GetSYNCID($conn, $outf);                  my $syncid = GetSYNCID($mconn, $outf);
151                  return($syncid) if $syncid < 0;                  return($syncid) if $syncid < 0;
152                  $havedeal = 1;                  $havedeal = 1;
153              }              }
# Line 161  sub PrepareSnapshot Line 161  sub PrepareSnapshot
161          if (! defined $row[1])          if (! defined $row[1])
162          {          {
163              print STDERR "NULL key\n" unless ($quiet);              print STDERR "NULL key\n" unless ($quiet);
164              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
165              return(-2);              return(-2);
166          }          }
167          printf $outf "%s\n", OutputValue($row[1]);          printf $outf "%s\n", OutputValue($row[1]);
# Line 184  sub PrepareSnapshot Line 184  sub PrepareSnapshot
184                    
185          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
186                    
187          $result = $conn->exec($sql);          $result = $mconn->exec($sql);
188          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
189          {          {
190              printf $outf "-- ERROR\n" if $havedeal;              printf $outf "-- ERROR\n" if $havedeal;
191              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $mconn->errorMessage unless ($quiet);
192              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
193              return(-1);              return(-1);
194          }          }
195          next if $result->ntuples <= 0;          next if $result->ntuples <= 0;
196          if (! $havedeal)          if (! $havedeal)
197          {          {
198              my $syncid = GetSYNCID($conn, $outf);              my $syncid = GetSYNCID($mconn, $outf);
199              return($syncid) if $syncid < 0;              return($syncid) if $syncid < 0;
200              $havedeal = 1;              $havedeal = 1;
201          }          }
# Line 232  sub PrepareSnapshot Line 232  sub PrepareSnapshot
232                    
233          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
234                    
235          $result = $conn->exec($sql);          $result = $mconn->exec($sql);
236          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
237          {          {
238              printf $outf "-- ERROR\n" if $havedeal;              printf $outf "-- ERROR\n" if $havedeal;
239              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $mconn->errorMessage unless ($quiet);
240              $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
241              return(-1);              return(-1);
242          }          }
243          next if $result->ntuples <= 0;          next if $result->ntuples <= 0;
244          if (! $havedeal)          if (! $havedeal)
245          {          {
246              my $syncid = GetSYNCID($conn, $outf);              my $syncid = GetSYNCID($mconn, $outf);
247              return($syncid) if $syncid < 0;              return($syncid) if $syncid < 0;
248              $havedeal = 1;              $havedeal = 1;
249          }          }
# Line 268  sub PrepareSnapshot Line 268  sub PrepareSnapshot
268            
269      unless ($havedeal)      unless ($havedeal)
270      {      {
271          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
272          return(0);          return(0);
273      }      }
274            
275      # Remember this snapshot info      # Remember this snapshot info
276      $result = $conn->exec("select _rserv_sync_($server)");      $result = $mconn->exec("select _rserv_sync_($server)");
277      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
278      {      {
279          printf $outf "-- ERROR\n";          printf $outf "-- ERROR\n";
280          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
281          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
282          return(-1);          return(-1);
283      }      }
284            
285      $result = $conn->exec("COMMIT");      $result = $mconn->exec("COMMIT");
286      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
287      {      {
288          printf $outf "-- ERROR\n";          printf $outf "-- ERROR\n";
289          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
290          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
291          return(-1);          return(-1);
292      }      }
293      printf $outf "-- OK\n";      printf $outf "-- OK\n";
# Line 408  sub CleanLog Line 408  sub CleanLog
408    
409  sub ApplySnapshot  sub ApplySnapshot
410  {  {
411      my ($conn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);      my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);
412            
413      my $result = $conn->exec("BEGIN");      my $result = $sconn->exec("BEGIN");
414      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
415      {      {
416          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
417          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
418          return(-1);          return(-1);
419      }      }
420            
421      $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");      $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
422      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
423      {      {
424          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
425          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
426          return(-1);          return(-1);
427      }      }
428            
# Line 432  sub ApplySnapshot Line 432  sub ApplySnapshot
432          " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .          " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
433          " AND pga.attnum = rt.key";          " AND pga.attnum = rt.key";
434            
435      $result = $conn->exec($sql);      $result = $sconn->exec($sql);
436      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
437      {      {
438          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
439          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
440          return(-1);          return(-1);
441      }      }
442      %Stables = ();      %Stables = ();
# Line 459  sub ApplySnapshot Line 459  sub ApplySnapshot
459          if ($cmt ne '--')          if ($cmt ne '--')
460          {          {
461              printf STDERR "Invalid format\n" unless ($quiet);              printf STDERR "Invalid format\n" unless ($quiet);
462              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
463              return(-2);              return(-2);
464          }          }
465          if ($cmd eq 'DELETE')          if ($cmd eq 'DELETE')
# Line 467  sub ApplySnapshot Line 467  sub ApplySnapshot
467              if ($syncid eq '')              if ($syncid eq '')
468              {              {
469                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
470                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
471                  return(-2);                  return(-2);
472              }              }
473              $result = DoDelete($conn, $inpf, $prm);              $result = DoDelete($sconn, $inpf, $prm);
474              if ($result)              if ($result)
475              {              {
476                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
477                  return($result);                  return($result);
478              }              }
479          }          }
# Line 482  sub ApplySnapshot Line 482  sub ApplySnapshot
482              if ($syncid eq '')              if ($syncid eq '')
483              {              {
484                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
485                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
486                  return(-2);                  return(-2);
487              }              }
488              $result = DoInsert($conn, $inpf, $prm);              $result = DoInsert($sconn, $inpf, $prm);
489              if ($result)              if ($result)
490              {              {
491                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
492                  return($result);                  return($result);
493              }              }
494          }          }
# Line 497  sub ApplySnapshot Line 497  sub ApplySnapshot
497              if ($syncid eq '')              if ($syncid eq '')
498              {              {
499                  printf STDERR "Sync ID unspecified\n" unless ($quiet);                  printf STDERR "Sync ID unspecified\n" unless ($quiet);
500                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
501                  return(-2);                  return(-2);
502              }              }
503              $result = DoUpdate($conn, $inpf, $prm);              $result = DoUpdate($sconn, $inpf, $prm);
504              if ($result)              if ($result)
505              {              {
506                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
507                  return($result);                  return($result);
508              }              }
509          }          }
# Line 512  sub ApplySnapshot Line 512  sub ApplySnapshot
512              if ($syncid ne '')              if ($syncid ne '')
513              {              {
514                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);                  printf STDERR "Second Sync ID ?!\n" unless ($quiet);
515                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
516                  return(-2);                  return(-2);
517              }              }
518              if ($prm !~ /^\d+$/)              if ($prm !~ /^\d+$/)
519              {              {
520                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);                  printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
521                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
522                  return(-2);                  return(-2);
523              }              }
524              $syncid = $prm;              $syncid = $prm;
525                            
526              printf STDERR "Sync ID $syncid\n" unless ($quiet);              printf STDERR "Sync ID $syncid\n" unless ($quiet);
527                            
528              $result = $conn->exec("select syncid, synctime from " .              $result = $sconn->exec("select syncid, synctime from " .
529                                    "_RSERV_SLAVE_SYNC_ where syncid = " .                                    "_RSERV_SLAVE_SYNC_ where syncid = " .
530                                    "(select max(syncid) from _RSERV_SLAVE_SYNC_)");                                    "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
531              if ($result->resultStatus ne PGRES_TUPLES_OK)              if ($result->resultStatus ne PGRES_TUPLES_OK)
532              {              {
533                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR $sconn->errorMessage unless ($quiet);
534                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
535                  return(-1);                  return(-1);
536              }              }
537              my @row = $result->fetchrow;              my @row = $result->fetchrow;
538              if (! defined $row[0])              if (! defined $row[0])
539              {              {
540                  $result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ".                  $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
541                                        "(syncid, synctime) values ($syncid, now())");                                        "(syncid, synctime) values ($syncid, now())");
542              }              }
543              elsif ($row[0] >= $prm)              elsif ($row[0] >= $prm)
544              {              {
545                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
546                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
547                  return(0);                  return(0);
548              }              }
549              else              else
550              {              {
551                  $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .                  $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
552                                        " set syncid = $syncid, synctime = now()");                                        " set syncid = $syncid, synctime = now()");
553              }              }
554              if ($result->resultStatus ne PGRES_COMMAND_OK)              if ($result->resultStatus ne PGRES_COMMAND_OK)
555              {              {
556                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR $sconn->errorMessage unless ($quiet);
557                  $conn->exec("ROLLBACK");                  $sconn->exec("ROLLBACK");
558                  return(-1);                  return(-1);
559              }              }
560          }          }
# Line 566  sub ApplySnapshot Line 566  sub ApplySnapshot
566          elsif ($cmd eq 'ERROR')          elsif ($cmd eq 'ERROR')
567          {          {
568              printf STDERR "ERROR signaled\n" unless ($quiet);              printf STDERR "ERROR signaled\n" unless ($quiet);
569              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
570              return(-2);              return(-2);
571          }          }
572          else          else
573          {          {
574              printf STDERR "Unknown command $cmd\n" unless ($quiet);              printf STDERR "Unknown command $cmd\n" unless ($quiet);
575              $conn->exec("ROLLBACK");              $sconn->exec("ROLLBACK");
576              return(-2);              return(-2);
577          }          }
578      }      }
# Line 580  sub ApplySnapshot Line 580  sub ApplySnapshot
580      if (! $ok)      if (! $ok)
581      {      {
582          printf STDERR "No OK flag in input\n" unless ($quiet);          printf STDERR "No OK flag in input\n" unless ($quiet);
583          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
584          return(-2);          return(-2);
585      }      }
586            
587      $result = $conn->exec("COMMIT");      $result = $sconn->exec("COMMIT");
588      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
589      {      {
590          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
591          $conn->exec("ROLLBACK");          $sconn->exec("ROLLBACK");
592          return(-1);          return(-1);
593      }      }
594            
# Line 597  sub ApplySnapshot Line 597  sub ApplySnapshot
597    
598  sub DoDelete  sub DoDelete
599  {  {
600      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
601    
602      # only delete tables that the slave wants      # only delete tables that the slave wants
603      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 631  sub DoDelete Line 631  sub DoDelete
631                    
632          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
633                    
634          my $result = $conn->exec($sql);          my $result = $sconn->exec($sql);
635          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
636          {          {
637              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
638              return(-1);              return(-1);
639          }          }
640      }      }
# Line 651  sub DoDelete Line 651  sub DoDelete
651    
652  sub DoUpdate  sub DoUpdate
653  {  {
654      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
655    
656      # only update the tables that the slave wants      # only update the tables that the slave wants
657      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 673  sub DoUpdate Line 673  sub DoUpdate
673      my $sql = "select attnum, attname from pg_attribute" .      my $sql = "select attnum, attname from pg_attribute" .
674          " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";          " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
675            
676      my $result = $conn->exec($sql);      my $result = $sconn->exec($sql);
677      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
678      {      {
679          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
680          return(-1);          return(-1);
681      }      }
682            
# Line 751  sub DoUpdate Line 751  sub DoUpdate
751                    
752          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
753                    
754          $result = $conn->exec($sql);          $result = $sconn->exec($sql);
755                    
756          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
757          {          {
758              print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
759              return(-1);              return(-1);
760          }          }
761          next if $result->cmdTuples == 1;        # updated          next if $result->cmdTuples == 1;        # updated
# Line 772  sub DoUpdate Line 772  sub DoUpdate
772                    
773          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax)
774          {          {
775              $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);              $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
776              return($result) if $result;              return($result) if $result;
777              @CopyBuf = ();              @CopyBuf = ();
778              $CBufLen = 0;              $CBufLen = 0;
# Line 788  sub DoUpdate Line 788  sub DoUpdate
788      if ($CBufLen)      if ($CBufLen)
789      {      {
790          print "@CopyBuf\n" if $debug;          print "@CopyBuf\n" if $debug;
791          $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);          $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
792          return($result) if $result;          return($result) if $result;
793      }      }
794    
# Line 797  sub DoUpdate Line 797  sub DoUpdate
797    
798  sub DoInsert  sub DoInsert
799  {  {
800      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
801    
802      # only insert rows into tables that the slave wants      # only insert rows into tables that the slave wants
803      if (! defined($Stables{$tabname})) {      if (! defined($Stables{$tabname})) {
# Line 839  sub DoInsert Line 839  sub DoInsert
839                    
840          if ($CBufLen >= $CBufMax)          if ($CBufLen >= $CBufMax)
841          {          {
842              my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);              my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
843              return($result) if $result;              return($result) if $result;
844              @CopyBuf = ();              @CopyBuf = ();
845              $CBufLen = 0;              $CBufLen = 0;
# Line 855  sub DoInsert Line 855  sub DoInsert
855      if ($CBufLen)      if ($CBufLen)
856      {      {
857          print "@CopyBuf\n" if $debug;          print "@CopyBuf\n" if $debug;
858          my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);          my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
859          return($result) if $result;          return($result) if $result;
860      }      }
861            
# Line 865  sub DoInsert Line 865  sub DoInsert
865    
866  sub DoCopy  sub DoCopy
867  {  {
868      my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);      my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
869            
870      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
871          "FROM STDIN";          "FROM STDIN";
872      my $result = $conn->exec($sql);      my $result = $sconn->exec($sql);
873      if ($result->resultStatus ne PGRES_COPY_IN)      if ($result->resultStatus ne PGRES_COPY_IN)
874      {      {
875          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
876          return(-1);          return(-1);
877      }      }
878            
879      foreach my $str (@{$CBuf})      foreach my $str (@{$CBuf})
880      {      {
881          $conn->putline($str);          $sconn->putline($str);
882      }      }
883            
884      $conn->putline("\\.\n");      $sconn->putline("\\.\n");
885            
886      if ($conn->endcopy)      if ($sconn->endcopy)
887      {      {
888          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
889          return(-1);          return(-1);
890      }      }
891            
# Line 898  sub DoCopy Line 898  sub DoCopy
898  #  #
899  sub GetSyncID  sub GetSyncID
900  {  {
901      my ($conn) = @_; # (@_[0]);      my ($sconn) = @_; # (@_[0]);
902            
903      my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");      my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
904      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
905      {      {
906          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
907          return(-1);          return(-1);
908      }      }
909      my @row = $result->fetchrow;      my @row = $result->fetchrow;
# Line 916  sub GetSyncID Line 916  sub GetSyncID
916  #  #
917  sub SyncSyncID  sub SyncSyncID
918  {  {
919      my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);
920            
921      my $result = $conn->exec("BEGIN");      my $result = $mconn->exec("BEGIN");
922      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
923      {      {
924          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
925          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
926          return(-1);          return(-1);
927      }      }
928            
929      $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
930                            " where server = $server AND syncid = $syncid" .                            " where server = $server AND syncid = $syncid" .
931                            " for update");                            " for update");
932      if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
933      {      {
934          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
935          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
936          return(-1);          return(-1);
937      }      }
938      my @row = $result->fetchrow;      my @row = $result->fetchrow;
939      if (! defined $row[0])      if (! defined $row[0])
940      {      {
941          printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);          printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);
942          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
943          return(0);          return(0);
944      }      }
945      if ($row[1] > 0)      if ($row[1] > 0)
946      {      {
947          printf STDERR "SyncID $syncid for server ".          printf STDERR "SyncID $syncid for server ".
948              "$server already updated\n" unless ($quiet);              "$server already updated\n" unless ($quiet);
949          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
950          return(0);          return(0);
951      }      }
952      $result = $conn->exec("update _RSERV_SYNC_" .      $result = $mconn->exec("update _RSERV_SYNC_" .
953                            " set synctime = now(), status = 1" .                            " set synctime = now(), status = 1" .
954                            " where server = $server AND syncid = $syncid");                            " where server = $server AND syncid = $syncid");
955      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
956      {      {
957          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
958          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
959          return(-1);          return(-1);
960      }      }
961      $result = $conn->exec("delete from _RSERV_SYNC_" .      $result = $mconn->exec("delete from _RSERV_SYNC_" .
962                            " where server = $server AND syncid < $syncid");                            " where server = $server AND syncid < $syncid");
963      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
964      {      {
965          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
966          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
967          return(-1);          return(-1);
968      }      }
969            
970      $result = $conn->exec("COMMIT");      $result = $mconn->exec("COMMIT");
971      if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
972      {      {
973          print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
974          $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
975          return(-1);          return(-1);
976      }      }
977            

Legend:
Removed from v.1.5  
changed lines
  Added in v.1.6

  ViewVC Help
Powered by ViewVC 1.1.26