/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.1.1.1 by dpavlin, Wed Dec 20 17:22:35 2000 UTC revision 1.11 by dpavlin, Fri Oct 31 00:07:37 2003 UTC
# Line 6  package RServ; Line 6  package RServ;
6    
7  require Exporter;  require Exporter;
8  @ISA = qw(Exporter);  @ISA = qw(Exporter);
9  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog);  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10            Rollback RollbackAndQuit Connect Exec Exec2
11            $debug $quiet $verbose
12            );
13  @EXPORT_OK = qw();  @EXPORT_OK = qw();
14    use strict;
15  use Pg;  use Pg;
16    
17  $debug = 0;  my $debug = 0;
18  $quiet = 1;  my $quiet = 1;
19    my $verbose = 0;
20    
21  my %Mtables = ();  my %Mtables = ();
22  my %Stables = ();  my %Stables = ();
23    
24  sub PrepareSnapshot  sub GetServerId
25  {  {
26          my ($conn, $outf, $server) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);
27    
28          my $result = $conn->exec("BEGIN");      print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
         $result = $conn->exec("set transaction isolation level serializable");  
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
29    
30          # MAP oid --> tabname, keyname      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
31          $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname" .                            " host='$Host' AND dbase='$DB'");
                                                   " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .  
                                                   " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .  
                                                   " and pga.attnum = rt.key");  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
32    
33          my @row;      if ($result->resultStatus ne PGRES_TUPLES_OK)
34          while (@row = $result->fetchrow)      {
35          {          print STDERR $mconn->errorMessage unless ($quiet);
36          #       printf "$row[0], $row[1], $row[2]\n";          return(-1);
37                  push @{$Mtables{$row[0]}}, $row[1], $row[2];      }
38          }      
39        if ($result->cmdTuples && $result->cmdTuples > 1)
40        {
41            printf STDERR "Duplicate host definitions.\n" unless ($quiet);
42            return(-2);
43        }
44    
45          # Read last succeeded sync      my @row = $result->fetchrow;
         $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .  
                 " where server = $server and syncid = (select max(syncid) from" .  
                         " _RSERV_SYNC_ where server = $server and status > 0)";  
46    
47          printf "$sql\n" if $debug;      print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
48    
49          $result = $conn->exec($sql);      return $row[0];
50          if ($result->resultStatus ne PGRES_TUPLES_OK)  }
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
   
         my @lastsync = $result->fetchrow;  
   
         my $sinfo = "";  
         if ($lastsync[3] ne '') # sync info  
         {  
                 $sinfo = "and (l.logid >= $lastsync[3]";  
                 $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';  
                 $sinfo .= ")";  
         }  
   
         my $havedeal = 0;  
   
         # DELETED rows  
         $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .  
                 " where l.deleted = 1 $sinfo order by l.reloid";  
51    
52          printf "$sql\n" if $debug;  sub PrepareSnapshot
53    {
54        my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;
55    
56          $result = $conn->exec($sql);      if ($mserver == $sserver) {
57            print STDERR "master and slave numbers are same [$mserver] !\n";
58            return(-1);
59        }
60    
61        print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
62    
63        # first, we must know for wich tables the slave subscribed
64        my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
65        if ($result->resultStatus ne PGRES_TUPLES_OK)
66        {
67            print STDERR $mconn->errorMessage unless ($quiet);
68            return(-1);
69        }
70        
71        my @row;
72        while (@row = $result->fetchrow) {
73            $Stables{$row[0]} = 1;
74        }
75        
76        $result = $mconn->exec("BEGIN");
77        if ($result->resultStatus ne PGRES_COMMAND_OK)
78        {
79            print STDERR $mconn->errorMessage unless ($quiet);
80            $mconn->exec("ROLLBACK");
81            return(-1);
82        }
83        $result = $mconn->exec("set transaction isolation level serializable");
84        if ($result->resultStatus ne PGRES_COMMAND_OK)
85        {
86            print STDERR $mconn->errorMessage unless ($quiet);
87            $mconn->exec("ROLLBACK");
88            return(-1);
89        }
90        
91        # MAP oid --> tabname, keyname, key_type
92        $result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
93                              " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
94                              ", pg_type pgt".
95                              " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
96                              " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
97        if ($result->resultStatus ne PGRES_TUPLES_OK)
98        {
99            print STDERR $mconn->errorMessage unless ($quiet);
100            $mconn->exec("ROLLBACK");
101            return(-1);
102        }
103        
104        while (@row = $result->fetchrow)
105        {
106            #       printf "$row[0], $row[1], $row[2]\n";
107                    if (ref($onlytables) eq 'HASH') {
108                            next unless (exists $onlytables->{$row[1]});
109                            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
110                    }
111            push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
112        }
113        
114        # Read last succeeded sync
115        my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
116            " where server = $sserver AND syncid = (select max(syncid) from" .
117            " _RSERV_SYNC_ where server = $sserver AND status > 0)";
118        
119        printf "$sql\n" if $debug;
120    
121        $result = $mconn->exec($sql);
122        if ($result->resultStatus ne PGRES_TUPLES_OK)
123        {
124            print STDERR $mconn->errorMessage unless ($quiet);
125            $mconn->exec("ROLLBACK");
126            return(-1);
127        }
128        
129        my @lastsync = $result->fetchrow;
130    
131        # exclude data which originated from master server
132        my $sel_server = " and l.server = $mserver ";
133    
134        my $sinfo = "";
135        if (@lastsync && $lastsync[3] ne '')        # sync info
136        {
137            $sinfo = "and (l.logid >= $lastsync[3]";
138            $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
139            $sinfo .= ")";
140        }
141        
142        my $havedeal = 0;
143        
144        # DELETED rows
145        $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
146            " where l.delete = 1 $sinfo $sel_server order by l.reloid";
147        
148        printf "DELETED: $sql\n" if $debug;
149        
150        $result = $mconn->exec($sql);
151        if ($result->resultStatus ne PGRES_TUPLES_OK)
152        {
153            print STDERR $mconn->errorMessage unless ($quiet);
154            $mconn->exec("ROLLBACK");
155            return(-1);
156        }
157        
158        my $lastoid = -1;
159        while (@row = $result->fetchrow)
160        {
161            next unless exists $Mtables{$row[0]};
162            next unless exists $Stables{$Mtables{$row[0]}[0]};
163    
164            if ($lastoid != $row[0])
165            {
166                if ($lastoid == -1)
167                {
168                    my $syncid = GetSYNCID($mconn, $outf);
169                    return($syncid) if $syncid < 0;
170                    $havedeal = 1;
171                }
172                else
173                {
174                    printf $outf "\\.\n";
175                }
176                printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
177                $lastoid = $row[0];
178            }
179            if (! defined $row[1])
180            {
181                print STDERR "NULL key\n" unless ($quiet);
182                $mconn->exec("ROLLBACK");
183                return(-2);
184            }
185            printf $outf "%s\n", OutputValue($row[1]);
186        }
187        printf $outf "\\.\n" if ($lastoid != -1);
188        
189        # UPDATED rows
190        
191        my ($taboid, $tabname, $tabkey);
192        foreach $taboid (keys %Mtables)
193        {
194            my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
195            next unless exists $Stables{$tabname};
196    
197            my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
198    
199            $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
200              "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
201              " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
202              $sel_server;
203            
204            printf "UPDATED: $sql\n" if $debug;
205            
206            $result = $mconn->exec($sql);
207          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
208          {          {
209                  print STDERR $conn->errorMessage unless ($quiet);              printf $outf "-- ERROR\n" if $havedeal;
210                  $conn->exec("ROLLBACK");              print STDERR $mconn->errorMessage unless ($quiet);
211                  return(-1);              $mconn->exec("ROLLBACK");
212                return(-1);
213            }
214            next if $result->ntuples <= 0;
215            if (! $havedeal)
216            {
217                my $syncid = GetSYNCID($mconn, $outf);
218                return($syncid) if $syncid < 0;
219                $havedeal = 1;
220          }          }
221            printf $outf "-- UPDATE $tabname\n";
222          $lastoid = '';          printf "-- UPDATE $tabname\n" if $debug;
223          while (@row = $result->fetchrow)          while (@row = $result->fetchrow)
224          {          {
225                  next unless exists $Mtables{$row[0]};              for (my $i = 0; $i <= $#row; $i++)
226                  if ($lastoid != $row[0])              {
227                  {                  printf $outf "  " if $i;
228                          if ($lastoid eq '')                  printf "        " if $i && $debug;
229                          {                  printf $outf "%s", OutputValue($row[$i]);
230                                  my $syncid = GetSYNCID($conn, $outf);                  printf "%s", OutputValue($row[$i]) if $debug;;
231                                  return($syncid) if $syncid < 0;              }
232                                  $havedeal = 1;              printf $outf "\n";
233                          }              printf "\n" if $debug;
234                          else          }
235                          {          printf $outf "\\.\n";
236                                  printf $outf "\\.\n";          printf "\\.\n" if $debug;;
237                          }      }
238                          printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";  
239                          $lastoid = $row[0];      # INSERTED rows
240                  }  
241                  if (! defined $row[1])      foreach $taboid (keys %Mtables)
242                  {      {
243                          print STDERR "NULL key\n" unless ($quiet);          my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
244                          $conn->exec("ROLLBACK");          next unless exists $Stables{$tabname};
245                          return(-2);  
246                  }          my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
247                  printf $outf "%s\n", OutputValue($row[1]);  
248          }          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
249          printf $outf "\\.\n" if $lastoid ne '';            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
250              " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
251          # UPDATED rows            $sel_server;
252            
253          my ($taboid, $tabname, $tabkey);          printf "INSERTED: $sql\n" if $debug;
254          foreach $taboid (keys %Mtables)          
255          {          $result = $mconn->exec($sql);
                 ($tabname, $tabkey) = @{$Mtables{$taboid}};  
                 my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';  
                 $sql = sprintf "select $oidkey _$tabname.* from _RSERV_LOG_ l," .  
                         " $tabname _$tabname where l.reloid = $taboid and l.deleted = 0 $sinfo" .  
                                 " and l.key = _$tabname.${tabkey}::text";  
   
                 printf "$sql\n" if $debug;  
   
                 $result = $conn->exec($sql);  
                 if ($result->resultStatus ne PGRES_TUPLES_OK)  
                 {  
                         printf $outf "-- ERROR\n" if $havedeal;  
                         print STDERR $conn->errorMessage unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-1);  
                 }  
                 next if $result->ntuples <= 0;  
                 if (! $havedeal)  
                 {  
                         my $syncid = GetSYNCID($conn, $outf);  
                         return($syncid) if $syncid < 0;  
                         $havedeal = 1;  
                 }  
                 printf $outf "-- UPDATE $tabname\n";  
                 while (@row = $result->fetchrow)  
                 {  
                         for ($i = 0; $i <= $#row; $i++)  
                         {  
                                 printf $outf "  " if $i;  
                                 printf $outf "%s", OutputValue($row[$i]);  
                         }  
                         printf $outf "\n";  
                 }  
                 printf $outf "\\.\n";  
         }  
   
         unless ($havedeal)  
         {  
                 $conn->exec("ROLLBACK");  
                 return(0);  
         }  
   
         # Remember this snapshot info  
         $result = $conn->exec("select _rserv_sync_($server)");  
256          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
257          {          {
258                  printf $outf "-- ERROR\n";              printf $outf "-- ERROR\n" if $havedeal;
259                  print STDERR $conn->errorMessage unless ($quiet);              print STDERR $mconn->errorMessage unless ($quiet);
260                  $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
261                  return(-1);              return(-1);
262            }
263            next if $result->ntuples <= 0;
264            if (! $havedeal)
265            {
266                my $syncid = GetSYNCID($mconn, $outf);
267                return($syncid) if $syncid < 0;
268                $havedeal = 1;
269          }          }
270            printf $outf "-- INSERT $tabname\n";
271          $result = $conn->exec("COMMIT");          printf "-- INSERT $tabname\n" if $debug;
272          if ($result->resultStatus ne PGRES_COMMAND_OK)          while (@row = $result->fetchrow)
273          {          {
274                  printf $outf "-- ERROR\n";              for (my $i = 0; $i <= $#row; $i++)
275                  print STDERR $conn->errorMessage unless ($quiet);              {
276                  $conn->exec("ROLLBACK");                  printf $outf "  " if $i;
277                  return(-1);                  printf "        " if $i && $debug;
278          }                  printf $outf "%s", OutputValue($row[$i]);
279          printf $outf "-- OK\n";                  printf "%s", OutputValue($row[$i]) if $debug;;
280                }
281          return(1);              printf $outf "\n";
282                printf "\n" if $debug;
283            }
284            printf $outf "\\.\n";
285            printf "\\.\n" if $debug;;
286        }
287        
288        
289        unless ($havedeal)
290        {
291            $mconn->exec("ROLLBACK");
292            return(0);
293        }
294        
295        # Remember this snapshot info
296        $result = $mconn->exec("select _rserv_sync_($sserver)");
297        if ($result->resultStatus ne PGRES_TUPLES_OK)
298        {
299            printf $outf "-- ERROR\n";
300            print STDERR $mconn->errorMessage unless ($quiet);
301            $mconn->exec("ROLLBACK");
302            return(-1);
303        }
304        
305        $result = $mconn->exec("COMMIT");
306        if ($result->resultStatus ne PGRES_COMMAND_OK)
307        {
308            printf $outf "-- ERROR\n";
309            print STDERR $mconn->errorMessage unless ($quiet);
310            $mconn->exec("ROLLBACK");
311            return(-1);
312        }
313        printf $outf "-- OK\n";
314        printf "-- OK\n" if $debug;
315        
316        return(1);
317        
318  }  }
319    
320  sub OutputValue  sub OutputValue
# Line 213  sub OutputValue Line 334  sub OutputValue
334  # Get syncid for new snapshot  # Get syncid for new snapshot
335  sub GetSYNCID  sub GetSYNCID
336  {  {
337          my ($conn, $outf) = @_; # (@_[0], @_[1]);      my ($conn, $outf) = @_; # (@_[0], @_[1]);
338        
339          my $result = $conn->exec("select nextval('_rserv_sync_seq_')");      my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
340          if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
341          {      {
342                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
343                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
344                  return(-1);          return(-1);
345          }      }
346        
347          my @row = $result->fetchrow;      my @row = $result->fetchrow;
348        
349          printf $outf "-- SYNCID $row[0]\n";      printf $outf "-- SYNCID $row[0]\n";
350          return($row[0]);      printf "-- SYNCID $row[0]\n" if $debug;
351        return($row[0]);
352  }  }
353    
354    
355  sub CleanLog  sub CleanLog
356  {  {
357          my ($conn, $howold) = @_; # (@_[0], @_[1]);      my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
358        
359          my $result = $conn->exec("BEGIN");      my $result = $conn->exec("BEGIN");
360          if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
361          {      {
362                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
363                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
364                  return(-1);          return(-1);
365          }      }
366        
367        my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
368            " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
369            " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
370        
371        printf "$sql\n" if $debug;
372        
373        $result = $conn->exec($sql);
374        if ($result->resultStatus ne PGRES_TUPLES_OK)
375        {
376            print STDERR $conn->errorMessage unless ($quiet);
377            return(-1);
378        }
379        my $maxid = '';
380        my %active = ();
381        while (my @row = $result->fetchrow)
382        {
383            $maxid = $row[0] if $maxid eq '';
384            last if $row[0] > $maxid;
385            my @ids = split(/[      ]+,[    ]+/, $row[1]);
386            foreach my $aid (@ids)
387            {
388                $active{$aid} = 1 unless exists $active{$aid};
389            }
390        }
391        if ($maxid eq '')
392        {
393            print STDERR "No Sync IDs\n" unless ($quiet);
394            return(0);
395        }
396        my $alist = join(',', keys %active);
397        my $sinfo = "logid < $maxid";
398        $sinfo .= " AND logid not in ($alist)" if $alist ne '';
399        #if (ref($onlytables) eq 'HASH') {
400        #   foreach my $onlytable (keys %{$onlytables}) {
401        #           $sinfo
402        #   }
403        #}
404        $sql = "delete from _RSERV_LOG_ where " .
405            "logtime < now() - '$howold second'::interval AND $sinfo";
406        
407        printf "$sql\n" if $debug;
408        
409        $result = $conn->exec($sql);
410        if ($result->resultStatus ne PGRES_COMMAND_OK)
411        {
412            print STDERR $conn->errorMessage unless ($quiet);
413            $conn->exec("ROLLBACK");
414            return(-1);
415        }
416        $maxid = $result->cmdTuples;
417        
418        $result = $conn->exec("COMMIT");
419        if ($result->resultStatus ne PGRES_COMMAND_OK)
420        {
421            print STDERR $conn->errorMessage unless ($quiet);
422            $conn->exec("ROLLBACK");
423            return(-1);
424        }
425        
426        return($maxid);
427    }
428    
429          my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .  sub ApplySnapshot
430                  " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .  {
431                          " where rs2.server = rs.server and rs2.status > 0) order by rs.maxid";      my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);
432        
433        my $result = $sconn->exec("BEGIN");
434        if ($result->resultStatus ne PGRES_COMMAND_OK)
435        {
436            print STDERR $sconn->errorMessage unless ($quiet);
437            $sconn->exec("ROLLBACK");
438            return(-1);
439        }
440        
441        $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
442        if ($result->resultStatus ne PGRES_COMMAND_OK)
443        {
444            print STDERR $sconn->errorMessage unless ($quiet);
445            $sconn->exec("ROLLBACK");
446            return(-1);
447        }
448        
449        # MAP name --> oid, keyname, keynum
450        my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
451            " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
452            " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
453            " AND pga.attnum = rt.key";
454        
455        $result = $sconn->exec($sql);
456        if ($result->resultStatus ne PGRES_TUPLES_OK)
457        {
458            print STDERR $sconn->errorMessage unless ($quiet);
459            $sconn->exec("ROLLBACK");
460            return(-1);
461        }
462        %Stables = ();
463        while (my @row = $result->fetchrow)
464        {
465            #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
466                    if (ref($onlytables) eq 'HASH') {
467                            next unless (exists $onlytables->{$row[1]});
468                            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
469                    }
470            push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
471        }
472    
473        my $ok = 0;
474        my $syncid = -1;
475        while(<$inpf>)
476        {
477            $_ =~ s/\n//;
478            my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);
479            die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
480            if ($cmt ne '--')
481            {
482                printf STDERR "Invalid format\n" unless ($quiet);
483                $sconn->exec("ROLLBACK");
484                return(-2);
485            }
486            if ($cmd eq 'DELETE')
487            {
488                if ($syncid == -1)
489                {
490                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
491                    $sconn->exec("ROLLBACK");
492                    return(-2);
493                }
494                $result = DoDelete($sconn, $inpf, $prm);
495                if ($result)
496                {
497                    $sconn->exec("ROLLBACK");
498                    return($result);
499                }
500            }
501            elsif ($cmd eq 'INSERT')
502            {
503                if ($syncid == -1)
504                {
505                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
506                    $sconn->exec("ROLLBACK");
507                    return(-2);
508                }
509                $result = DoInsert($sconn, $inpf, $prm);
510                if ($result)
511                {
512                    $sconn->exec("ROLLBACK");
513                    return($result);
514                }
515            }
516            elsif ($cmd eq 'UPDATE')
517            {
518                if ($syncid == -1)
519                {
520                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
521                    $sconn->exec("ROLLBACK");
522                    return(-2);
523                }
524                $result = DoUpdate($sconn, $inpf, $prm);
525                if ($result)
526                {
527                    $sconn->exec("ROLLBACK");
528                    return($result);
529                }
530            }
531            elsif ($cmd eq 'SYNCID')
532            {
533                if ($syncid != -1)
534                {
535                    printf STDERR "Second Sync ID ?!\n" unless ($quiet);
536                    $sconn->exec("ROLLBACK");
537                    return(-2);
538                }
539                if ($prm !~ /^\d+$/)
540                {
541                    printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
542                    $sconn->exec("ROLLBACK");
543                    return(-2);
544                }
545                $syncid = $prm;
546                
547                printf STDERR "Sync ID $syncid\n" unless ($quiet);
548                
549                $result = $sconn->exec("select syncid, synctime from " .
550                                      "_RSERV_SLAVE_SYNC_ where syncid = " .
551                                      "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
552                if ($result->resultStatus ne PGRES_TUPLES_OK)
553                {
554                    print STDERR $sconn->errorMessage unless ($quiet);
555                    $sconn->exec("ROLLBACK");
556                    return(-1);
557                }
558                my @row = $result->fetchrow;
559                if (! defined $row[0])
560                {
561                    $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
562                                          "(syncid, synctime) values ($syncid, now())");
563                }
564                elsif ($row[0] >= $prm)
565                {
566                    printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
567                    $sconn->exec("ROLLBACK");
568                    return(0);
569                }
570                else
571                {
572                    $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
573                                          " set syncid = $syncid, synctime = now()");
574                }
575                if ($result->resultStatus ne PGRES_COMMAND_OK)
576                {
577                    print STDERR $sconn->errorMessage unless ($quiet);
578                    $sconn->exec("ROLLBACK");
579                    return(-1);
580                }
581            }
582            elsif ($cmd eq 'OK')
583            {
584                $ok = 1;
585                last;
586            }
587            elsif ($cmd eq 'ERROR')
588            {
589                printf STDERR "ERROR signaled\n" unless ($quiet);
590                $sconn->exec("ROLLBACK");
591                return(-2);
592            }
593            else
594            {
595                printf STDERR "Unknown command $cmd\n" unless ($quiet);
596                $sconn->exec("ROLLBACK");
597                return(-2);
598            }
599        }
600        
601        if (! $ok)
602        {
603            printf STDERR "No OK flag in input\n" unless ($quiet);
604            $sconn->exec("ROLLBACK");
605            return(-2);
606        }
607        
608        $result = $sconn->exec("COMMIT");
609        if ($result->resultStatus ne PGRES_COMMAND_OK)
610        {
611            print STDERR $sconn->errorMessage unless ($quiet);
612            $sconn->exec("ROLLBACK");
613            return(-1);
614        }
615        
616        return(1);
617    }
618    
619          printf "$sql\n" if $debug;  sub DoDelete
620    {
621        my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
622    
623          $result = $conn->exec($sql);      # only delete tables that the slave wants
624          if ($result->resultStatus ne PGRES_TUPLES_OK)      if (! defined($Stables{$tabname})) {
625          {          print "Not configured to delete rows from table $tabname\n" unless $quiet;
626                  print STDERR $conn->errorMessage unless ($quiet);          while (<$inpf>) {
627                  return(-1);              my $istring = $_;
628                $istring =~ s/\n//;
629                last if ($istring eq '\.');
630          }          }
631          my $maxid = '';          return(0);
632          my %active = ();      }
633          while (my @row = $result->fetchrow)  
634          {      my $ok = 0;
635                  $maxid = $row[0] if $maxid eq '';      while(<$inpf>)
636                  last if $row[0] > $maxid;      {
637                  my @ids = split(/[      ]+,[    ]+/, $row[1]);          if ($_ !~ /\n$/)
638                  foreach $aid (@ids)          {
639                  {              printf STDERR "Invalid format\n" unless ($quiet);
640                          $active{$aid} = 1 unless exists $active{$aid};              return(-2);
641                  }          }
642          }          my $key = $_;
643          if ($maxid eq '')          $key =~ s/\n//;
644            if ($key eq '\.')
645          {          {
646                  print STDERR "No Sync IDs\n" unless ($quiet);              $ok = 1;
647                  return(0);              last;
648          }          }
         my $alist = join(',', keys %active);  
         my $sinfo = "logid < $maxid";  
         $sinfo .= " and logid not in ($alist)" if $alist ne '';  
649                    
650          $sql = "delete from _RSERV_LOG_ where " .          my $sql = "delete from \"$tabname\" where ".
651                  "logtime < now() - '$howold second'::interval and $sinfo";              "\"$Stables{$tabname}->[1]\" = '$key'";
652            
653          printf "$sql\n" if $debug;          printf "$sql\n" if $debug;
654            
655          $result = $conn->exec($sql);          my $result = $sconn->exec($sql);
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
         $maxid = $result->cmdTuples;  
   
         $result = $conn->exec("COMMIT");  
656          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
657          {          {
658                  print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
659                  $conn->exec("ROLLBACK");              return(-1);
                 return(-1);  
660          }          }
661        }
662          return($maxid);      
663        if (! $ok)
664        {
665            printf STDERR "No end of input in DELETE section\n" unless ($quiet);
666            return(-2);
667        }
668        
669        return(0);
670  }  }
671    
672  sub ApplySnapshot  
673    sub DoUpdate
674  {  {
675          my ($conn, $inpf) = @_; # (@_[0], @_[1]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
676    
677          my $result = $conn->exec("BEGIN");      # only update the tables that the slave wants
678          if ($result->resultStatus ne PGRES_COMMAND_OK)      if (! defined($Stables{$tabname})) {
679          {          print "Not configured to update rows from table $tabname\n" unless $quiet;
680                  print STDERR $conn->errorMessage unless ($quiet);          while (<$inpf>) {
681                  $conn->exec("ROLLBACK");              my $istring = $_;
682                  return(-1);              $istring =~ s/\n//;
683                last if ($istring eq '\.');
684          }          }
685            return(0);
686        }
687    
688          $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");      my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
689          if ($result->resultStatus ne PGRES_COMMAND_OK)      
690          {      my @CopyBuf = ();
691                  print STDERR $conn->errorMessage unless ($quiet);      my $CBufLen = 0;
692                  $conn->exec("ROLLBACK");      my $CBufMax = 16 * 1024 * 1024;     # max size of buf for copy
693                  return(-1);  
694        my $sql = "select attnum, attname from pg_attribute" .
695            " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
696        
697        my $result = $sconn->exec($sql);
698        if ($result->resultStatus ne PGRES_TUPLES_OK)
699        {
700            print STDERR $sconn->errorMessage unless ($quiet);
701            return(-1);
702        }
703        
704        my @anames = ();
705        while (my @row = $result->fetchrow)
706        {
707            $anames[$row[0]] = $row[1];
708        }
709        
710        my $istring;
711        my $ok = 0;
712        while(<$inpf>)
713        {
714            if ($_ !~ /\n$/)
715            {
716                printf STDERR "Invalid format\n" unless ($quiet);
717                return(-2);
718            }
719            $istring = $_;
720            $istring =~ s/\n//;
721            if ($istring eq '\.')
722            {
723                $ok = 1;
724                last;
725            }
726            my @vals = split(/      /, $istring);
727            if ($oidkey)
728            {
729                if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
730                {
731                    printf STDERR "Invalid OID\n" unless ($quiet);
732                    return(-2);
733                }
734                $oidkey = $vals[0];
735          }          }
736            else
         # MAP name --> oid, keyname, keynum  
         my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .  
                 " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .  
                         " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .  
                                 " and pga.attnum = rt.key";  
         $result = $conn->exec($sql);  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
737          {          {
738                  print STDERR $conn->errorMessage unless ($quiet);              unshift @vals, '';
                 $conn->exec("ROLLBACK");  
                 return(-1);  
739          }          }
740            
741          while (@row = $result->fetchrow)          $sql = "update \"$tabname\" set ";
742          {          my $ocnt = 0;
743          #       printf "        %s      %s\n", $row[1], $row[0];          for (my $i = 1; $i <= $#anames; $i++)
744                  push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];          {
745                if ($vals[$i] eq '\N')
746                {
747                    if ($i == $Stables{$tabname}->[2])
748                    {
749                        printf STDERR "NULL key\n" unless ($quiet);
750                        return(-2);
751                    }
752                    $vals[$i] = 'null';
753                }
754                else
755                {
756                    $vals[$i] = "'" . $vals[$i] . "'";
757                    next if $i == $Stables{$tabname}->[2];
758                }
759                $ocnt++;
760                $sql .= ', ' if $ocnt > 1;
761                $sql .= "\"$anames[$i]\" = $vals[$i]";
762          }          }
763            if ($oidkey)
         my $ok = 0;  
         my $syncid = '';  
         while(<$inpf>)  
764          {          {
765                  $_ =~ s/\n//;              $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
                 my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);  
                 if ($cmt ne '--')  
                 {  
                         printf STDERR "Invalid format\n" unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-2);  
                 }  
                 if ($cmd eq 'DELETE')  
                 {  
                         if ($syncid eq '')  
                         {  
                                 printf STDERR "Sync ID unspecified\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $result = DoDelete($conn, $inpf, $prm);  
                         if ($result)  
                         {  
                                 $conn->exec("ROLLBACK");  
                                 return($result);  
                         }  
                 }  
                 elsif ($cmd eq 'UPDATE')  
                 {  
                         if ($syncid eq '')  
                         {  
                                 printf STDERR "Sync ID unspecified\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $result = DoUpdate($conn, $inpf, $prm);  
                         if ($result)  
                         {  
                                 $conn->exec("ROLLBACK");  
                                 return($result);  
                         }  
                 }  
                 elsif ($cmd eq 'SYNCID')  
                 {  
                         if ($syncid ne '')  
                         {  
                                 printf STDERR "Second Sync ID ?!\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         if ($prm !~ /^\d+$/)  
                         {  
                                 printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $syncid = $prm;  
   
                         printf STDERR "Sync ID $syncid\n" unless ($quiet);  
   
                         $result = $conn->exec("select syncid, synctime from " .  
                                                                   "_RSERV_SLAVE_SYNC_ where syncid = " .  
                                                                   "(select max(syncid) from _RSERV_SLAVE_SYNC_)");  
                         if ($result->resultStatus ne PGRES_TUPLES_OK)  
                         {  
                                 print STDERR $conn->errorMessage unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-1);  
                         }  
                         my @row = $result->fetchrow;  
                         if (! defined $row[0])  
                         {  
                                 $result = $conn->exec("insert into" .  
                                                                           " _RSERV_SLAVE_SYNC_(syncid, synctime) values ($syncid, now())");  
                         }  
                         elsif ($row[0] >= $prm)  
                         {  
                                 printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(0);  
                         }  
                         else  
                         {  
                                 $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .  
                                                                           " set syncid = $syncid, synctime = now()");  
                         }  
                         if ($result->resultStatus ne PGRES_COMMAND_OK)  
                         {  
                                 print STDERR $conn->errorMessage unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-1);  
                         }  
                 }  
                 elsif ($cmd eq 'OK')  
                 {  
                         $ok = 1;  
                         last;  
                 }  
                 elsif ($cmd eq 'ERROR')  
                 {  
                         printf STDERR "ERROR signaled\n" unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-2);  
                 }  
                 else  
                 {  
                         printf STDERR "Unknown command $cmd\n" unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-2);  
                 }  
766          }          }
767            else
         if (! $ok)  
768          {          {
769                  printf STDERR "No OK flag in input\n" unless ($quiet);              $sql .= " where \"$Stables{$tabname}->[1]\" = ".
770                  $conn->exec("ROLLBACK");                  $vals[$Stables{$tabname}->[2]];
                 return(-2);  
771          }          }
772            
773          $result = $conn->exec("COMMIT");          printf "$sql\n" if $debug;
774            
775            $result = $sconn->exec($sql);
776            
777          if ($result->resultStatus ne PGRES_COMMAND_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
778          {          {
779                  print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
780                  $conn->exec("ROLLBACK");              return(-1);
                 return(-1);  
781          }          }
782            next if $result->cmdTuples == 1;        # updated
783          return(1);          
784  }          if ($result->cmdTuples > 1)
   
 sub DoDelete  
 {  
         my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);  
   
         my $ok = 0;  
         while(<$inpf>)  
785          {          {
786                  if ($_ !~ /\n$/)              printf STDERR "Duplicate keys\n" unless ($quiet);
787                  {              return(-2);
788                          printf STDERR "Invalid format\n" unless ($quiet);          }
789                          return(-2);  
790                  }          # no key - copy
791                  my $key = $_;          push @CopyBuf, "$istring\n";
792                  $key =~ s/\n//;          $CBufLen += length($istring);
793                  if ($key eq '\.')          
794                  {          if ($CBufLen >= $CBufMax)
                         $ok = 1;  
                         last;  
                 }  
   
                 my $sql = "delete from $tabname where $Stables{$tabname}->[1] = '$key'";  
   
                 printf "$sql\n" if $debug;  
   
                 my $result = $conn->exec($sql);  
                 if ($result->resultStatus ne PGRES_COMMAND_OK)  
                 {  
                         print STDERR $conn->errorMessage unless ($quiet);  
                         return(-1);  
                 }  
         }  
   
         if (! $ok)  
795          {          {
796                  printf STDERR "No end of input in DELETE section\n" unless ($quiet);              $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
797                  return(-2);              return($result) if $result;
798          }              @CopyBuf = ();
799                $CBufLen = 0;
800            }
801        }
802        
803        if (! $ok)
804        {
805            printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
806            return(-2);
807        }
808        
809        if ($CBufLen)
810        {
811            print "@CopyBuf\n" if $debug;
812            $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
813            return($result) if $result;
814        }
815    
816          return(0);      return(0);
817  }  }
818    
819    sub DoInsert
 sub DoUpdate  
820  {  {
821          my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
         my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;  
822    
823          my @CopyBuf = ();      # only insert rows into tables that the slave wants
824          my $CBufLen = 0;      if (! defined($Stables{$tabname})) {
825          my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy          print "Not configured to insert rows from table $tabname\n" unless $quiet;
826            while (<$inpf>) {
827          my $sql = "select attnum, attname from pg_attribute" .              my $istring = $_;
828                  " where attrelid = $Stables{$tabname}->[0] and attnum > 0";              $istring =~ s/\n//;
829                last if ($istring eq '\.');
         my $result = $conn->exec($sql);  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 return(-1);  
         }  
   
         my @anames = ();  
         while (@row = $result->fetchrow)  
         {  
                 $anames[$row[0]] = $row[1];  
         }  
   
         my $istring;  
         my $ok = 0;  
         while(<$inpf>)  
         {  
                 if ($_ !~ /\n$/)  
                 {  
                         printf STDERR "Invalid format\n" unless ($quiet);  
                         return(-2);  
                 }  
                 $istring = $_;  
                 $istring =~ s/\n//;  
                 if ($istring eq '\.')  
                 {  
                         $ok = 1;  
                         last;  
                 }  
                 my @vals = split(/      /, $istring);  
                 if ($oidkey)  
                 {  
                         if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)  
                         {  
                                 printf STDERR "Invalid OID\n" unless ($quiet);  
                                 return(-2);  
                         }  
                         $oidkey = $vals[0];  
                 }  
                 else  
                 {  
                         unshift @vals, '';  
                 }  
   
                 $sql = "update $tabname set ";  
                 my $ocnt = 0;  
                 for (my $i = 1; $i <= $#anames; $i++)  
                 {  
                         if ($vals[$i] eq '\N')  
                         {  
                                 if ($i == $Stables{$tabname}->[2])  
                                 {  
                                         printf STDERR "NULL key\n" unless ($quiet);  
                                         return(-2);  
                                 }  
                                 $vals[$i] = 'null';  
                         }  
                         else  
                         {  
                                 $vals[$i] = "'" . $vals[$i] . "'";  
                                 next if $i == $Stables{$tabname}->[2];  
                         }  
                         $ocnt++;  
                         $sql .= ', ' if $ocnt > 1;  
                         $sql .= "$anames[$i] = $vals[$i]";  
                 }  
                 if ($oidkey)  
                 {  
                         $sql .= " where $Stables{$tabname}->[1] = $oidkey";  
                 }  
                 else  
                 {  
                         $sql .= " where $Stables{$tabname}->[1] = $vals[$Stables{$tabname}->[2]]";  
                 }  
   
                 printf "$sql\n" if $debug;  
   
                 $result = $conn->exec($sql);  
                 if ($result->resultStatus ne PGRES_COMMAND_OK)  
                 {  
                         print STDERR $conn->errorMessage unless ($quiet);  
                         return(-1);  
                 }  
                 next if $result->cmdTuples == 1;        # updated  
   
                 if ($result->cmdTuples > 1)  
                 {  
                         printf STDERR "Duplicate keys\n" unless ($quiet);  
                         return(-2);  
                 }  
   
                 # no key - copy  
                 push @CopyBuf, "$istring\n";  
                 $CBufLen += length($istring);  
   
                 if ($CBufLen >= $CBufMax)  
                 {  
                         $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);  
                         return($result) if $result;  
                         @CopyBuf = ();  
                         $CBufLen = 0;  
                 }  
         }  
   
         if (! $ok)  
         {  
                 printf STDERR "No end of input in UPDATE section\n" unless ($quiet);  
                 return(-2);  
830          }          }
831            return(0);
832        }
833    
834          if ($CBufLen)      my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
835        
836        my @CopyBuf = ();
837        my $CBufLen = 0;
838        my $CBufMax = 16 * 1024 * 1024;     # max size of buf for copy
839        
840        my $istring;
841        my $ok = 0;
842        while(<$inpf>)
843        {
844            if ($_ !~ /\n$/)
845            {
846                printf STDERR "Invalid format\n" unless ($quiet);
847                return(-2);
848            }
849            $istring = $_;
850            $istring =~ s/\n//;
851            if ($istring eq '\.')
852            {
853                $ok = 1;
854                last;
855            }
856    
857            # no key - copy
858            push @CopyBuf, "$istring\n";
859            $CBufLen += length($istring);
860            
861            if ($CBufLen >= $CBufMax)
862          {          {
863                  $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);              my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
864                  return($result) if $result;              return($result) if $result;
865          }              @CopyBuf = ();
866                $CBufLen = 0;
867          return(0);          }
868        }
869        
870        if (! $ok)
871        {
872            printf STDERR "No end of input in INSERT section\n" unless ($quiet);
873            return(-2);
874        }
875        
876        if ($CBufLen)
877        {
878            print "@CopyBuf\n" if $debug;
879            my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
880            return($result) if $result;
881        }
882        
883        return(0);
884  }  }
885    
886    
887  sub DoCopy  sub DoCopy
888  {  {
889          my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);      my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
890        
891          my $sql = "COPY $tabname " . (($withoids) ? "WITH OIDS " : '') .      my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
892  "FROM STDIN";          "FROM STDIN";
893          my $result = $conn->exec($sql);      my $result = $sconn->exec($sql);
894          if ($result->resultStatus ne PGRES_COPY_IN)      if ($result->resultStatus ne PGRES_COPY_IN)
895          {      {
896                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
897                  return(-1);          return(-1);
898          }      }
899        
900          foreach $str (@{$CBuf})      foreach my $str (@{$CBuf})
901          {      {
902                  $conn->putline($str);          $sconn->putline($str);
903          }      }
904        
905          $conn->putline("\\.\n");      $sconn->putline("\\.\n");
906        
907          if ($conn->endcopy)      if ($sconn->endcopy)
908          {      {
909                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
910                  return(-1);          return(-1);
911          }      }
912        
913          return(0);      return(0);
   
914  }  }
915    
916    
# Line 679  sub DoCopy Line 919  sub DoCopy
919  #  #
920  sub GetSyncID  sub GetSyncID
921  {  {
922          my ($conn) = @_; # (@_[0]);      my ($sconn) = @_; # (@_[0]);
923                
924          my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");      my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
925          if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
926          {      {
927                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $sconn->errorMessage unless ($quiet);
928                  return(-1);          return(-1);
929          }      }
930          my @row = $result->fetchrow;      my @row = $result->fetchrow;
931          return(undef) unless defined $row[0];   # null      return(undef) unless defined $row[0];       # null
932          return($row[0]);      return($row[0]);
933  }  }
934    
935  #  #
# Line 697  sub GetSyncID Line 937  sub GetSyncID
937  #  #
938  sub SyncSyncID  sub SyncSyncID
939  {  {
940          my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
941                
942          my $result = $conn->exec("BEGIN");      my $result = $mconn->exec("BEGIN");
943          if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
944          {      {
945                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $mconn->errorMessage unless ($quiet);
946                  $conn->exec("ROLLBACK");          $mconn->exec("ROLLBACK");
947                  return(-1);          return(-1);
948          }      }
949        
950        $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
951                              " where server = $sserver AND syncid = $syncid" .
952                              " for update");
953        if ($result->resultStatus ne PGRES_TUPLES_OK)
954        {
955            print STDERR $mconn->errorMessage unless ($quiet);
956            $mconn->exec("ROLLBACK");
957            return(-1);
958        }
959        my @row = $result->fetchrow;
960        if (! defined $row[0])
961        {
962            printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
963            $mconn->exec("ROLLBACK");
964            return(0);
965        }
966        if ($row[1] > 0)
967        {
968            printf STDERR "SyncID $syncid for server ".
969                "$sserver already updated\n" unless ($quiet);
970            $mconn->exec("ROLLBACK");
971            return(0);
972        }
973        $result = $mconn->exec("update _RSERV_SYNC_" .
974                              " set synctime = now(), status = 1" .
975                              " where server = $sserver AND syncid = $syncid");
976        if ($result->resultStatus ne PGRES_COMMAND_OK)
977        {
978            print STDERR $mconn->errorMessage unless ($quiet);
979            $mconn->exec("ROLLBACK");
980            return(-1);
981        }
982        $result = $mconn->exec("delete from _RSERV_SYNC_" .
983                              " where server = $sserver AND syncid < $syncid");
984        if ($result->resultStatus ne PGRES_COMMAND_OK)
985        {
986            print STDERR $mconn->errorMessage unless ($quiet);
987            $mconn->exec("ROLLBACK");
988            return(-1);
989        }
990        
991        $result = $mconn->exec("COMMIT");
992        if ($result->resultStatus ne PGRES_COMMAND_OK)
993        {
994            print STDERR $mconn->errorMessage unless ($quiet);
995            $mconn->exec("ROLLBACK");
996            return(-1);
997        }
998        
999        return(1);
1000    }
1001    
1002          $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .  # stuff moved from perl scripts for better re-use
                                                   " where server = $server and syncid = $syncid" .  
                                                   " for update");  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
         my @row = $result->fetchrow;  
         if (! defined $row[0])  
         {  
                 printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(0);  
         }  
         if ($row[1] > 0)  
         {  
                 printf STDERR "SyncID $syncid for server $server already updated\n" unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(0);  
         }  
         $result = $conn->exec("update _RSERV_SYNC_" .  
                                                   " set synctime = now(), status = 1" .  
                                                   " where server = $server and syncid = $syncid");  
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
         $result = $conn->exec("delete from _RSERV_SYNC_" .  
                                                   " where server = $server and syncid < $syncid");  
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
1003    
1004          $result = $conn->exec("COMMIT");  sub Rollback {
1005          if ($result->resultStatus ne PGRES_COMMAND_OK)      my $conn = shift @_;
1006          {  
1007                  print STDERR $conn->errorMessage unless ($quiet);      print STDERR $conn->errorMessage;
1008                  $conn->exec("ROLLBACK");      $conn->exec("ROLLBACK");
1009                  return(-1);  }
1010    
1011    sub RollbackAndQuit {
1012        my $conn = shift @_;
1013    
1014        Rollback($conn);
1015        exit (-1);
1016    }
1017    
1018    sub Connect {
1019            my $info = shift @_;
1020    
1021            print("Connecting to $info\n") if ($debug || $verbose);
1022            my $conn = Pg::connectdb($info);
1023            if ($conn->status != PGRES_CONNECTION_OK) {
1024                print STDERR "Failed opening $info\n";
1025                exit 1;
1026          }          }
1027            return $conn;
1028    }
1029    
1030    sub Exec {
1031            my $conn = shift @_;
1032            my $sql = shift @_;
1033    
1034            my $result = $conn->exec($sql);
1035            print STDERR "$sql\n" if ($debug);
1036            RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1037    }
1038    
1039          return(1);  sub Exec2 {
1040            my $mconn = shift @_;
1041            my $sconn = shift @_;
1042            my $sql = shift @_;
1043    
1044            my $result = $mconn->exec($sql);
1045            RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1046            $result = $sconn->exec($sql);
1047            RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1048  }  }
1049    
1050  1;  1;

Legend:
Removed from v.1.1.1.1  
changed lines
  Added in v.1.11

  ViewVC Help
Powered by ViewVC 1.1.26