/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.1.1.1 by dpavlin, Wed Dec 20 17:22:35 2000 UTC revision 1.8 by dpavlin, Tue Oct 28 20:16:39 2003 UTC
# Line 6  package RServ; Line 6  package RServ;
6    
7  require Exporter;  require Exporter;
8  @ISA = qw(Exporter);  @ISA = qw(Exporter);
9  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog);  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);
10  @EXPORT_OK = qw();  @EXPORT_OK = qw();
11    use strict;
12  use Pg;  use Pg;
13    
14  $debug = 0;  my $debug = 0;
15  $quiet = 1;  my $quiet = 1;
16    
17  my %Mtables = ();  my %Mtables = ();
18  my %Stables = ();  my %Stables = ();
19    
20  sub PrepareSnapshot  sub GetSlaveId
21  {  {
22          my ($conn, $outf, $server) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);
   
         my $result = $conn->exec("BEGIN");  
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
         $result = $conn->exec("set transaction isolation level serializable");  
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
23    
24          # MAP oid --> tabname, keyname      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
25          $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname" .                            " host='$slaveHost' AND dbase='$slaveDB'");
                                                   " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .  
                                                   " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .  
                                                   " and pga.attnum = rt.key");  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
26    
27          my @row;      if ($result->resultStatus ne PGRES_TUPLES_OK)
28          while (@row = $result->fetchrow)      {
29          {          print STDERR $mconn->errorMessage unless ($quiet);
30          #       printf "$row[0], $row[1], $row[2]\n";          return(-1);
31                  push @{$Mtables{$row[0]}}, $row[1], $row[2];      }
32          }      
33        if ($result->cmdTuples && $result->cmdTuples > 1)
34        {
35            printf STDERR "Duplicate slave definitions.\n" unless ($quiet);
36            return(-2);
37        }
38    
39        my @row = $result->fetchrow;
40        
41        return $row[0];
42    }
43    
44          # Read last succeeded sync  sub PrepareSnapshot
45          $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .  {
46                  " where server = $server and syncid = (select max(syncid) from" .      my ($mconn, $sconn, $outf, $sserver, $onlytables) = @_;
                         " _RSERV_SYNC_ where server = $server and status > 0)";  
47    
48        # first, we must know for wich tables the slave subscribed
49        my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
50        if ($result->resultStatus ne PGRES_TUPLES_OK)
51        {
52            print STDERR $mconn->errorMessage unless ($quiet);
53            return(-1);
54        }
55        
56        my @row;
57        while (@row = $result->fetchrow) {
58            $Stables{$row[0]} = 1;
59        }
60        
61        $result = $mconn->exec("BEGIN");
62        if ($result->resultStatus ne PGRES_COMMAND_OK)
63        {
64            print STDERR $mconn->errorMessage unless ($quiet);
65            $mconn->exec("ROLLBACK");
66            return(-1);
67        }
68        $result = $mconn->exec("set transaction isolation level serializable");
69        if ($result->resultStatus ne PGRES_COMMAND_OK)
70        {
71            print STDERR $mconn->errorMessage unless ($quiet);
72            $mconn->exec("ROLLBACK");
73            return(-1);
74        }
75        
76        # MAP oid --> tabname, keyname, key_type
77        $result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
78                              " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
79                              ", pg_type pgt".
80                              " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
81                              " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
82        if ($result->resultStatus ne PGRES_TUPLES_OK)
83        {
84            print STDERR $mconn->errorMessage unless ($quiet);
85            $mconn->exec("ROLLBACK");
86            return(-1);
87        }
88        
89        while (@row = $result->fetchrow)
90        {
91            #       printf "$row[0], $row[1], $row[2]\n";
92                    if (ref($onlytables) eq 'HASH') {
93                            next unless (exists $onlytables->{$row[1]});
94                            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
95                    }
96            push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
97        }
98        
99        # Read last succeeded sync
100        my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
101            " where server = $sserver AND syncid = (select max(syncid) from" .
102            " _RSERV_SYNC_ where server = $sserver AND status > 0)";
103        
104        printf "$sql\n" if $debug;
105    
106        $result = $mconn->exec($sql);
107        if ($result->resultStatus ne PGRES_TUPLES_OK)
108        {
109            print STDERR $mconn->errorMessage unless ($quiet);
110            $mconn->exec("ROLLBACK");
111            return(-1);
112        }
113        
114        my @lastsync = $result->fetchrow;
115        
116        my $sinfo = "";
117        if (@lastsync && $lastsync[3] ne '')        # sync info
118        {
119            $sinfo = "and (l.logid >= $lastsync[3]";
120            $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
121            $sinfo .= ")";
122        }
123        
124        my $havedeal = 0;
125        
126        # DELETED rows
127        $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
128            " where l.delete = 1 $sinfo order by l.reloid";
129        
130        printf "$sql\n" if $debug;
131        
132        $result = $mconn->exec($sql);
133        if ($result->resultStatus ne PGRES_TUPLES_OK)
134        {
135            print STDERR $mconn->errorMessage unless ($quiet);
136            $mconn->exec("ROLLBACK");
137            return(-1);
138        }
139        
140        my $lastoid = -1;
141        while (@row = $result->fetchrow)
142        {
143            next unless exists $Mtables{$row[0]};
144            next unless exists $Stables{$Mtables{$row[0]}[0]};
145    
146            if ($lastoid != $row[0])
147            {
148                if ($lastoid == -1)
149                {
150                    my $syncid = GetSYNCID($mconn, $outf);
151                    return($syncid) if $syncid < 0;
152                    $havedeal = 1;
153                }
154                else
155                {
156                    printf $outf "\\.\n";
157                }
158                printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
159                $lastoid = $row[0];
160            }
161            if (! defined $row[1])
162            {
163                print STDERR "NULL key\n" unless ($quiet);
164                $mconn->exec("ROLLBACK");
165                return(-2);
166            }
167            printf $outf "%s\n", OutputValue($row[1]);
168        }
169        printf $outf "\\.\n" if ($lastoid != -1);
170        
171        # UPDATED rows
172        
173        my ($taboid, $tabname, $tabkey);
174        foreach $taboid (keys %Mtables)
175        {
176            my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
177            next unless exists $Stables{$tabname};
178    
179            my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
180    
181            $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
182              "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
183              " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";
184            
185          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
186            
187          $result = $conn->exec($sql);          $result = $mconn->exec($sql);
188          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
189          {          {
190                  print STDERR $conn->errorMessage unless ($quiet);              printf $outf "-- ERROR\n" if $havedeal;
191                  $conn->exec("ROLLBACK");              print STDERR $mconn->errorMessage unless ($quiet);
192                  return(-1);              $mconn->exec("ROLLBACK");
193                return(-1);
194            }
195            next if $result->ntuples <= 0;
196            if (! $havedeal)
197            {
198                my $syncid = GetSYNCID($mconn, $outf);
199                return($syncid) if $syncid < 0;
200                $havedeal = 1;
201          }          }
202            printf $outf "-- UPDATE $tabname\n";
203          my @lastsync = $result->fetchrow;          printf "-- UPDATE $tabname\n" if $debug;
204            while (@row = $result->fetchrow)
         my $sinfo = "";  
         if ($lastsync[3] ne '') # sync info  
205          {          {
206                  $sinfo = "and (l.logid >= $lastsync[3]";              for (my $i = 0; $i <= $#row; $i++)
207                  $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';              {
208                  $sinfo .= ")";                  printf $outf "  " if $i;
209          }                  printf "        " if $i && $debug;
210                    printf $outf "%s", OutputValue($row[$i]);
211          my $havedeal = 0;                  printf "%s", OutputValue($row[$i]) if $debug;;
212                }
213          # DELETED rows              printf $outf "\n";
214          $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .              printf "\n" if $debug;
215                  " where l.deleted = 1 $sinfo order by l.reloid";          }
216            printf $outf "\\.\n";
217            printf "\\.\n" if $debug;;
218        }
219    
220        # INSERTED rows
221    
222        foreach $taboid (keys %Mtables)
223        {
224            my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
225            next unless exists $Stables{$tabname};
226    
227            my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
228    
229            $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
230              "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
231              " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";
232            
233          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
234            
235          $result = $conn->exec($sql);          $result = $mconn->exec($sql);
236          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
237          {          {
238                  print STDERR $conn->errorMessage unless ($quiet);              printf $outf "-- ERROR\n" if $havedeal;
239                  $conn->exec("ROLLBACK");              print STDERR $mconn->errorMessage unless ($quiet);
240                  return(-1);              $mconn->exec("ROLLBACK");
241                return(-1);
242            }
243            next if $result->ntuples <= 0;
244            if (! $havedeal)
245            {
246                my $syncid = GetSYNCID($mconn, $outf);
247                return($syncid) if $syncid < 0;
248                $havedeal = 1;
249          }          }
250            printf $outf "-- INSERT $tabname\n";
251          $lastoid = '';          printf "-- INSERT $tabname\n" if $debug;
252          while (@row = $result->fetchrow)          while (@row = $result->fetchrow)
253          {          {
254                  next unless exists $Mtables{$row[0]};              for (my $i = 0; $i <= $#row; $i++)
255                  if ($lastoid != $row[0])              {
256                  {                  printf $outf "  " if $i;
257                          if ($lastoid eq '')                  printf "        " if $i && $debug;
258                          {                  printf $outf "%s", OutputValue($row[$i]);
259                                  my $syncid = GetSYNCID($conn, $outf);                  printf "%s", OutputValue($row[$i]) if $debug;;
260                                  return($syncid) if $syncid < 0;              }
261                                  $havedeal = 1;              printf $outf "\n";
262                          }              printf "\n" if $debug;
263                          else          }
264                          {          printf $outf "\\.\n";
265                                  printf $outf "\\.\n";          printf "\\.\n" if $debug;;
266                          }      }
267                          printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";      
268                          $lastoid = $row[0];      
269                  }      unless ($havedeal)
270                  if (! defined $row[1])      {
271                  {          $mconn->exec("ROLLBACK");
272                          print STDERR "NULL key\n" unless ($quiet);          return(0);
273                          $conn->exec("ROLLBACK");      }
274                          return(-2);      
275                  }      # Remember this snapshot info
276                  printf $outf "%s\n", OutputValue($row[1]);      $result = $mconn->exec("select _rserv_sync_($sserver)");
277          }      if ($result->resultStatus ne PGRES_TUPLES_OK)
278          printf $outf "\\.\n" if $lastoid ne '';      {
279            printf $outf "-- ERROR\n";
280          # UPDATED rows          print STDERR $mconn->errorMessage unless ($quiet);
281            $mconn->exec("ROLLBACK");
282          my ($taboid, $tabname, $tabkey);          return(-1);
283          foreach $taboid (keys %Mtables)      }
284          {      
285                  ($tabname, $tabkey) = @{$Mtables{$taboid}};      $result = $mconn->exec("COMMIT");
286                  my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';      if ($result->resultStatus ne PGRES_COMMAND_OK)
287                  $sql = sprintf "select $oidkey _$tabname.* from _RSERV_LOG_ l," .      {
288                          " $tabname _$tabname where l.reloid = $taboid and l.deleted = 0 $sinfo" .          printf $outf "-- ERROR\n";
289                                  " and l.key = _$tabname.${tabkey}::text";          print STDERR $mconn->errorMessage unless ($quiet);
290            $mconn->exec("ROLLBACK");
291                  printf "$sql\n" if $debug;          return(-1);
292        }
293                  $result = $conn->exec($sql);      printf $outf "-- OK\n";
294                  if ($result->resultStatus ne PGRES_TUPLES_OK)      printf "-- OK\n" if $debug;
295                  {      
296                          printf $outf "-- ERROR\n" if $havedeal;      return(1);
297                          print STDERR $conn->errorMessage unless ($quiet);      
                         $conn->exec("ROLLBACK");  
                         return(-1);  
                 }  
                 next if $result->ntuples <= 0;  
                 if (! $havedeal)  
                 {  
                         my $syncid = GetSYNCID($conn, $outf);  
                         return($syncid) if $syncid < 0;  
                         $havedeal = 1;  
                 }  
                 printf $outf "-- UPDATE $tabname\n";  
                 while (@row = $result->fetchrow)  
                 {  
                         for ($i = 0; $i <= $#row; $i++)  
                         {  
                                 printf $outf "  " if $i;  
                                 printf $outf "%s", OutputValue($row[$i]);  
                         }  
                         printf $outf "\n";  
                 }  
                 printf $outf "\\.\n";  
         }  
   
         unless ($havedeal)  
         {  
                 $conn->exec("ROLLBACK");  
                 return(0);  
         }  
   
         # Remember this snapshot info  
         $result = $conn->exec("select _rserv_sync_($server)");  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 printf $outf "-- ERROR\n";  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
   
         $result = $conn->exec("COMMIT");  
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 printf $outf "-- ERROR\n";  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
         printf $outf "-- OK\n";  
   
         return(1);  
   
298  }  }
299    
300  sub OutputValue  sub OutputValue
# Line 213  sub OutputValue Line 314  sub OutputValue
314  # Get syncid for new snapshot  # Get syncid for new snapshot
315  sub GetSYNCID  sub GetSYNCID
316  {  {
317          my ($conn, $outf) = @_; # (@_[0], @_[1]);      my ($conn, $outf) = @_; # (@_[0], @_[1]);
318        
319          my $result = $conn->exec("select nextval('_rserv_sync_seq_')");      my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
320          if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
321          {      {
322                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
323                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
324                  return(-1);          return(-1);
325          }      }
326        
327          my @row = $result->fetchrow;      my @row = $result->fetchrow;
328        
329          printf $outf "-- SYNCID $row[0]\n";      printf $outf "-- SYNCID $row[0]\n";
330          return($row[0]);      printf "-- SYNCID $row[0]\n" if $debug;
331        return($row[0]);
332  }  }
333    
334    
335  sub CleanLog  sub CleanLog
336  {  {
337          my ($conn, $howold) = @_; # (@_[0], @_[1]);      my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
338        
339          my $result = $conn->exec("BEGIN");      my $result = $conn->exec("BEGIN");
340          if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
341          {      {
342                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
343                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
344                  return(-1);          return(-1);
345          }      }
346        
347        my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
348            " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
349            " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
350        
351        printf "$sql\n" if $debug;
352        
353        $result = $conn->exec($sql);
354        if ($result->resultStatus ne PGRES_TUPLES_OK)
355        {
356            print STDERR $conn->errorMessage unless ($quiet);
357            return(-1);
358        }
359        my $maxid = '';
360        my %active = ();
361        while (my @row = $result->fetchrow)
362        {
363            $maxid = $row[0] if $maxid eq '';
364            last if $row[0] > $maxid;
365            my @ids = split(/[      ]+,[    ]+/, $row[1]);
366            foreach my $aid (@ids)
367            {
368                $active{$aid} = 1 unless exists $active{$aid};
369            }
370        }
371        if ($maxid eq '')
372        {
373            print STDERR "No Sync IDs\n" unless ($quiet);
374            return(0);
375        }
376        my $alist = join(',', keys %active);
377        my $sinfo = "logid < $maxid";
378        $sinfo .= " AND logid not in ($alist)" if $alist ne '';
379        #if (ref($onlytables) eq 'HASH') {
380        #   foreach my $onlytable (keys %{$onlytables}) {
381        #           $sinfo
382        #   }
383        #}
384        $sql = "delete from _RSERV_LOG_ where " .
385            "logtime < now() - '$howold second'::interval AND $sinfo";
386        
387        printf "$sql\n" if $debug;
388        
389        $result = $conn->exec($sql);
390        if ($result->resultStatus ne PGRES_COMMAND_OK)
391        {
392            print STDERR $conn->errorMessage unless ($quiet);
393            $conn->exec("ROLLBACK");
394            return(-1);
395        }
396        $maxid = $result->cmdTuples;
397        
398        $result = $conn->exec("COMMIT");
399        if ($result->resultStatus ne PGRES_COMMAND_OK)
400        {
401            print STDERR $conn->errorMessage unless ($quiet);
402            $conn->exec("ROLLBACK");
403            return(-1);
404        }
405        
406        return($maxid);
407    }
408    
409          my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .  sub ApplySnapshot
410                  " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .  {
411                          " where rs2.server = rs.server and rs2.status > 0) order by rs.maxid";      my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);
412        
413        my $result = $sconn->exec("BEGIN");
414        if ($result->resultStatus ne PGRES_COMMAND_OK)
415        {
416            print STDERR $sconn->errorMessage unless ($quiet);
417            $sconn->exec("ROLLBACK");
418            return(-1);
419        }
420        
421        $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
422        if ($result->resultStatus ne PGRES_COMMAND_OK)
423        {
424            print STDERR $sconn->errorMessage unless ($quiet);
425            $sconn->exec("ROLLBACK");
426            return(-1);
427        }
428        
429        # MAP name --> oid, keyname, keynum
430        my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
431            " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
432            " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
433            " AND pga.attnum = rt.key";
434        
435        $result = $sconn->exec($sql);
436        if ($result->resultStatus ne PGRES_TUPLES_OK)
437        {
438            print STDERR $sconn->errorMessage unless ($quiet);
439            $sconn->exec("ROLLBACK");
440            return(-1);
441        }
442        %Stables = ();
443        while (my @row = $result->fetchrow)
444        {
445            #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
446                    if (ref($onlytables) eq 'HASH') {
447                            next unless (exists $onlytables->{$row[1]});
448                            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
449                    }
450            push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
451        }
452    
453        my $ok = 0;
454        my $syncid = '';
455        while(<$inpf>)
456        {
457            $_ =~ s/\n//;
458            my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);
459            if ($cmt ne '--')
460            {
461                printf STDERR "Invalid format\n" unless ($quiet);
462                $sconn->exec("ROLLBACK");
463                return(-2);
464            }
465            if ($cmd eq 'DELETE')
466            {
467                if ($syncid eq '')
468                {
469                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
470                    $sconn->exec("ROLLBACK");
471                    return(-2);
472                }
473                $result = DoDelete($sconn, $inpf, $prm);
474                if ($result)
475                {
476                    $sconn->exec("ROLLBACK");
477                    return($result);
478                }
479            }
480            elsif ($cmd eq 'INSERT')
481            {
482                if ($syncid eq '')
483                {
484                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
485                    $sconn->exec("ROLLBACK");
486                    return(-2);
487                }
488                $result = DoInsert($sconn, $inpf, $prm);
489                if ($result)
490                {
491                    $sconn->exec("ROLLBACK");
492                    return($result);
493                }
494            }
495            elsif ($cmd eq 'UPDATE')
496            {
497                if ($syncid eq '')
498                {
499                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
500                    $sconn->exec("ROLLBACK");
501                    return(-2);
502                }
503                $result = DoUpdate($sconn, $inpf, $prm);
504                if ($result)
505                {
506                    $sconn->exec("ROLLBACK");
507                    return($result);
508                }
509            }
510            elsif ($cmd eq 'SYNCID')
511            {
512                if ($syncid ne '')
513                {
514                    printf STDERR "Second Sync ID ?!\n" unless ($quiet);
515                    $sconn->exec("ROLLBACK");
516                    return(-2);
517                }
518                if ($prm !~ /^\d+$/)
519                {
520                    printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
521                    $sconn->exec("ROLLBACK");
522                    return(-2);
523                }
524                $syncid = $prm;
525                
526                printf STDERR "Sync ID $syncid\n" unless ($quiet);
527                
528                $result = $sconn->exec("select syncid, synctime from " .
529                                      "_RSERV_SLAVE_SYNC_ where syncid = " .
530                                      "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
531                if ($result->resultStatus ne PGRES_TUPLES_OK)
532                {
533                    print STDERR $sconn->errorMessage unless ($quiet);
534                    $sconn->exec("ROLLBACK");
535                    return(-1);
536                }
537                my @row = $result->fetchrow;
538                if (! defined $row[0])
539                {
540                    $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
541                                          "(syncid, synctime) values ($syncid, now())");
542                }
543                elsif ($row[0] >= $prm)
544                {
545                    printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
546                    $sconn->exec("ROLLBACK");
547                    return(0);
548                }
549                else
550                {
551                    $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
552                                          " set syncid = $syncid, synctime = now()");
553                }
554                if ($result->resultStatus ne PGRES_COMMAND_OK)
555                {
556                    print STDERR $sconn->errorMessage unless ($quiet);
557                    $sconn->exec("ROLLBACK");
558                    return(-1);
559                }
560            }
561            elsif ($cmd eq 'OK')
562            {
563                $ok = 1;
564                last;
565            }
566            elsif ($cmd eq 'ERROR')
567            {
568                printf STDERR "ERROR signaled\n" unless ($quiet);
569                $sconn->exec("ROLLBACK");
570                return(-2);
571            }
572            else
573            {
574                printf STDERR "Unknown command $cmd\n" unless ($quiet);
575                $sconn->exec("ROLLBACK");
576                return(-2);
577            }
578        }
579        
580        if (! $ok)
581        {
582            printf STDERR "No OK flag in input\n" unless ($quiet);
583            $sconn->exec("ROLLBACK");
584            return(-2);
585        }
586        
587        $result = $sconn->exec("COMMIT");
588        if ($result->resultStatus ne PGRES_COMMAND_OK)
589        {
590            print STDERR $sconn->errorMessage unless ($quiet);
591            $sconn->exec("ROLLBACK");
592            return(-1);
593        }
594        
595        return(1);
596    }
597    
598          printf "$sql\n" if $debug;  sub DoDelete
599    {
600        my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
601    
602          $result = $conn->exec($sql);      # only delete tables that the slave wants
603          if ($result->resultStatus ne PGRES_TUPLES_OK)      if (! defined($Stables{$tabname})) {
604          {          print "Not configured to delete rows from table $tabname\n" unless $quiet;
605                  print STDERR $conn->errorMessage unless ($quiet);          while (<$inpf>) {
606                  return(-1);              my $istring = $_;
607                $istring =~ s/\n//;
608                last if ($istring eq '\.');
609          }          }
610          my $maxid = '';          return(0);
611          my %active = ();      }
612          while (my @row = $result->fetchrow)  
613          {      my $ok = 0;
614                  $maxid = $row[0] if $maxid eq '';      while(<$inpf>)
615                  last if $row[0] > $maxid;      {
616                  my @ids = split(/[      ]+,[    ]+/, $row[1]);          if ($_ !~ /\n$/)
617                  foreach $aid (@ids)          {
618                  {              printf STDERR "Invalid format\n" unless ($quiet);
619                          $active{$aid} = 1 unless exists $active{$aid};              return(-2);
620                  }          }
621          }          my $key = $_;
622          if ($maxid eq '')          $key =~ s/\n//;
623            if ($key eq '\.')
624          {          {
625                  print STDERR "No Sync IDs\n" unless ($quiet);              $ok = 1;
626                  return(0);              last;
627          }          }
         my $alist = join(',', keys %active);  
         my $sinfo = "logid < $maxid";  
         $sinfo .= " and logid not in ($alist)" if $alist ne '';  
628                    
629          $sql = "delete from _RSERV_LOG_ where " .          my $sql = "delete from \"$tabname\" where ".
630                  "logtime < now() - '$howold second'::interval and $sinfo";              "\"$Stables{$tabname}->[1]\" = '$key'";
631            
632          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
633            
634          $result = $conn->exec($sql);          my $result = $sconn->exec($sql);
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
         $maxid = $result->cmdTuples;  
   
         $result = $conn->exec("COMMIT");  
635          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
636          {          {
637                  print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
638                  $conn->exec("ROLLBACK");              return(-1);
                 return(-1);  
639          }          }
640        }
641          return($maxid);      
642        if (! $ok)
643        {
644            printf STDERR "No end of input in DELETE section\n" unless ($quiet);
645            return(-2);
646        }
647        
648        return(0);
649  }  }
650    
651  sub ApplySnapshot  
652    sub DoUpdate
653  {  {
654          my ($conn, $inpf) = @_; # (@_[0], @_[1]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
655    
656          my $result = $conn->exec("BEGIN");      # only update the tables that the slave wants
657          if ($result->resultStatus ne PGRES_COMMAND_OK)      if (! defined($Stables{$tabname})) {
658          {          print "Not configured to update rows from table $tabname\n" unless $quiet;
659                  print STDERR $conn->errorMessage unless ($quiet);          while (<$inpf>) {
660                  $conn->exec("ROLLBACK");              my $istring = $_;
661                  return(-1);              $istring =~ s/\n//;
662                last if ($istring eq '\.');
663          }          }
664            return(0);
665        }
666    
667          $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");      my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
668          if ($result->resultStatus ne PGRES_COMMAND_OK)      
669          {      my @CopyBuf = ();
670                  print STDERR $conn->errorMessage unless ($quiet);      my $CBufLen = 0;
671                  $conn->exec("ROLLBACK");      my $CBufMax = 16 * 1024 * 1024;     # max size of buf for copy
672                  return(-1);  
673        my $sql = "select attnum, attname from pg_attribute" .
674            " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
675        
676        my $result = $sconn->exec($sql);
677        if ($result->resultStatus ne PGRES_TUPLES_OK)
678        {
679            print STDERR $sconn->errorMessage unless ($quiet);
680            return(-1);
681        }
682        
683        my @anames = ();
684        while (my @row = $result->fetchrow)
685        {
686            $anames[$row[0]] = $row[1];
687        }
688        
689        my $istring;
690        my $ok = 0;
691        while(<$inpf>)
692        {
693            if ($_ !~ /\n$/)
694            {
695                printf STDERR "Invalid format\n" unless ($quiet);
696                return(-2);
697            }
698            $istring = $_;
699            $istring =~ s/\n//;
700            if ($istring eq '\.')
701            {
702                $ok = 1;
703                last;
704            }
705            my @vals = split(/      /, $istring);
706            if ($oidkey)
707            {
708                if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
709                {
710                    printf STDERR "Invalid OID\n" unless ($quiet);
711                    return(-2);
712                }
713                $oidkey = $vals[0];
714          }          }
715            else
         # MAP name --> oid, keyname, keynum  
         my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .  
                 " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .  
                         " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .  
                                 " and pga.attnum = rt.key";  
         $result = $conn->exec($sql);  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
716          {          {
717                  print STDERR $conn->errorMessage unless ($quiet);              unshift @vals, '';
                 $conn->exec("ROLLBACK");  
                 return(-1);  
718          }          }
719            
720          while (@row = $result->fetchrow)          $sql = "update \"$tabname\" set ";
721          {          my $ocnt = 0;
722          #       printf "        %s      %s\n", $row[1], $row[0];          for (my $i = 1; $i <= $#anames; $i++)
723                  push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];          {
724                if ($vals[$i] eq '\N')
725                {
726                    if ($i == $Stables{$tabname}->[2])
727                    {
728                        printf STDERR "NULL key\n" unless ($quiet);
729                        return(-2);
730                    }
731                    $vals[$i] = 'null';
732                }
733                else
734                {
735                    $vals[$i] = "'" . $vals[$i] . "'";
736                    next if $i == $Stables{$tabname}->[2];
737                }
738                $ocnt++;
739                $sql .= ', ' if $ocnt > 1;
740                $sql .= "\"$anames[$i]\" = $vals[$i]";
741          }          }
742            if ($oidkey)
         my $ok = 0;  
         my $syncid = '';  
         while(<$inpf>)  
743          {          {
744                  $_ =~ s/\n//;              $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
                 my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);  
                 if ($cmt ne '--')  
                 {  
                         printf STDERR "Invalid format\n" unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-2);  
                 }  
                 if ($cmd eq 'DELETE')  
                 {  
                         if ($syncid eq '')  
                         {  
                                 printf STDERR "Sync ID unspecified\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $result = DoDelete($conn, $inpf, $prm);  
                         if ($result)  
                         {  
                                 $conn->exec("ROLLBACK");  
                                 return($result);  
                         }  
                 }  
                 elsif ($cmd eq 'UPDATE')  
                 {  
                         if ($syncid eq '')  
                         {  
                                 printf STDERR "Sync ID unspecified\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $result = DoUpdate($conn, $inpf, $prm);  
                         if ($result)  
                         {  
                                 $conn->exec("ROLLBACK");  
                                 return($result);  
                         }  
                 }  
                 elsif ($cmd eq 'SYNCID')  
                 {  
                         if ($syncid ne '')  
                         {  
                                 printf STDERR "Second Sync ID ?!\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         if ($prm !~ /^\d+$/)  
                         {  
                                 printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $syncid = $prm;  
   
                         printf STDERR "Sync ID $syncid\n" unless ($quiet);  
   
                         $result = $conn->exec("select syncid, synctime from " .  
                                                                   "_RSERV_SLAVE_SYNC_ where syncid = " .  
                                                                   "(select max(syncid) from _RSERV_SLAVE_SYNC_)");  
                         if ($result->resultStatus ne PGRES_TUPLES_OK)  
                         {  
                                 print STDERR $conn->errorMessage unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-1);  
                         }  
                         my @row = $result->fetchrow;  
                         if (! defined $row[0])  
                         {  
                                 $result = $conn->exec("insert into" .  
                                                                           " _RSERV_SLAVE_SYNC_(syncid, synctime) values ($syncid, now())");  
                         }  
                         elsif ($row[0] >= $prm)  
                         {  
                                 printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(0);  
                         }  
                         else  
                         {  
                                 $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .  
                                                                           " set syncid = $syncid, synctime = now()");  
                         }  
                         if ($result->resultStatus ne PGRES_COMMAND_OK)  
                         {  
                                 print STDERR $conn->errorMessage unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-1);  
                         }  
                 }  
                 elsif ($cmd eq 'OK')  
                 {  
                         $ok = 1;  
                         last;  
                 }  
                 elsif ($cmd eq 'ERROR')  
                 {  
                         printf STDERR "ERROR signaled\n" unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-2);  
                 }  
                 else  
                 {  
                         printf STDERR "Unknown command $cmd\n" unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-2);  
                 }  
745          }          }
746            else
         if (! $ok)  
747          {          {
748                  printf STDERR "No OK flag in input\n" unless ($quiet);              $sql .= " where \"$Stables{$tabname}->[1]\" = ".
749                  $conn->exec("ROLLBACK");                  $vals[$Stables{$tabname}->[2]];
                 return(-2);  
750          }          }
751            
752          $result = $conn->exec("COMMIT");          printf "$sql\n" if $debug;
753            
754            $result = $sconn->exec($sql);
755            
756          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
757          {          {
758                  print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
759                  $conn->exec("ROLLBACK");              return(-1);
                 return(-1);  
760          }          }
761            next if $result->cmdTuples == 1;        # updated
762          return(1);          
763  }          if ($result->cmdTuples > 1)
   
 sub DoDelete  
 {  
         my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);  
   
         my $ok = 0;  
         while(<$inpf>)  
764          {          {
765                  if ($_ !~ /\n$/)              printf STDERR "Duplicate keys\n" unless ($quiet);
766                  {              return(-2);
767                          printf STDERR "Invalid format\n" unless ($quiet);          }
768                          return(-2);  
769                  }          # no key - copy
770                  my $key = $_;          push @CopyBuf, "$istring\n";
771                  $key =~ s/\n//;          $CBufLen += length($istring);
772                  if ($key eq '\.')          
773                  {          if ($CBufLen >= $CBufMax)
                         $ok = 1;  
                         last;  
                 }  
   
                 my $sql = "delete from $tabname where $Stables{$tabname}->[1] = '$key'";  
   
                 printf "$sql\n" if $debug;  
   
                 my $result = $conn->exec($sql);  
                 if ($result->resultStatus ne PGRES_COMMAND_OK)  
                 {  
                         print STDERR $conn->errorMessage unless ($quiet);  
                         return(-1);  
                 }  
         }  
   
         if (! $ok)  
774          {          {
775                  printf STDERR "No end of input in DELETE section\n" unless ($quiet);              $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
776                  return(-2);              return($result) if $result;
777          }              @CopyBuf = ();
778                $CBufLen = 0;
779            }
780        }
781        
782        if (! $ok)
783        {
784            printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
785            return(-2);
786        }
787        
788        if ($CBufLen)
789        {
790            print "@CopyBuf\n" if $debug;
791            $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
792            return($result) if $result;
793        }
794    
795          return(0);      return(0);
796  }  }
797    
798    sub DoInsert
 sub DoUpdate  
799  {  {
800          my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
         my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;  
   
         my @CopyBuf = ();  
         my $CBufLen = 0;  
         my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy  
   
         my $sql = "select attnum, attname from pg_attribute" .  
                 " where attrelid = $Stables{$tabname}->[0] and attnum > 0";  
   
         my $result = $conn->exec($sql);  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 return(-1);  
         }  
   
         my @anames = ();  
         while (@row = $result->fetchrow)  
         {  
                 $anames[$row[0]] = $row[1];  
         }  
   
         my $istring;  
         my $ok = 0;  
         while(<$inpf>)  
         {  
                 if ($_ !~ /\n$/)  
                 {  
                         printf STDERR "Invalid format\n" unless ($quiet);  
                         return(-2);  
                 }  
                 $istring = $_;  
                 $istring =~ s/\n//;  
                 if ($istring eq '\.')  
                 {  
                         $ok = 1;  
                         last;  
                 }  
                 my @vals = split(/      /, $istring);  
                 if ($oidkey)  
                 {  
                         if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)  
                         {  
                                 printf STDERR "Invalid OID\n" unless ($quiet);  
                                 return(-2);  
                         }  
                         $oidkey = $vals[0];  
                 }  
                 else  
                 {  
                         unshift @vals, '';  
                 }  
   
                 $sql = "update $tabname set ";  
                 my $ocnt = 0;  
                 for (my $i = 1; $i <= $#anames; $i++)  
                 {  
                         if ($vals[$i] eq '\N')  
                         {  
                                 if ($i == $Stables{$tabname}->[2])  
                                 {  
                                         printf STDERR "NULL key\n" unless ($quiet);  
                                         return(-2);  
                                 }  
                                 $vals[$i] = 'null';  
                         }  
                         else  
                         {  
                                 $vals[$i] = "'" . $vals[$i] . "'";  
                                 next if $i == $Stables{$tabname}->[2];  
                         }  
                         $ocnt++;  
                         $sql .= ', ' if $ocnt > 1;  
                         $sql .= "$anames[$i] = $vals[$i]";  
                 }  
                 if ($oidkey)  
                 {  
                         $sql .= " where $Stables{$tabname}->[1] = $oidkey";  
                 }  
                 else  
                 {  
                         $sql .= " where $Stables{$tabname}->[1] = $vals[$Stables{$tabname}->[2]]";  
                 }  
801    
802                  printf "$sql\n" if $debug;      # only insert rows into tables that the slave wants
803        if (! defined($Stables{$tabname})) {
804                  $result = $conn->exec($sql);          print "Not configured to insert rows from table $tabname\n" unless $quiet;
805                  if ($result->resultStatus ne PGRES_COMMAND_OK)          while (<$inpf>) {
806                  {              my $istring = $_;
807                          print STDERR $conn->errorMessage unless ($quiet);              $istring =~ s/\n//;
808                          return(-1);              last if ($istring eq '\.');
                 }  
                 next if $result->cmdTuples == 1;        # updated  
   
                 if ($result->cmdTuples > 1)  
                 {  
                         printf STDERR "Duplicate keys\n" unless ($quiet);  
                         return(-2);  
                 }  
   
                 # no key - copy  
                 push @CopyBuf, "$istring\n";  
                 $CBufLen += length($istring);  
   
                 if ($CBufLen >= $CBufMax)  
                 {  
                         $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);  
                         return($result) if $result;  
                         @CopyBuf = ();  
                         $CBufLen = 0;  
                 }  
         }  
   
         if (! $ok)  
         {  
                 printf STDERR "No end of input in UPDATE section\n" unless ($quiet);  
                 return(-2);  
809          }          }
810            return(0);
811        }
812    
813          if ($CBufLen)      my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
814        
815        my @CopyBuf = ();
816        my $CBufLen = 0;
817        my $CBufMax = 16 * 1024 * 1024;     # max size of buf for copy
818        
819        my $istring;
820        my $ok = 0;
821        while(<$inpf>)
822        {
823            if ($_ !~ /\n$/)
824            {
825                printf STDERR "Invalid format\n" unless ($quiet);
826                return(-2);
827            }
828            $istring = $_;
829            $istring =~ s/\n//;
830            if ($istring eq '\.')
831            {
832                $ok = 1;
833                last;
834            }
835    
836            # no key - copy
837            push @CopyBuf, "$istring\n";
838            $CBufLen += length($istring);
839            
840            if ($CBufLen >= $CBufMax)
841          {          {
842                  $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);              my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
843                  return($result) if $result;              return($result) if $result;
844          }              @CopyBuf = ();
845                $CBufLen = 0;
846          return(0);          }
847        }
848        
849        if (! $ok)
850        {
851            printf STDERR "No end of input in INSERT section\n" unless ($quiet);
852            return(-2);
853        }
854        
855        if ($CBufLen)
856        {
857            print "@CopyBuf\n" if $debug;
858            my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
859            return($result) if $result;
860        }
861        
862        return(0);
863  }  }
864    
865    
866  sub DoCopy  sub DoCopy
867  {  {
868          my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);      my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
869        
870          my $sql = "COPY $tabname " . (($withoids) ? "WITH OIDS " : '') .      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
871  "FROM STDIN";          "FROM STDIN";
872          my $result = $conn->exec($sql);      my $result = $sconn->exec($sql);
873          if ($result->resultStatus ne PGRES_COPY_IN)      if ($result->resultStatus ne PGRES_COPY_IN)
874          {      {
875                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
876                  return(-1);          return(-1);
877          }      }
878        
879          foreach $str (@{$CBuf})      foreach my $str (@{$CBuf})
880          {      {
881                  $conn->putline($str);          $sconn->putline($str);
882          }      }
883        
884          $conn->putline("\\.\n");      $sconn->putline("\\.\n");
885        
886          if ($conn->endcopy)      if ($sconn->endcopy)
887          {      {
888                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
889                  return(-1);          return(-1);
890          }      }
891        
892          return(0);      return(0);
   
893  }  }
894    
895    
# Line 679  sub DoCopy Line 898  sub DoCopy
898  #  #
899  sub GetSyncID  sub GetSyncID
900  {  {
901          my ($conn) = @_; # (@_[0]);      my ($sconn) = @_; # (@_[0]);
902                
903          my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");      my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
904          if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
905          {      {
906                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
907                  return(-1);          return(-1);
908          }      }
909          my @row = $result->fetchrow;      my @row = $result->fetchrow;
910          return(undef) unless defined $row[0];   # null      return(undef) unless defined $row[0];       # null
911          return($row[0]);      return($row[0]);
912  }  }
913    
914  #  #
# Line 697  sub GetSyncID Line 916  sub GetSyncID
916  #  #
917  sub SyncSyncID  sub SyncSyncID
918  {  {
919          my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
920                
921          my $result = $conn->exec("BEGIN");      my $result = $mconn->exec("BEGIN");
922          if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
923          {      {
924                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
925                  $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
926                  return(-1);          return(-1);
927          }      }
928        
929          $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
930                                                    " where server = $server and syncid = $syncid" .                            " where server = $sserver AND syncid = $syncid" .
931                                                    " for update");                            " for update");
932          if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
933          {      {
934                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
935                  $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
936                  return(-1);          return(-1);
937          }      }
938          my @row = $result->fetchrow;      my @row = $result->fetchrow;
939          if (! defined $row[0])      if (! defined $row[0])
940          {      {
941                  printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);          printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
942                  $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
943                  return(0);          return(0);
944          }      }
945          if ($row[1] > 0)      if ($row[1] > 0)
946          {      {
947                  printf STDERR "SyncID $syncid for server $server already updated\n" unless ($quiet);          printf STDERR "SyncID $syncid for server ".
948                  $conn->exec("ROLLBACK");              "$sserver already updated\n" unless ($quiet);
949                  return(0);          $mconn->exec("ROLLBACK");
950          }          return(0);
951          $result = $conn->exec("update _RSERV_SYNC_" .      }
952                                                    " set synctime = now(), status = 1" .      $result = $mconn->exec("update _RSERV_SYNC_" .
953                                                    " where server = $server and syncid = $syncid");                            " set synctime = now(), status = 1" .
954          if ($result->resultStatus ne PGRES_COMMAND_OK)                            " where server = $sserver AND syncid = $syncid");
955          {      if ($result->resultStatus ne PGRES_COMMAND_OK)
956                  print STDERR $conn->errorMessage unless ($quiet);      {
957                  $conn->exec("ROLLBACK");          print STDERR $mconn->errorMessage unless ($quiet);
958                  return(-1);          $mconn->exec("ROLLBACK");
959          }          return(-1);
960          $result = $conn->exec("delete from _RSERV_SYNC_" .      }
961                                                    " where server = $server and syncid < $syncid");      $result = $mconn->exec("delete from _RSERV_SYNC_" .
962          if ($result->resultStatus ne PGRES_COMMAND_OK)                            " where server = $sserver AND syncid < $syncid");
963          {      if ($result->resultStatus ne PGRES_COMMAND_OK)
964                  print STDERR $conn->errorMessage unless ($quiet);      {
965                  $conn->exec("ROLLBACK");          print STDERR $mconn->errorMessage unless ($quiet);
966                  return(-1);          $mconn->exec("ROLLBACK");
967          }          return(-1);
968        }
969          $result = $conn->exec("COMMIT");      
970          if ($result->resultStatus ne PGRES_COMMAND_OK)      $result = $mconn->exec("COMMIT");
971          {      if ($result->resultStatus ne PGRES_COMMAND_OK)
972                  print STDERR $conn->errorMessage unless ($quiet);      {
973                  $conn->exec("ROLLBACK");          print STDERR $mconn->errorMessage unless ($quiet);
974                  return(-1);          $mconn->exec("ROLLBACK");
975          }          return(-1);
976        }
977          return(1);      
978        return(1);
979  }  }
980    
981  1;  1;

Legend:
Removed from v.1.1.1.1  
changed lines
  Added in v.1.8

  ViewVC Help
Powered by ViewVC 1.1.26