/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.1 by dpavlin, Wed Dec 20 17:22:35 2000 UTC revision 1.2 by dpavlin, Tue Aug 5 09:52:41 2003 UTC
# Line 6  package RServ; Line 6  package RServ;
6    
7  require Exporter;  require Exporter;
8  @ISA = qw(Exporter);  @ISA = qw(Exporter);
9  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog);  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);
10  @EXPORT_OK = qw();  @EXPORT_OK = qw();
11    
12  use Pg;  use Pg;
# Line 17  $quiet = 1; Line 17  $quiet = 1;
17  my %Mtables = ();  my %Mtables = ();
18  my %Stables = ();  my %Stables = ();
19    
20  sub PrepareSnapshot  sub GetSlaveId
21  {  {
22          my ($conn, $outf, $server) = @_; # (@_[0], @_[1], @_[2]);      my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);
23    
24          my $result = $conn->exec("BEGIN");      $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
25          if ($result->resultStatus ne PGRES_COMMAND_OK)                            " host='$slaveHost' AND dbase='$slaveDB'");
26          {      if ($result->resultStatus ne PGRES_TUPLES_OK)
27                  print STDERR $conn->errorMessage unless ($quiet);      {
28                  $conn->exec("ROLLBACK");          print STDERR $conn->errorMessage unless ($quiet);
29                  return(-1);          return(-1);
30          }      }
31          $result = $conn->exec("set transaction isolation level serializable");      
32          if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->cmdTuples > 1)
33          {      {
34                  print STDERR $conn->errorMessage unless ($quiet);          printf STDERR "Duplicate slave definitions.\n" unless ($quiet);
35                  $conn->exec("ROLLBACK");          return(-2);
36                  return(-1);      }
37          }  
38        my @row = $result->fetchrow;
39        
40        return $row[0];
41    }
42    
43          # MAP oid --> tabname, keyname  sub PrepareSnapshot
44          $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname" .  {
45                                                    " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .      my ($conn, $sconn, $outf, $server) = @_; # (@_[0], @_[1], @_[2], $_[3]);
                                                   " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .  
                                                   " and pga.attnum = rt.key");  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
46    
47          my @row;      # first, we must know for wich tables the slave subscribed
48          while (@row = $result->fetchrow)      my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
49          {      if ($result->resultStatus ne PGRES_TUPLES_OK)
50        {
51            print STDERR $conn->errorMessage unless ($quiet);
52            return(-1);
53        }
54        
55        my @row;
56        while (@row = $result->fetchrow) {
57            $Stables{$row[0]} = 1;
58        }
59        
60        $result = $conn->exec("BEGIN");
61        if ($result->resultStatus ne PGRES_COMMAND_OK)
62        {
63            print STDERR $conn->errorMessage unless ($quiet);
64            $conn->exec("ROLLBACK");
65            return(-1);
66        }
67        $result = $conn->exec("set transaction isolation level serializable");
68        if ($result->resultStatus ne PGRES_COMMAND_OK)
69        {
70            print STDERR $conn->errorMessage unless ($quiet);
71            $conn->exec("ROLLBACK");
72            return(-1);
73        }
74        
75        # MAP oid --> tabname, keyname, key_type
76        $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
77                              " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
78                              ", pg_type pgt".
79                              " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
80                              " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
81        if ($result->resultStatus ne PGRES_TUPLES_OK)
82        {
83            print STDERR $conn->errorMessage unless ($quiet);
84            $conn->exec("ROLLBACK");
85            return(-1);
86        }
87        
88        while (@row = $result->fetchrow)
89        {
90          #       printf "$row[0], $row[1], $row[2]\n";          #       printf "$row[0], $row[1], $row[2]\n";
91                  push @{$Mtables{$row[0]}}, $row[1], $row[2];          push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
92          }      }
93        
94          # Read last succeeded sync      # Read last succeeded sync
95          $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .      $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
96                  " where server = $server and syncid = (select max(syncid) from" .          " where server = $server AND syncid = (select max(syncid) from" .
97                          " _RSERV_SYNC_ where server = $server and status > 0)";          " _RSERV_SYNC_ where server = $server AND status > 0)";
98        
99        printf "$sql\n" if $debug;
100    
101        $result = $conn->exec($sql);
102        if ($result->resultStatus ne PGRES_TUPLES_OK)
103        {
104            print STDERR $conn->errorMessage unless ($quiet);
105            $conn->exec("ROLLBACK");
106            return(-1);
107        }
108        
109        my @lastsync = $result->fetchrow;
110        
111        my $sinfo = "";
112        if ($lastsync[3] ne '')     # sync info
113        {
114            $sinfo = "and (l.logid >= $lastsync[3]";
115            $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
116            $sinfo .= ")";
117        }
118        
119        my $havedeal = 0;
120        
121        # DELETED rows
122        $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
123            " where l.delete = 1 $sinfo order by l.reloid";
124        
125        printf "$sql\n" if $debug;
126        
127        $result = $conn->exec($sql);
128        if ($result->resultStatus ne PGRES_TUPLES_OK)
129        {
130            print STDERR $conn->errorMessage unless ($quiet);
131            $conn->exec("ROLLBACK");
132            return(-1);
133        }
134        
135        $lastoid = '';
136        while (@row = $result->fetchrow)
137        {
138            next unless exists $Mtables{$row[0]};
139            next unless exists $Stables{$Mtables{$row[0]}[0]};
140    
141            if ($lastoid != $row[0])
142            {
143                if ($lastoid eq '')
144                {
145                    my $syncid = GetSYNCID($conn, $outf);
146                    return($syncid) if $syncid < 0;
147                    $havedeal = 1;
148                }
149                else
150                {
151                    printf $outf "\\.\n";
152                }
153                printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
154                $lastoid = $row[0];
155            }
156            if (! defined $row[1])
157            {
158                print STDERR "NULL key\n" unless ($quiet);
159                $conn->exec("ROLLBACK");
160                return(-2);
161            }
162            printf $outf "%s\n", OutputValue($row[1]);
163        }
164        printf $outf "\\.\n" if $lastoid ne '';
165        
166        # UPDATED rows
167        
168        my ($taboid, $tabname, $tabkey);
169        foreach $taboid (keys %Mtables)
170        {
171            ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
172            next unless exists $Stables{$tabname};
173    
174            my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
175    
176            $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
177              "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
178              " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";
179            
180          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
181            
182          $result = $conn->exec($sql);          $result = $conn->exec($sql);
183          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
184          {          {
185                  print STDERR $conn->errorMessage unless ($quiet);              printf $outf "-- ERROR\n" if $havedeal;
186                  $conn->exec("ROLLBACK");              print STDERR $conn->errorMessage unless ($quiet);
187                  return(-1);              $conn->exec("ROLLBACK");
188                return(-1);
189            }
190            next if $result->ntuples <= 0;
191            if (! $havedeal)
192            {
193                my $syncid = GetSYNCID($conn, $outf);
194                return($syncid) if $syncid < 0;
195                $havedeal = 1;
196          }          }
197            printf $outf "-- UPDATE $tabname\n";
198          my @lastsync = $result->fetchrow;          printf "-- UPDATE $tabname\n" if $debug;
199            while (@row = $result->fetchrow)
         my $sinfo = "";  
         if ($lastsync[3] ne '') # sync info  
200          {          {
201                  $sinfo = "and (l.logid >= $lastsync[3]";              for ($i = 0; $i <= $#row; $i++)
202                  $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';              {
203                  $sinfo .= ")";                  printf $outf "  " if $i;
204          }                  printf "        " if $i && $debug;
205                    printf $outf "%s", OutputValue($row[$i]);
206          my $havedeal = 0;                  printf "%s", OutputValue($row[$i]) if $debug;;
207                }
208          # DELETED rows              printf $outf "\n";
209          $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .              printf "\n" if $debug;
210                  " where l.deleted = 1 $sinfo order by l.reloid";          }
211            printf $outf "\\.\n";
212            printf "\\.\n" if $debug;;
213        }
214    
215        # INSERTED rows
216    
217        my ($taboid, $tabname, $tabkey);
218        foreach $taboid (keys %Mtables)
219        {
220            ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
221            next unless exists $Stables{$tabname};
222    
223            my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
224    
225            $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
226              "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
227              " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";
228            
229          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
230            
231          $result = $conn->exec($sql);          $result = $conn->exec($sql);
232          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
233          {          {
234                  print STDERR $conn->errorMessage unless ($quiet);              printf $outf "-- ERROR\n" if $havedeal;
235                  $conn->exec("ROLLBACK");              print STDERR $conn->errorMessage unless ($quiet);
236                  return(-1);              $conn->exec("ROLLBACK");
237                return(-1);
238            }
239            next if $result->ntuples <= 0;
240            if (! $havedeal)
241            {
242                my $syncid = GetSYNCID($conn, $outf);
243                return($syncid) if $syncid < 0;
244                $havedeal = 1;
245          }          }
246            printf $outf "-- INSERT $tabname\n";
247          $lastoid = '';          printf "-- INSERT $tabname\n" if $debug;
248          while (@row = $result->fetchrow)          while (@row = $result->fetchrow)
249          {          {
250                  next unless exists $Mtables{$row[0]};              for ($i = 0; $i <= $#row; $i++)
251                  if ($lastoid != $row[0])              {
252                  {                  printf $outf "  " if $i;
253                          if ($lastoid eq '')                  printf "        " if $i && $debug;
254                          {                  printf $outf "%s", OutputValue($row[$i]);
255                                  my $syncid = GetSYNCID($conn, $outf);                  printf "%s", OutputValue($row[$i]) if $debug;;
256                                  return($syncid) if $syncid < 0;              }
257                                  $havedeal = 1;              printf $outf "\n";
258                          }              printf "\n" if $debug;
259                          else          }
260                          {          printf $outf "\\.\n";
261                                  printf $outf "\\.\n";          printf "\\.\n" if $debug;;
262                          }      }
263                          printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";      
264                          $lastoid = $row[0];      
265                  }      unless ($havedeal)
266                  if (! defined $row[1])      {
267                  {          $conn->exec("ROLLBACK");
268                          print STDERR "NULL key\n" unless ($quiet);          return(0);
269                          $conn->exec("ROLLBACK");      }
270                          return(-2);      
271                  }      # Remember this snapshot info
272                  printf $outf "%s\n", OutputValue($row[1]);      $result = $conn->exec("select _rserv_sync_($server)");
273          }      if ($result->resultStatus ne PGRES_TUPLES_OK)
274          printf $outf "\\.\n" if $lastoid ne '';      {
275            printf $outf "-- ERROR\n";
276          # UPDATED rows          print STDERR $conn->errorMessage unless ($quiet);
277            $conn->exec("ROLLBACK");
278          my ($taboid, $tabname, $tabkey);          return(-1);
279          foreach $taboid (keys %Mtables)      }
280          {      
281                  ($tabname, $tabkey) = @{$Mtables{$taboid}};      $result = $conn->exec("COMMIT");
282                  my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';      if ($result->resultStatus ne PGRES_COMMAND_OK)
283                  $sql = sprintf "select $oidkey _$tabname.* from _RSERV_LOG_ l," .      {
284                          " $tabname _$tabname where l.reloid = $taboid and l.deleted = 0 $sinfo" .          printf $outf "-- ERROR\n";
285                                  " and l.key = _$tabname.${tabkey}::text";          print STDERR $conn->errorMessage unless ($quiet);
286            $conn->exec("ROLLBACK");
287                  printf "$sql\n" if $debug;          return(-1);
288        }
289                  $result = $conn->exec($sql);      printf $outf "-- OK\n";
290                  if ($result->resultStatus ne PGRES_TUPLES_OK)      printf "-- OK\n" if $debug;
291                  {      
292                          printf $outf "-- ERROR\n" if $havedeal;      return(1);
293                          print STDERR $conn->errorMessage unless ($quiet);      
                         $conn->exec("ROLLBACK");  
                         return(-1);  
                 }  
                 next if $result->ntuples <= 0;  
                 if (! $havedeal)  
                 {  
                         my $syncid = GetSYNCID($conn, $outf);  
                         return($syncid) if $syncid < 0;  
                         $havedeal = 1;  
                 }  
                 printf $outf "-- UPDATE $tabname\n";  
                 while (@row = $result->fetchrow)  
                 {  
                         for ($i = 0; $i <= $#row; $i++)  
                         {  
                                 printf $outf "  " if $i;  
                                 printf $outf "%s", OutputValue($row[$i]);  
                         }  
                         printf $outf "\n";  
                 }  
                 printf $outf "\\.\n";  
         }  
   
         unless ($havedeal)  
         {  
                 $conn->exec("ROLLBACK");  
                 return(0);  
         }  
   
         # Remember this snapshot info  
         $result = $conn->exec("select _rserv_sync_($server)");  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 printf $outf "-- ERROR\n";  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
   
         $result = $conn->exec("COMMIT");  
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 printf $outf "-- ERROR\n";  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
         printf $outf "-- OK\n";  
   
         return(1);  
   
294  }  }
295    
296  sub OutputValue  sub OutputValue
# Line 213  sub OutputValue Line 310  sub OutputValue
310  # Get syncid for new snapshot  # Get syncid for new snapshot
311  sub GetSYNCID  sub GetSYNCID
312  {  {
313          my ($conn, $outf) = @_; # (@_[0], @_[1]);      my ($conn, $outf) = @_; # (@_[0], @_[1]);
314        
315          my $result = $conn->exec("select nextval('_rserv_sync_seq_')");      my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
316          if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
317          {      {
318                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
319                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
320                  return(-1);          return(-1);
321          }      }
322        
323          my @row = $result->fetchrow;      my @row = $result->fetchrow;
324        
325          printf $outf "-- SYNCID $row[0]\n";      printf $outf "-- SYNCID $row[0]\n";
326          return($row[0]);      printf "-- SYNCID $row[0]\n" if $debug;
327        return($row[0]);
328  }  }
329    
330    
331  sub CleanLog  sub CleanLog
332  {  {
333          my ($conn, $howold) = @_; # (@_[0], @_[1]);      my ($conn, $howold) = @_; # (@_[0], @_[1]);
334        
335          my $result = $conn->exec("BEGIN");      my $result = $conn->exec("BEGIN");
336          if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
337          {      {
338                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
339                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
340                  return(-1);          return(-1);
341          }      }
342        
343          my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .      my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
344                  " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .          " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
345                          " where rs2.server = rs.server and rs2.status > 0) order by rs.maxid";          " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
346        
347          printf "$sql\n" if $debug;      printf "$sql\n" if $debug;
348        
349          $result = $conn->exec($sql);      $result = $conn->exec($sql);
350          if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
351          {      {
352                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
353                  return(-1);          return(-1);
354          }      }
355          my $maxid = '';      my $maxid = '';
356          my %active = ();      my %active = ();
357          while (my @row = $result->fetchrow)      while (my @row = $result->fetchrow)
358          {      {
359                  $maxid = $row[0] if $maxid eq '';          $maxid = $row[0] if $maxid eq '';
360                  last if $row[0] > $maxid;          last if $row[0] > $maxid;
361                  my @ids = split(/[      ]+,[    ]+/, $row[1]);          my @ids = split(/[      ]+,[    ]+/, $row[1]);
362                  foreach $aid (@ids)          foreach $aid (@ids)
363                  {          {
364                          $active{$aid} = 1 unless exists $active{$aid};              $active{$aid} = 1 unless exists $active{$aid};
365                  }          }
366          }      }
367          if ($maxid eq '')      if ($maxid eq '')
368          {      {
369                  print STDERR "No Sync IDs\n" unless ($quiet);          print STDERR "No Sync IDs\n" unless ($quiet);
370                  return(0);          return(0);
371          }      }
372          my $alist = join(',', keys %active);      my $alist = join(',', keys %active);
373          my $sinfo = "logid < $maxid";      my $sinfo = "logid < $maxid";
374          $sinfo .= " and logid not in ($alist)" if $alist ne '';      $sinfo .= " AND logid not in ($alist)" if $alist ne '';
375                
376          $sql = "delete from _RSERV_LOG_ where " .      $sql = "delete from _RSERV_LOG_ where " .
377                  "logtime < now() - '$howold second'::interval and $sinfo";          "logtime < now() - '$howold second'::interval AND $sinfo";
378        
379          printf "$sql\n" if $debug;      printf "$sql\n" if $debug;
380        
381          $result = $conn->exec($sql);      $result = $conn->exec($sql);
382          if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
383          {      {
384                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
385                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
386                  return(-1);          return(-1);
387          }      }
388          $maxid = $result->cmdTuples;      $maxid = $result->cmdTuples;
389        
390          $result = $conn->exec("COMMIT");      $result = $conn->exec("COMMIT");
391          if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
392          {      {
393                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
394                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
395                  return(-1);          return(-1);
396          }      }
397        
398          return($maxid);      return($maxid);
399  }  }
400    
401  sub ApplySnapshot  sub ApplySnapshot
402  {  {
403          my ($conn, $inpf) = @_; # (@_[0], @_[1]);      my ($conn, $inpf) = @_; # (@_[0], @_[1]);
404        
405          my $result = $conn->exec("BEGIN");      my $result = $conn->exec("BEGIN");
406          if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
407          {      {
408                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
409            $conn->exec("ROLLBACK");
410            return(-1);
411        }
412        
413        $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");
414        if ($result->resultStatus ne PGRES_COMMAND_OK)
415        {
416            print STDERR $conn->errorMessage unless ($quiet);
417            $conn->exec("ROLLBACK");
418            return(-1);
419        }
420        
421        # MAP name --> oid, keyname, keynum
422        my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
423            " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
424            " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
425            " AND pga.attnum = rt.key";
426        
427        $result = $conn->exec($sql);
428        if ($result->resultStatus ne PGRES_TUPLES_OK)
429        {
430            print STDERR $conn->errorMessage unless ($quiet);
431            $conn->exec("ROLLBACK");
432            return(-1);
433        }
434        %Stables = ();
435        while (@row = $result->fetchrow)
436        {
437            #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
438            push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
439        }
440    
441        my $ok = 0;
442        my $syncid = '';
443        while(<$inpf>)
444        {
445            $_ =~ s/\n//;
446            my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);
447            if ($cmt ne '--')
448            {
449                printf STDERR "Invalid format\n" unless ($quiet);
450                $conn->exec("ROLLBACK");
451                return(-2);
452            }
453            if ($cmd eq 'DELETE')
454            {
455                if ($syncid eq '')
456                {
457                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
458                  $conn->exec("ROLLBACK");                  $conn->exec("ROLLBACK");
459                  return(-1);                  return(-2);
460          }              }
461                $result = DoDelete($conn, $inpf, $prm);
462          $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");              if ($result)
463          if ($result->resultStatus ne PGRES_COMMAND_OK)              {
464          {                  $conn->exec("ROLLBACK");
465                  print STDERR $conn->errorMessage unless ($quiet);                  return($result);
466                }
467            }
468            elsif ($cmd eq 'INSERT')
469            {
470                if ($syncid eq '')
471                {
472                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
473                  $conn->exec("ROLLBACK");                  $conn->exec("ROLLBACK");
474                  return(-1);                  return(-2);
475          }              }
476                $result = DoInsert($conn, $inpf, $prm);
477          # MAP name --> oid, keyname, keynum              if ($result)
478          my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .              {
479                  " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .                  $conn->exec("ROLLBACK");
480                          " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .                  return($result);
481                                  " and pga.attnum = rt.key";              }
482          $result = $conn->exec($sql);          }
483          if ($result->resultStatus ne PGRES_TUPLES_OK)          elsif ($cmd eq 'UPDATE')
484          {          {
485                if ($syncid eq '')
486                {
487                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
488                    $conn->exec("ROLLBACK");
489                    return(-2);
490                }
491                $result = DoUpdate($conn, $inpf, $prm);
492                if ($result)
493                {
494                    $conn->exec("ROLLBACK");
495                    return($result);
496                }
497            }
498            elsif ($cmd eq 'SYNCID')
499            {
500                if ($syncid ne '')
501                {
502                    printf STDERR "Second Sync ID ?!\n" unless ($quiet);
503                    $conn->exec("ROLLBACK");
504                    return(-2);
505                }
506                if ($prm !~ /^\d+$/)
507                {
508                    printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
509                    $conn->exec("ROLLBACK");
510                    return(-2);
511                }
512                $syncid = $prm;
513                
514                printf STDERR "Sync ID $syncid\n" unless ($quiet);
515                
516                $result = $conn->exec("select syncid, synctime from " .
517                                      "_RSERV_SLAVE_SYNC_ where syncid = " .
518                                      "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
519                if ($result->resultStatus ne PGRES_TUPLES_OK)
520                {
521                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR $conn->errorMessage unless ($quiet);
522                  $conn->exec("ROLLBACK");                  $conn->exec("ROLLBACK");
523                  return(-1);                  return(-1);
524          }              }
525                my @row = $result->fetchrow;
526          while (@row = $result->fetchrow)              if (! defined $row[0])
527          {              {
528          #       printf "        %s      %s\n", $row[1], $row[0];                  $result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ".
529                  push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];                                        "(syncid, synctime) values ($syncid, now())");
530          }              }
531                elsif ($row[0] >= $prm)
532          my $ok = 0;              {
533          my $syncid = '';                  printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
         while(<$inpf>)  
         {  
                 $_ =~ s/\n//;  
                 my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);  
                 if ($cmt ne '--')  
                 {  
                         printf STDERR "Invalid format\n" unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-2);  
                 }  
                 if ($cmd eq 'DELETE')  
                 {  
                         if ($syncid eq '')  
                         {  
                                 printf STDERR "Sync ID unspecified\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $result = DoDelete($conn, $inpf, $prm);  
                         if ($result)  
                         {  
                                 $conn->exec("ROLLBACK");  
                                 return($result);  
                         }  
                 }  
                 elsif ($cmd eq 'UPDATE')  
                 {  
                         if ($syncid eq '')  
                         {  
                                 printf STDERR "Sync ID unspecified\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $result = DoUpdate($conn, $inpf, $prm);  
                         if ($result)  
                         {  
                                 $conn->exec("ROLLBACK");  
                                 return($result);  
                         }  
                 }  
                 elsif ($cmd eq 'SYNCID')  
                 {  
                         if ($syncid ne '')  
                         {  
                                 printf STDERR "Second Sync ID ?!\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         if ($prm !~ /^\d+$/)  
                         {  
                                 printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $syncid = $prm;  
   
                         printf STDERR "Sync ID $syncid\n" unless ($quiet);  
   
                         $result = $conn->exec("select syncid, synctime from " .  
                                                                   "_RSERV_SLAVE_SYNC_ where syncid = " .  
                                                                   "(select max(syncid) from _RSERV_SLAVE_SYNC_)");  
                         if ($result->resultStatus ne PGRES_TUPLES_OK)  
                         {  
                                 print STDERR $conn->errorMessage unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-1);  
                         }  
                         my @row = $result->fetchrow;  
                         if (! defined $row[0])  
                         {  
                                 $result = $conn->exec("insert into" .  
                                                                           " _RSERV_SLAVE_SYNC_(syncid, synctime) values ($syncid, now())");  
                         }  
                         elsif ($row[0] >= $prm)  
                         {  
                                 printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(0);  
                         }  
                         else  
                         {  
                                 $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .  
                                                                           " set syncid = $syncid, synctime = now()");  
                         }  
                         if ($result->resultStatus ne PGRES_COMMAND_OK)  
                         {  
                                 print STDERR $conn->errorMessage unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-1);  
                         }  
                 }  
                 elsif ($cmd eq 'OK')  
                 {  
                         $ok = 1;  
                         last;  
                 }  
                 elsif ($cmd eq 'ERROR')  
                 {  
                         printf STDERR "ERROR signaled\n" unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-2);  
                 }  
                 else  
                 {  
                         printf STDERR "Unknown command $cmd\n" unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-2);  
                 }  
         }  
   
         if (! $ok)  
         {  
                 printf STDERR "No OK flag in input\n" unless ($quiet);  
534                  $conn->exec("ROLLBACK");                  $conn->exec("ROLLBACK");
535                  return(-2);                  return(0);
536          }              }
537                else
538          $result = $conn->exec("COMMIT");              {
539          if ($result->resultStatus ne PGRES_COMMAND_OK)                  $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .
540          {                                        " set syncid = $syncid, synctime = now()");
541                }
542                if ($result->resultStatus ne PGRES_COMMAND_OK)
543                {
544                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR $conn->errorMessage unless ($quiet);
545                  $conn->exec("ROLLBACK");                  $conn->exec("ROLLBACK");
546                  return(-1);                  return(-1);
547          }              }
548            }
549          return(1);          elsif ($cmd eq 'OK')
550            {
551                $ok = 1;
552                last;
553            }
554            elsif ($cmd eq 'ERROR')
555            {
556                printf STDERR "ERROR signaled\n" unless ($quiet);
557                $conn->exec("ROLLBACK");
558                return(-2);
559            }
560            else
561            {
562                printf STDERR "Unknown command $cmd\n" unless ($quiet);
563                $conn->exec("ROLLBACK");
564                return(-2);
565            }
566        }
567        
568        if (! $ok)
569        {
570            printf STDERR "No OK flag in input\n" unless ($quiet);
571            $conn->exec("ROLLBACK");
572            return(-2);
573        }
574        
575        $result = $conn->exec("COMMIT");
576        if ($result->resultStatus ne PGRES_COMMAND_OK)
577        {
578            print STDERR $conn->errorMessage unless ($quiet);
579            $conn->exec("ROLLBACK");
580            return(-1);
581        }
582        
583        return(1);
584  }  }
585    
586  sub DoDelete  sub DoDelete
587  {  {
588          my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
   
         my $ok = 0;  
         while(<$inpf>)  
         {  
                 if ($_ !~ /\n$/)  
                 {  
                         printf STDERR "Invalid format\n" unless ($quiet);  
                         return(-2);  
                 }  
                 my $key = $_;  
                 $key =~ s/\n//;  
                 if ($key eq '\.')  
                 {  
                         $ok = 1;  
                         last;  
                 }  
   
                 my $sql = "delete from $tabname where $Stables{$tabname}->[1] = '$key'";  
   
                 printf "$sql\n" if $debug;  
589    
590                  my $result = $conn->exec($sql);      # only delete tables that the slave wants
591                  if ($result->resultStatus ne PGRES_COMMAND_OK)      if (! defined($Stables{$tabname})) {
592                  {          print "Not configured to delete rows from table $tabname\n" unless $quiet;
593                          print STDERR $conn->errorMessage unless ($quiet);          while (<$inpf>) {
594                          return(-1);              $istring = $_;
595                  }              $istring =~ s/\n//;
596                last if ($istring eq '\.');
597          }          }
598            return(0);
599        }
600    
601          if (! $ok)      my $ok = 0;
602        while(<$inpf>)
603        {
604            if ($_ !~ /\n$/)
605            {
606                printf STDERR "Invalid format\n" unless ($quiet);
607                return(-2);
608            }
609            my $key = $_;
610            $key =~ s/\n//;
611            if ($key eq '\.')
612          {          {
613                  printf STDERR "No end of input in DELETE section\n" unless ($quiet);              $ok = 1;
614                  return(-2);              last;
615          }          }
616            
617          return(0);          my $sql = "delete from \"$tabname\" where ".
618                "\"$Stables{$tabname}->[1]\" = '$key'";
619            
620            printf "$sql\n" if $debug;
621            
622            my $result = $conn->exec($sql);
623            if ($result->resultStatus ne PGRES_COMMAND_OK)
624            {
625                print STDERR $conn->errorMessage unless ($quiet);
626                return(-1);
627            }
628        }
629        
630        if (! $ok)
631        {
632            printf STDERR "No end of input in DELETE section\n" unless ($quiet);
633            return(-2);
634        }
635        
636        return(0);
637  }  }
638    
639    
640  sub DoUpdate  sub DoUpdate
641  {  {
642          my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
         my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;  
   
         my @CopyBuf = ();  
         my $CBufLen = 0;  
         my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy  
   
         my $sql = "select attnum, attname from pg_attribute" .  
                 " where attrelid = $Stables{$tabname}->[0] and attnum > 0";  
643    
644          my $result = $conn->exec($sql);      # only update the tables that the slave wants
645          if ($result->resultStatus ne PGRES_TUPLES_OK)      if (! defined($Stables{$tabname})) {
646          {          print "Not configured to update rows from table $tabname\n" unless $quiet;
647                  print STDERR $conn->errorMessage unless ($quiet);          while (<$inpf>) {
648                  return(-1);              $istring = $_;
649                $istring =~ s/\n//;
650                last if ($istring eq '\.');
651          }          }
652            return(0);
653        }
654    
655          my @anames = ();      my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
656          while (@row = $result->fetchrow)      
657        my @CopyBuf = ();
658        my $CBufLen = 0;
659        my $CBufMax = 16 * 1024 * 1024;     # max size of buf for copy
660    
661        my $sql = "select attnum, attname from pg_attribute" .
662            " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
663        
664        my $result = $conn->exec($sql);
665        if ($result->resultStatus ne PGRES_TUPLES_OK)
666        {
667            print STDERR $conn->errorMessage unless ($quiet);
668            return(-1);
669        }
670        
671        my @anames = ();
672        while (@row = $result->fetchrow)
673        {
674            $anames[$row[0]] = $row[1];
675        }
676        
677        my $istring;
678        my $ok = 0;
679        while(<$inpf>)
680        {
681            if ($_ !~ /\n$/)
682            {
683                printf STDERR "Invalid format\n" unless ($quiet);
684                return(-2);
685            }
686            $istring = $_;
687            $istring =~ s/\n//;
688            if ($istring eq '\.')
689            {
690                $ok = 1;
691                last;
692            }
693            my @vals = split(/      /, $istring);
694            if ($oidkey)
695            {
696                if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
697                {
698                    printf STDERR "Invalid OID\n" unless ($quiet);
699                    return(-2);
700                }
701                $oidkey = $vals[0];
702            }
703            else
704          {          {
705                  $anames[$row[0]] = $row[1];              unshift @vals, '';
706          }          }
707            
708          my $istring;          $sql = "update \"$tabname\" set ";
709          my $ok = 0;          my $ocnt = 0;
710          while(<$inpf>)          for (my $i = 1; $i <= $#anames; $i++)
711            {
712                if ($vals[$i] eq '\N')
713                {
714                    if ($i == $Stables{$tabname}->[2])
715                    {
716                        printf STDERR "NULL key\n" unless ($quiet);
717                        return(-2);
718                    }
719                    $vals[$i] = 'null';
720                }
721                else
722                {
723                    $vals[$i] = "'" . $vals[$i] . "'";
724                    next if $i == $Stables{$tabname}->[2];
725                }
726                $ocnt++;
727                $sql .= ', ' if $ocnt > 1;
728                $sql .= "\"$anames[$i]\" = $vals[$i]";
729            }
730            if ($oidkey)
731          {          {
732                  if ($_ !~ /\n$/)              $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
                 {  
                         printf STDERR "Invalid format\n" unless ($quiet);  
                         return(-2);  
                 }  
                 $istring = $_;  
                 $istring =~ s/\n//;  
                 if ($istring eq '\.')  
                 {  
                         $ok = 1;  
                         last;  
                 }  
                 my @vals = split(/      /, $istring);  
                 if ($oidkey)  
                 {  
                         if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)  
                         {  
                                 printf STDERR "Invalid OID\n" unless ($quiet);  
                                 return(-2);  
                         }  
                         $oidkey = $vals[0];  
                 }  
                 else  
                 {  
                         unshift @vals, '';  
                 }  
   
                 $sql = "update $tabname set ";  
                 my $ocnt = 0;  
                 for (my $i = 1; $i <= $#anames; $i++)  
                 {  
                         if ($vals[$i] eq '\N')  
                         {  
                                 if ($i == $Stables{$tabname}->[2])  
                                 {  
                                         printf STDERR "NULL key\n" unless ($quiet);  
                                         return(-2);  
                                 }  
                                 $vals[$i] = 'null';  
                         }  
                         else  
                         {  
                                 $vals[$i] = "'" . $vals[$i] . "'";  
                                 next if $i == $Stables{$tabname}->[2];  
                         }  
                         $ocnt++;  
                         $sql .= ', ' if $ocnt > 1;  
                         $sql .= "$anames[$i] = $vals[$i]";  
                 }  
                 if ($oidkey)  
                 {  
                         $sql .= " where $Stables{$tabname}->[1] = $oidkey";  
                 }  
                 else  
                 {  
                         $sql .= " where $Stables{$tabname}->[1] = $vals[$Stables{$tabname}->[2]]";  
                 }  
   
                 printf "$sql\n" if $debug;  
   
                 $result = $conn->exec($sql);  
                 if ($result->resultStatus ne PGRES_COMMAND_OK)  
                 {  
                         print STDERR $conn->errorMessage unless ($quiet);  
                         return(-1);  
                 }  
                 next if $result->cmdTuples == 1;        # updated  
   
                 if ($result->cmdTuples > 1)  
                 {  
                         printf STDERR "Duplicate keys\n" unless ($quiet);  
                         return(-2);  
                 }  
   
                 # no key - copy  
                 push @CopyBuf, "$istring\n";  
                 $CBufLen += length($istring);  
   
                 if ($CBufLen >= $CBufMax)  
                 {  
                         $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);  
                         return($result) if $result;  
                         @CopyBuf = ();  
                         $CBufLen = 0;  
                 }  
733          }          }
734            else
         if (! $ok)  
735          {          {
736                  printf STDERR "No end of input in UPDATE section\n" unless ($quiet);              $sql .= " where \"$Stables{$tabname}->[1]\" = ".
737                  return(-2);                  $vals[$Stables{$tabname}->[2]];
738          }          }
739            
740          if ($CBufLen)          printf "$sql\n" if $debug;
741            
742            $result = $conn->exec($sql);
743            
744            if ($result->resultStatus ne PGRES_COMMAND_OK)
745          {          {
746                  $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);              print STDERR $conn->errorMessage unless ($quiet);
747                  return($result) if $result;              return(-1);
748          }          }
749            next if $result->cmdTuples == 1;        # updated
750            
751            if ($result->cmdTuples > 1)
752            {
753                printf STDERR "Duplicate keys\n" unless ($quiet);
754                return(-2);
755            }
756    
757            # no key - copy
758            push @CopyBuf, "$istring\n";
759            $CBufLen += length($istring);
760            
761            if ($CBufLen >= $CBufMax)
762            {
763                $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
764                return($result) if $result;
765                @CopyBuf = ();
766                $CBufLen = 0;
767            }
768        }
769        
770        if (! $ok)
771        {
772            printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
773            return(-2);
774        }
775        
776        if ($CBufLen)
777        {
778            print "@CopyBuf\n" if $debug;
779            $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
780            return($result) if $result;
781        }
782    
783          return(0);      return(0);
784  }  }
785    
786    sub DoInsert
 sub DoCopy  
787  {  {
788          my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);      my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
789    
790          my $sql = "COPY $tabname " . (($withoids) ? "WITH OIDS " : '') .      # only insert rows into tables that the slave wants
791  "FROM STDIN";      if (! defined($Stables{$tabname})) {
792          my $result = $conn->exec($sql);          print "Not configured to insert rows from table $tabname\n" unless $quiet;
793          if ($result->resultStatus ne PGRES_COPY_IN)          while (<$inpf>) {
794          {              $istring = $_;
795                  print STDERR $conn->errorMessage unless ($quiet);              $istring =~ s/\n//;
796                  return(-1);              last if ($istring eq '\.');
797          }          }
798            return(0);
799        }
800    
801          foreach $str (@{$CBuf})      my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
802          {      
803                  $conn->putline($str);      my @CopyBuf = ();
804          }      my $CBufLen = 0;
805        my $CBufMax = 16 * 1024 * 1024;     # max size of buf for copy
806          $conn->putline("\\.\n");      
807        my $istring;
808          if ($conn->endcopy)      my $ok = 0;
809        while(<$inpf>)
810        {
811            if ($_ !~ /\n$/)
812            {
813                printf STDERR "Invalid format\n" unless ($quiet);
814                return(-2);
815            }
816            $istring = $_;
817            $istring =~ s/\n//;
818            if ($istring eq '\.')
819            {
820                $ok = 1;
821                last;
822            }
823    
824            # no key - copy
825            push @CopyBuf, "$istring\n";
826            $CBufLen += length($istring);
827            
828            if ($CBufLen >= $CBufMax)
829          {          {
830                  print STDERR $conn->errorMessage unless ($quiet);              $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
831                  return(-1);              return($result) if $result;
832          }              @CopyBuf = ();
833                $CBufLen = 0;
834            }
835        }
836        
837        if (! $ok)
838        {
839            printf STDERR "No end of input in INSERT section\n" unless ($quiet);
840            return(-2);
841        }
842        
843        if ($CBufLen)
844        {
845            print "@CopyBuf\n" if $debug;
846            $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
847            return($result) if $result;
848        }
849        
850        return(0);
851    }
852    
         return(0);  
853    
854    sub DoCopy
855    {
856        my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
857        
858        my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
859            "FROM STDIN";
860        my $result = $conn->exec($sql);
861        if ($result->resultStatus ne PGRES_COPY_IN)
862        {
863            print STDERR $conn->errorMessage unless ($quiet);
864            return(-1);
865        }
866        
867        foreach $str (@{$CBuf})
868        {
869            $conn->putline($str);
870        }
871        
872        $conn->putline("\\.\n");
873        
874        if ($conn->endcopy)
875        {
876            print STDERR $conn->errorMessage unless ($quiet);
877            return(-1);
878        }
879        
880        return(0);
881  }  }
882    
883    
# Line 679  sub DoCopy Line 886  sub DoCopy
886  #  #
887  sub GetSyncID  sub GetSyncID
888  {  {
889          my ($conn) = @_; # (@_[0]);      my ($conn) = @_; # (@_[0]);
890                
891          my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");      my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
892          if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
893          {      {
894                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
895                  return(-1);          return(-1);
896          }      }
897          my @row = $result->fetchrow;      my @row = $result->fetchrow;
898          return(undef) unless defined $row[0];   # null      return(undef) unless defined $row[0];       # null
899          return($row[0]);      return($row[0]);
900  }  }
901    
902  #  #
# Line 697  sub GetSyncID Line 904  sub GetSyncID
904  #  #
905  sub SyncSyncID  sub SyncSyncID
906  {  {
907          my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);      my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);
908                
909          my $result = $conn->exec("BEGIN");      my $result = $conn->exec("BEGIN");
910          if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
911          {      {
912                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
913                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
914                  return(-1);          return(-1);
915          }      }
916        
917          $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .      $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .
918                                                    " where server = $server and syncid = $syncid" .                            " where server = $server AND syncid = $syncid" .
919                                                    " for update");                            " for update");
920          if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
921          {      {
922                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
923                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
924                  return(-1);          return(-1);
925          }      }
926          my @row = $result->fetchrow;      my @row = $result->fetchrow;
927          if (! defined $row[0])      if (! defined $row[0])
928          {      {
929                  printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);          printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);
930                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
931                  return(0);          return(0);
932          }      }
933          if ($row[1] > 0)      if ($row[1] > 0)
934          {      {
935                  printf STDERR "SyncID $syncid for server $server already updated\n" unless ($quiet);          printf STDERR "SyncID $syncid for server ".
936                  $conn->exec("ROLLBACK");              "$server already updated\n" unless ($quiet);
937                  return(0);          $conn->exec("ROLLBACK");
938          }          return(0);
939          $result = $conn->exec("update _RSERV_SYNC_" .      }
940                                                    " set synctime = now(), status = 1" .      $result = $conn->exec("update _RSERV_SYNC_" .
941                                                    " where server = $server and syncid = $syncid");                            " set synctime = now(), status = 1" .
942          if ($result->resultStatus ne PGRES_COMMAND_OK)                            " where server = $server AND syncid = $syncid");
943          {      if ($result->resultStatus ne PGRES_COMMAND_OK)
944                  print STDERR $conn->errorMessage unless ($quiet);      {
945                  $conn->exec("ROLLBACK");          print STDERR $conn->errorMessage unless ($quiet);
946                  return(-1);          $conn->exec("ROLLBACK");
947          }          return(-1);
948          $result = $conn->exec("delete from _RSERV_SYNC_" .      }
949                                                    " where server = $server and syncid < $syncid");      $result = $conn->exec("delete from _RSERV_SYNC_" .
950          if ($result->resultStatus ne PGRES_COMMAND_OK)                            " where server = $server AND syncid < $syncid");
951          {      if ($result->resultStatus ne PGRES_COMMAND_OK)
952                  print STDERR $conn->errorMessage unless ($quiet);      {
953                  $conn->exec("ROLLBACK");          print STDERR $conn->errorMessage unless ($quiet);
954                  return(-1);          $conn->exec("ROLLBACK");
955          }          return(-1);
956        }
957          $result = $conn->exec("COMMIT");      
958          if ($result->resultStatus ne PGRES_COMMAND_OK)      $result = $conn->exec("COMMIT");
959          {      if ($result->resultStatus ne PGRES_COMMAND_OK)
960                  print STDERR $conn->errorMessage unless ($quiet);      {
961                  $conn->exec("ROLLBACK");          print STDERR $conn->errorMessage unless ($quiet);
962                  return(-1);          $conn->exec("ROLLBACK");
963          }          return(-1);
964        }
965          return(1);      
966        return(1);
967  }  }
968    
969  1;  1;

Legend:
Removed from v.1.1  
changed lines
  Added in v.1.2

  ViewVC Help
Powered by ViewVC 1.1.26