/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.1.1.1 by dpavlin, Wed Dec 20 17:22:35 2000 UTC revision 1.18 by dpavlin, Mon Nov 3 21:30:39 2003 UTC
# Line 6  package RServ; Line 6  package RServ;
6    
7  require Exporter;  require Exporter;
8  @ISA = qw(Exporter);  @ISA = qw(Exporter);
9  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog);  @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10            Rollback RollbackAndQuit Connect Exec Exec2 MkInfo ExecDebug
11            $debug $quiet $verbose
12            );
13  @EXPORT_OK = qw();  @EXPORT_OK = qw();
14    use strict;
15  use Pg;  use Pg;
16    
17  $debug = 0;  my $debug = 0;
18  $quiet = 1;  my $quiet = 1;
19    my $verbose = 0;
20    
21    $debug = 1;
22    $quiet = 0;
23    $verbose = 1;
24    
25  my %Mtables = ();  my %Mtables = ();
26  my %Stables = ();  my %Stables = ();
27    
28  sub PrepareSnapshot  sub GetServerId
29  {  {
30          my ($conn, $outf, $server) = @_; # (@_[0], @_[1], @_[2]);      my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);
31    
32          my $result = $conn->exec("BEGIN");      print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
         $result = $conn->exec("set transaction isolation level serializable");  
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
33    
34          # MAP oid --> tabname, keyname      my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
35          $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname" .                            " host='$Host' AND dbase='$DB'");
                                                   " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .  
                                                   " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .  
                                                   " and pga.attnum = rt.key");  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
36    
37          my @row;      if ($result->resultStatus ne PGRES_TUPLES_OK)
38          while (@row = $result->fetchrow)      {
39          {          print STDERR $mconn->errorMessage unless ($quiet);
40          #       printf "$row[0], $row[1], $row[2]\n";          return(-1);
41                  push @{$Mtables{$row[0]}}, $row[1], $row[2];      }
42          }      
43        if ($result->cmdTuples && $result->cmdTuples > 1)
44        {
45            printf STDERR "Duplicate host definitions.\n" unless ($quiet);
46            return(-2);
47        }
48    
49          # Read last succeeded sync      my @row = $result->fetchrow;
         $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .  
                 " where server = $server and syncid = (select max(syncid) from" .  
                         " _RSERV_SYNC_ where server = $server and status > 0)";  
50    
51          printf "$sql\n" if $debug;      print STDERR "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
52    
53          $result = $conn->exec($sql);      return $row[0];
54          if ($result->resultStatus ne PGRES_TUPLES_OK)  }
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
   
         my @lastsync = $result->fetchrow;  
   
         my $sinfo = "";  
         if ($lastsync[3] ne '') # sync info  
         {  
                 $sinfo = "and (l.logid >= $lastsync[3]";  
                 $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';  
                 $sinfo .= ")";  
         }  
   
         my $havedeal = 0;  
   
         # DELETED rows  
         $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .  
                 " where l.deleted = 1 $sinfo order by l.reloid";  
55    
56          printf "$sql\n" if $debug;  sub PrepareSnapshot
57    {
58        my ($mconn, $sconn, $outf, $mserver, $sserver, $multimaster, $onlytables) = @_;
59    
60          $result = $conn->exec($sql);      if ($mserver == $sserver) {
61            print STDERR "master and slave numbers are same [$mserver] !\n";
62            return(-1);
63        }
64    
65        print STDERR "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
66    
67        # dump master server ID into snapshot file (to prevent replication
68        # of colums from master back to slave)
69        print $outf "-- SERVER $mserver\n";
70    
71        # first, we must know for wich tables the slave subscribed
72        my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);
73        return (-1) if ($result == -1);
74    
75        my @row;
76        while (@row = $result->fetchrow) {
77            $Stables{$row[0]} = 1;
78        }
79        
80        print STDERR "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);
81    
82        Exec($mconn,"BEGIN");
83        Exec($mconn,"set transaction isolation level serializable");
84        
85        # MAP oid --> tabname, keyname, key_type
86        my $sql = qq{
87            select pgc.oid, pgc.relname, pga.attname, pgt.typname
88                    from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga,
89                            pg_type pgt
90                    where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
91                            AND pga.attnum = rt.key AND pga.atttypid=pgt.oid
92        };
93        $result = Exec($mconn,$sql);
94    
95        while (@row = $result->fetchrow)
96        {
97                    printf "$row[0], $row[1], $row[2]\n" if ($debug);
98                    if (ref($onlytables) eq 'HASH') {
99                            next unless (exists $onlytables->{$row[1]});
100                            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
101                    }
102            push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
103        }
104    
105        print STDERR "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
106        if (! %Mtables) {
107            print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";
108            RollbackAndQuit($mconn);
109        }
110    
111        # Read last succeeded sync
112        $sql = qq{
113            select syncid, synctime, minid, maxid, active from _RSERV_SYNC_
114            where server = $sserver AND syncid =
115                    (select max(syncid) from _RSERV_SYNC_
116                            where server = $sserver AND status > 0)
117        };
118        
119        $result = Exec($mconn,$sql);
120    
121        my @lastsync = $result->fetchrow;
122        print STDERR "lastsync: ",join(",",@lastsync),"\n" if ($debug);
123    
124        # exclude data which originated from master server
125        my $sel_server = " and l.server = $mserver ";
126    
127        my $sinfo = "";
128        if (@lastsync && $lastsync[3] ne '')        # sync info
129        {
130            $sinfo = "and (l.logid >= $lastsync[3]";
131            $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
132            $sinfo .= ")";
133        }
134    
135        my $havedeal = 0;
136        
137        # DELETED rows
138        $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
139            " where l.delete = 1 $sinfo $sel_server order by l.reloid";
140        
141        printf "DELETED: $sql\n" if $debug;
142        
143        $result = $mconn->exec($sql);
144        if ($result->resultStatus ne PGRES_TUPLES_OK) {
145            print STDERR $mconn->errorMessage unless ($quiet);
146            $mconn->exec("ROLLBACK");
147            return(-1);
148        }
149        
150        my $lastoid = -1;
151        while (@row = $result->fetchrow) {
152            next unless exists $Mtables{$row[0]};
153            next unless exists $Stables{$Mtables{$row[0]}[0]};
154    
155            if ($lastoid != $row[0]) {
156                if ($lastoid == -1) {
157                    my $syncid = GetSYNCID($mconn, $outf);
158                    return($syncid) if $syncid < 0;
159                    $havedeal = 1;
160                } else {
161                    printf $outf "\\.\n";
162                }
163                printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
164                $lastoid = $row[0];
165            }
166            if (! defined $row[1]) {
167                print STDERR "NULL key\n" unless ($quiet);
168                $mconn->exec("ROLLBACK");
169                return(-2);
170            }
171            printf $outf "%s\n", OutputValue($row[1]);
172        }
173        printf $outf "\\.\n" if ($lastoid != -1);
174        
175        # UPDATED rows
176        
177        my ($taboid, $tabname, $tabkey);
178        foreach $taboid (keys %Mtables)
179        {
180            my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
181            next unless exists $Stables{$tabname};
182    
183            my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
184    
185            $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
186              "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
187              " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
188              $sel_server;
189            
190            printf "UPDATED: $sql\n" if $debug;
191            
192            $result = $mconn->exec($sql);
193          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
194          {          {
195                  print STDERR $conn->errorMessage unless ($quiet);              printf $outf "-- ERROR\n" if $havedeal;
196                  $conn->exec("ROLLBACK");              print STDERR $mconn->errorMessage unless ($quiet);
197                  return(-1);              $mconn->exec("ROLLBACK");
198                return(-1);
199            }
200            next if $result->ntuples <= 0;
201            if (! $havedeal)
202            {
203                my $syncid = GetSYNCID($mconn, $outf);
204                return($syncid) if $syncid < 0;
205                $havedeal = 1;
206          }          }
207            printf $outf "-- UPDATE $tabname\n";
208          $lastoid = '';          printf "-- UPDATE $tabname\n" if $debug;
209          while (@row = $result->fetchrow)          while (@row = $result->fetchrow)
210          {          {
211                  next unless exists $Mtables{$row[0]};              for (my $i = 0; $i <= $#row; $i++)
212                  if ($lastoid != $row[0])              {
213                  {                  printf $outf "  " if $i;
214                          if ($lastoid eq '')                  printf "        " if $i && $debug;
215                          {                  printf $outf "%s", OutputValue($row[$i]);
216                                  my $syncid = GetSYNCID($conn, $outf);                  printf "%s", OutputValue($row[$i]) if $debug;;
217                                  return($syncid) if $syncid < 0;              }
218                                  $havedeal = 1;              printf $outf "\n";
219                          }              printf "\n" if $debug;
220                          else          }
221                          {          printf $outf "\\.\n";
222                                  printf $outf "\\.\n";          printf "\\.\n" if $debug;;
223                          }      }
224                          printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";  
225                          $lastoid = $row[0];      # INSERTED rows
226                  }  
227                  if (! defined $row[1])      foreach $taboid (keys %Mtables)
228                  {      {
229                          print STDERR "NULL key\n" unless ($quiet);          my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
230                          $conn->exec("ROLLBACK");          next unless exists $Stables{$tabname};
231                          return(-2);  
232                  }          my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
233                  printf $outf "%s\n", OutputValue($row[1]);  
234          }          $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
235          printf $outf "\\.\n" if $lastoid ne '';            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
236              " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
237          # UPDATED rows            $sel_server;
238            
239          my ($taboid, $tabname, $tabkey);          printf "INSERTED: $sql\n" if $debug;
240          foreach $taboid (keys %Mtables)          
241          {          $result = $mconn->exec($sql);
                 ($tabname, $tabkey) = @{$Mtables{$taboid}};  
                 my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';  
                 $sql = sprintf "select $oidkey _$tabname.* from _RSERV_LOG_ l," .  
                         " $tabname _$tabname where l.reloid = $taboid and l.deleted = 0 $sinfo" .  
                                 " and l.key = _$tabname.${tabkey}::text";  
   
                 printf "$sql\n" if $debug;  
   
                 $result = $conn->exec($sql);  
                 if ($result->resultStatus ne PGRES_TUPLES_OK)  
                 {  
                         printf $outf "-- ERROR\n" if $havedeal;  
                         print STDERR $conn->errorMessage unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-1);  
                 }  
                 next if $result->ntuples <= 0;  
                 if (! $havedeal)  
                 {  
                         my $syncid = GetSYNCID($conn, $outf);  
                         return($syncid) if $syncid < 0;  
                         $havedeal = 1;  
                 }  
                 printf $outf "-- UPDATE $tabname\n";  
                 while (@row = $result->fetchrow)  
                 {  
                         for ($i = 0; $i <= $#row; $i++)  
                         {  
                                 printf $outf "  " if $i;  
                                 printf $outf "%s", OutputValue($row[$i]);  
                         }  
                         printf $outf "\n";  
                 }  
                 printf $outf "\\.\n";  
         }  
   
         unless ($havedeal)  
         {  
                 $conn->exec("ROLLBACK");  
                 return(0);  
         }  
   
         # Remember this snapshot info  
         $result = $conn->exec("select _rserv_sync_($server)");  
242          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_TUPLES_OK)
243          {          {
244                  printf $outf "-- ERROR\n";              printf $outf "-- ERROR\n" if $havedeal;
245                  print STDERR $conn->errorMessage unless ($quiet);              print STDERR $mconn->errorMessage unless ($quiet);
246                  $conn->exec("ROLLBACK");              $mconn->exec("ROLLBACK");
247                  return(-1);              return(-1);
248            }
249            next if $result->ntuples <= 0;
250            if (! $havedeal)
251            {
252                my $syncid = GetSYNCID($mconn, $outf);
253                return($syncid) if $syncid < 0;
254                $havedeal = 1;
255          }          }
256            printf $outf "-- INSERT $tabname\n";
257          $result = $conn->exec("COMMIT");          printf "-- INSERT $tabname\n" if $debug;
258          if ($result->resultStatus ne PGRES_COMMAND_OK)          while (@row = $result->fetchrow)
259          {          {
260                  printf $outf "-- ERROR\n";              for (my $i = 0; $i <= $#row; $i++)
261                  print STDERR $conn->errorMessage unless ($quiet);              {
262                  $conn->exec("ROLLBACK");                  printf $outf "  " if $i;
263                  return(-1);                  printf "        " if $i && $debug;
264          }                  printf $outf "%s", OutputValue($row[$i]);
265          printf $outf "-- OK\n";                  printf "%s", OutputValue($row[$i]) if $debug;
266                }
267          return(1);              printf $outf "\n";
268                printf "\n" if $debug;
269            }
270            printf $outf "\\.\n";
271            printf "\\.\n" if $debug;;
272        }
273        
274        
275        unless ($havedeal)
276        {
277            print STDERR "hon't have deal, rollback...\n" if ($debug);
278            $mconn->exec("ROLLBACK");
279            return(0);
280        }
281        
282        # Remember this snapshot info
283        $result = $mconn->exec("select _rserv_sync_($sserver)");
284        if ($result->resultStatus ne PGRES_TUPLES_OK)
285        {
286            printf $outf "-- ERROR\n";
287            print STDERR $mconn->errorMessage unless ($quiet);
288            $mconn->exec("ROLLBACK");
289            return(-1);
290        }
291        
292        $result = $mconn->exec("COMMIT");
293        if ($result->resultStatus ne PGRES_COMMAND_OK)
294        {
295            printf $outf "-- ERROR\n";
296            print STDERR $mconn->errorMessage unless ($quiet);
297            $mconn->exec("ROLLBACK");
298            return(-1);
299        }
300    
301        printf $outf "-- OK\n";
302        printf "-- OK\n" if $debug;
303        
304        return(1);
305        
306  }  }
307    
308  sub OutputValue  sub OutputValue
# Line 213  sub OutputValue Line 322  sub OutputValue
322  # Get syncid for new snapshot  # Get syncid for new snapshot
323  sub GetSYNCID  sub GetSYNCID
324  {  {
325          my ($conn, $outf) = @_; # (@_[0], @_[1]);      my ($conn, $outf) = @_; # (@_[0], @_[1]);
326        
327          my $result = $conn->exec("select nextval('_rserv_sync_seq_')");      my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
328          if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_TUPLES_OK)
329          {      {
330                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
331                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
332                  return(-1);          return(-1);
333          }      }
334    
335          my @row = $result->fetchrow;      my @row = $result->fetchrow;
336        
337          printf $outf "-- SYNCID $row[0]\n";      printf $outf "-- SYNCID $row[0]\n";
338          return($row[0]);      printf "-- SYNCID $row[0]\n" if $debug;
339        return($row[0]);
340  }  }
341    
342    
343  sub CleanLog  sub CleanLog
344  {  {
345          my ($conn, $howold) = @_; # (@_[0], @_[1]);      my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
346        
347          my $result = $conn->exec("BEGIN");      my $result = $conn->exec("BEGIN");
348          if ($result->resultStatus ne PGRES_COMMAND_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK)
349          {      {
350                  print STDERR $conn->errorMessage unless ($quiet);          print STDERR $conn->errorMessage unless ($quiet);
351                  $conn->exec("ROLLBACK");          $conn->exec("ROLLBACK");
352                  return(-1);          return(-1);
353          }      }
354        
355        my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
356            " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
357            " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
358        
359        printf "$sql\n" if $debug;
360        
361        $result = $conn->exec($sql);
362        if ($result->resultStatus ne PGRES_TUPLES_OK)
363        {
364            print STDERR $conn->errorMessage unless ($quiet);
365            return(-1);
366        }
367        my $maxid = '';
368        my %active = ();
369        while (my @row = $result->fetchrow)
370        {
371            $maxid = $row[0] if $maxid eq '';
372            last if $row[0] > $maxid;
373            my @ids = split(/[      ]+,[    ]+/, $row[1]);
374            foreach my $aid (@ids)
375            {
376                $active{$aid} = 1 unless exists $active{$aid};
377            }
378        }
379        if ($maxid eq '')
380        {
381            print STDERR "No Sync IDs\n" unless ($quiet);
382            return(0);
383        }
384        my $alist = join(',', keys %active);
385        my $sinfo = "logid < $maxid";
386        $sinfo .= " AND logid not in ($alist)" if $alist ne '';
387        #if (ref($onlytables) eq 'HASH') {
388        #   foreach my $onlytable (keys %{$onlytables}) {
389        #           $sinfo
390        #   }
391        #}
392        $sql = "delete from _RSERV_LOG_ where " .
393            "logtime < now() - '$howold second'::interval AND $sinfo";
394        
395        printf "$sql\n" if $debug;
396        
397        $result = $conn->exec($sql);
398        if ($result->resultStatus ne PGRES_COMMAND_OK)
399        {
400            print STDERR $conn->errorMessage unless ($quiet);
401            $conn->exec("ROLLBACK");
402            return(-1);
403        }
404        $maxid = $result->cmdTuples;
405        
406        $result = $conn->exec("COMMIT");
407        if ($result->resultStatus ne PGRES_COMMAND_OK)
408        {
409            print STDERR $conn->errorMessage unless ($quiet);
410            $conn->exec("ROLLBACK");
411            return(-1);
412        }
413        
414        return($maxid);
415    }
416    
417          my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .  sub ApplySnapshot
418                  " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .  {
419                          " where rs2.server = rs.server and rs2.status > 0) order by rs.maxid";      my ($sconn, $inpf, $multimaster, $onlytables) = @_; # (@_[0], @_[1]);
420    
421          printf "$sql\n" if $debug;      my $serverId;
422    
423          $result = $conn->exec($sql);      my $result = $sconn->exec("BEGIN");
424          if ($result->resultStatus ne PGRES_TUPLES_OK)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
425          {          print STDERR $sconn->errorMessage unless ($quiet);
426                  print STDERR $conn->errorMessage unless ($quiet);          $sconn->exec("ROLLBACK");
427                  return(-1);          return(-1);
428          }      }
429          my $maxid = '';      
430          my %active = ();      $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
431          while (my @row = $result->fetchrow)      if ($result->resultStatus ne PGRES_COMMAND_OK) {
432          {          print STDERR $sconn->errorMessage unless ($quiet);
433                  $maxid = $row[0] if $maxid eq '';          $sconn->exec("ROLLBACK");
434                  last if $row[0] > $maxid;          return(-1);
435                  my @ids = split(/[      ]+,[    ]+/, $row[1]);      }
436                  foreach $aid (@ids)      
437                  {      # MAP name --> oid, keyname, keynum
438                          $active{$aid} = 1 unless exists $active{$aid};      my $sql = qq{
439                  }          select pgc.oid, pgc.relname, pga.attname, rt.key
440          }          from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga
441          if ($maxid eq '')          where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
442          {                  AND pga.attnum = rt.key
443                  print STDERR "No Sync IDs\n" unless ($quiet);      };
444        
445        $result = $sconn->exec($sql);
446        if ($result->resultStatus ne PGRES_TUPLES_OK) {
447            print STDERR $sconn->errorMessage unless ($quiet);
448            $sconn->exec("ROLLBACK");
449            return(-1);
450        }
451        %Stables = ();
452        while (my @row = $result->fetchrow) {
453            #       printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
454                    if (ref($onlytables) eq 'HASH') {
455                            next unless (exists $onlytables->{$row[1]});
456                            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
457                    }
458            push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
459        }
460    
461        print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);
462    
463        my $ok = 0;
464        my $syncid = -1;
465        while(<$inpf>) {
466            $_ =~ s/\n//;
467            my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);
468            die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
469            if ($cmt ne '--') {
470                printf STDERR "Invalid format\n" unless ($quiet);
471                $sconn->exec("ROLLBACK");
472                return(-2);
473            }
474            if ($cmd eq 'DELETE') {
475                if ($syncid == -1) {
476                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
477                    $sconn->exec("ROLLBACK");
478                    return(-2);
479                }
480                $result = DoDelete($sconn, $inpf, $prm);
481                if ($result) {
482                    $sconn->exec("ROLLBACK");
483                    return($result);
484                }
485            } elsif ($cmd eq 'INSERT') {
486                if ($syncid == -1) {
487                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
488                    $sconn->exec("ROLLBACK");
489                    return(-2);
490                }
491                $result = DoInsert($sconn, $inpf, $prm);
492                if ($result) {
493                    $sconn->exec("ROLLBACK");
494                    return($result);
495                }
496            } elsif ($cmd eq 'UPDATE') {
497                if ($syncid == -1) {
498                    printf STDERR "Sync ID unspecified\n" unless ($quiet);
499                    $sconn->exec("ROLLBACK");
500                    return(-2);
501                }
502                $result = DoUpdate($sconn, $inpf, $prm);
503                if ($result) {
504                    $sconn->exec("ROLLBACK");
505                    return($result);
506                }
507            } elsif ($cmd eq 'SYNCID') {
508                if ($syncid != -1) {
509                    printf STDERR "Second Sync ID ?!\n" unless ($quiet);
510                    $sconn->exec("ROLLBACK");
511                    return(-2);
512                }
513                if ($prm !~ /^\d+$/) {
514                    printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
515                    $sconn->exec("ROLLBACK");
516                    return(-2);
517                }
518                $syncid = $prm;
519                
520                printf STDERR "Sync ID $syncid\n" unless ($quiet);
521                
522                $result = $sconn->exec(qq{
523                    select syncid, synctime
524                    from _RSERV_SLAVE_SYNC_
525                    where syncid =
526                            (select max(syncid) from _RSERV_SLAVE_SYNC_)
527                });
528                if ($result->resultStatus ne PGRES_TUPLES_OK) {
529                    print STDERR "can't get current syncid from _rserv_slave_sync_: ",$sconn->errorMessage unless ($quiet);
530                    $sconn->exec("ROLLBACK");
531                    return(-1);
532                }
533    
534                my @row = $result->fetchrow;
535                print STDERR "Slave Sync ID ",($row[0] || "null"),"\n" if ($debug);
536                if (! defined $row[0]) {
537                    $result = Exec($sconn,qq{
538                            insert into _RSERV_SLAVE_SYNC_ (syncid, synctime)
539                            values ($syncid, now())
540                    });
541                } elsif ($row[0] >= $prm) {
542                    printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
543                    $sconn->exec("ROLLBACK");
544                  return(0);                  return(0);
545          }              } else {
546          my $alist = join(',', keys %active);                  $result = Exec($sconn,qq{
547          my $sinfo = "logid < $maxid";                          update _RSERV_SLAVE_SYNC_ set syncid = $syncid, synctime = now()
548          $sinfo .= " and logid not in ($alist)" if $alist ne '';                  });
549                        }
550          $sql = "delete from _RSERV_LOG_ where " .              if ($result->resultStatus ne PGRES_COMMAND_OK) {
551                  "logtime < now() - '$howold second'::interval and $sinfo";                  print STDERR $sconn->errorMessage unless ($quiet);
552                    $sconn->exec("ROLLBACK");
553          printf "$sql\n" if $debug;                  return(-1);
554                }
555          $result = $conn->exec($sql);          } elsif ($cmd eq 'OK') {
556          if ($result->resultStatus ne PGRES_COMMAND_OK)              $ok = 1;
557          {              if ($multimaster) {
558                  print STDERR $conn->errorMessage unless ($quiet);              # now, update server in _rserv_log_ based on transaction xid
559                  $conn->exec("ROLLBACK");                  ExecFatch($sconn,"select count(*) from _rserv_log_");
560                  return(-1);                  ExecDebug($sconn,"select * from _rserv_log_");
561          }                  my $keys_sql = qq{
562          $maxid = $result->cmdTuples;                          update _rserv_log_ set server=$serverId
563                            where logid = (select _rserv_xid_())
564          $result = $conn->exec("COMMIT");                  };
565          if ($result->resultStatus ne PGRES_COMMAND_OK)                                  
566          {                  Exec($sconn,$keys_sql);
567                  print STDERR $conn->errorMessage unless ($quiet);              }
568                  $conn->exec("ROLLBACK");              last;
569                  return(-1);          } elsif ($cmd eq 'ERROR') {
570          }              printf STDERR "ERROR signaled\n" unless ($quiet);
571                $sconn->exec("ROLLBACK");
572          return($maxid);              return(-2);
573            } elsif ($cmd eq 'SERVER') {
574                if ($prm !~ /^\d+$/) {
575                    printf STDERR "Invalid Server ID $prm\n" unless ($quiet);
576                    $sconn->exec("ROLLBACK");
577                    return(-2);
578                }
579                $serverId = $prm;
580                print STDERR "Server ID $serverId\n" unless ($quiet);
581            } else {
582                printf STDERR "Unknown command $cmd\n" unless ($quiet);
583                $sconn->exec("ROLLBACK");
584                return(-2);
585            }
586        }
587        
588        if (! $ok) {
589            printf STDERR "No OK flag in input\n" unless ($quiet);
590            $sconn->exec("ROLLBACK");
591            return(-2);
592        }
593        
594        $result = $sconn->exec("COMMIT");
595        if ($result->resultStatus ne PGRES_COMMAND_OK) {
596            print STDERR $sconn->errorMessage unless ($quiet);
597            $sconn->exec("ROLLBACK");
598            return(-1);
599        }
600        
601        return(1);
602  }  }
603    
604  sub ApplySnapshot  sub DoDelete
605  {  {
606          my ($conn, $inpf) = @_; # (@_[0], @_[1]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
607    
608          my $result = $conn->exec("BEGIN");      # only delete tables that the slave wants
609          if ($result->resultStatus ne PGRES_COMMAND_OK)      if (! defined($Stables{$tabname})) {
610          {          print STDERR "Not configured to delete rows from table $tabname\n" unless $quiet;
611                  print STDERR $conn->errorMessage unless ($quiet);          while (<$inpf>) {
612                  $conn->exec("ROLLBACK");              my $istring = $_;
613                  return(-1);              $istring =~ s/\n//;
614                last if ($istring eq '\.');
615          }          }
616            return(0);
617        }
618    
619          $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");      my $ok = 0;
620          if ($result->resultStatus ne PGRES_COMMAND_OK)      while(<$inpf>)
621        {
622            if ($_ !~ /\n$/)
623            {
624                printf STDERR "Invalid format\n" unless ($quiet);
625                return(-2);
626            }
627            my $key = $_;
628            $key =~ s/\n//;
629            if ($key eq '\.')
630          {          {
631                  print STDERR $conn->errorMessage unless ($quiet);              $ok = 1;
632                  $conn->exec("ROLLBACK");              last;
                 return(-1);  
633          }          }
634            
635          # MAP name --> oid, keyname, keynum          my $sql = "delete from \"$tabname\" where ".
636          my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .              "\"$Stables{$tabname}->[1]\" = '$key'";
637                  " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .          
638                          " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .          printf "$sql\n" if $debug;
639                                  " and pga.attnum = rt.key";          
640          $result = $conn->exec($sql);          my $result = $sconn->exec($sql);
641          if ($result->resultStatus ne PGRES_TUPLES_OK)          if ($result->resultStatus ne PGRES_COMMAND_OK)
642          {          {
643                  print STDERR $conn->errorMessage unless ($quiet);              print STDERR $sconn->errorMessage unless ($quiet);
644                  $conn->exec("ROLLBACK");              return(-1);
                 return(-1);  
645          }          }
646        }
647        
648        if (! $ok)
649        {
650            printf STDERR "No end of input in DELETE section\n" unless ($quiet);
651            return(-2);
652        }
653        
654        return(0);
655    }
656    
         while (@row = $result->fetchrow)  
         {  
         #       printf "        %s      %s\n", $row[1], $row[0];  
                 push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];  
         }  
657    
658          my $ok = 0;  sub DoUpdate
659          my $syncid = '';  {
660          while(<$inpf>)      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
661          {  
662                  $_ =~ s/\n//;      # only update the tables that the slave wants
663                  my ($cmt, $cmd, $prm) = split (/[       ]+/, $_, 3);      if (! defined($Stables{$tabname})) {
664                  if ($cmt ne '--')          print STDERR "Not configured to update rows from table $tabname\n" unless $quiet;
665                  {          while (<$inpf>) {
666                          printf STDERR "Invalid format\n" unless ($quiet);              my $istring = $_;
667                          $conn->exec("ROLLBACK");              $istring =~ s/\n//;
668                          return(-2);              last if ($istring eq '\.');
                 }  
                 if ($cmd eq 'DELETE')  
                 {  
                         if ($syncid eq '')  
                         {  
                                 printf STDERR "Sync ID unspecified\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $result = DoDelete($conn, $inpf, $prm);  
                         if ($result)  
                         {  
                                 $conn->exec("ROLLBACK");  
                                 return($result);  
                         }  
                 }  
                 elsif ($cmd eq 'UPDATE')  
                 {  
                         if ($syncid eq '')  
                         {  
                                 printf STDERR "Sync ID unspecified\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $result = DoUpdate($conn, $inpf, $prm);  
                         if ($result)  
                         {  
                                 $conn->exec("ROLLBACK");  
                                 return($result);  
                         }  
                 }  
                 elsif ($cmd eq 'SYNCID')  
                 {  
                         if ($syncid ne '')  
                         {  
                                 printf STDERR "Second Sync ID ?!\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         if ($prm !~ /^\d+$/)  
                         {  
                                 printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-2);  
                         }  
                         $syncid = $prm;  
   
                         printf STDERR "Sync ID $syncid\n" unless ($quiet);  
   
                         $result = $conn->exec("select syncid, synctime from " .  
                                                                   "_RSERV_SLAVE_SYNC_ where syncid = " .  
                                                                   "(select max(syncid) from _RSERV_SLAVE_SYNC_)");  
                         if ($result->resultStatus ne PGRES_TUPLES_OK)  
                         {  
                                 print STDERR $conn->errorMessage unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-1);  
                         }  
                         my @row = $result->fetchrow;  
                         if (! defined $row[0])  
                         {  
                                 $result = $conn->exec("insert into" .  
                                                                           " _RSERV_SLAVE_SYNC_(syncid, synctime) values ($syncid, now())");  
                         }  
                         elsif ($row[0] >= $prm)  
                         {  
                                 printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(0);  
                         }  
                         else  
                         {  
                                 $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .  
                                                                           " set syncid = $syncid, synctime = now()");  
                         }  
                         if ($result->resultStatus ne PGRES_COMMAND_OK)  
                         {  
                                 print STDERR $conn->errorMessage unless ($quiet);  
                                 $conn->exec("ROLLBACK");  
                                 return(-1);  
                         }  
                 }  
                 elsif ($cmd eq 'OK')  
                 {  
                         $ok = 1;  
                         last;  
                 }  
                 elsif ($cmd eq 'ERROR')  
                 {  
                         printf STDERR "ERROR signaled\n" unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-2);  
                 }  
                 else  
                 {  
                         printf STDERR "Unknown command $cmd\n" unless ($quiet);  
                         $conn->exec("ROLLBACK");  
                         return(-2);  
                 }  
669          }          }
670            return(0);
671        }
672    
673          if (! $ok)      my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
674          {      
675                  printf STDERR "No OK flag in input\n" unless ($quiet);      my @CopyBuf = ();
676                  $conn->exec("ROLLBACK");      my $CBufLen = 0;
677        my $CBufMax = 16 * 1024 * 1024;     # max size of buf for copy
678    
679        my $sql = "select attnum, attname from pg_attribute" .
680            " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
681        
682        my $result = $sconn->exec($sql);
683        if ($result->resultStatus ne PGRES_TUPLES_OK)
684        {
685            print STDERR $sconn->errorMessage unless ($quiet);
686            return(-1);
687        }
688        
689        my @anames = ();
690        while (my @row = $result->fetchrow) {
691            $anames[$row[0]] = $row[1];
692        }
693        
694        my $istring;
695        my $ok = 0;
696        while(<$inpf>) {
697            if ($_ !~ /\n$/) {
698                printf STDERR "Invalid format\n" unless ($quiet);
699                return(-2);
700            }
701            $istring = $_;
702            $istring =~ s/\n//;
703            if ($istring eq '\.') {
704                $ok = 1;
705                last;
706            }
707            my @vals = split(/      /, $istring);
708            if ($oidkey) {
709                if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0) {
710                    printf STDERR "Invalid OID\n" unless ($quiet);
711                  return(-2);                  return(-2);
712                }
713                $oidkey = $vals[0];
714            } else {
715                unshift @vals, '';
716          }          }
717            
718          $result = $conn->exec("COMMIT");          $sql = "update \"$tabname\" set ";
719          if ($result->resultStatus ne PGRES_COMMAND_OK)          my $ocnt = 0;
720          {          for (my $i = 1; $i <= $#anames; $i++) {
721                  print STDERR $conn->errorMessage unless ($quiet);              if ($vals[$i] eq '\N') {
722                  $conn->exec("ROLLBACK");                  if ($i == $Stables{$tabname}->[2]) {
723                  return(-1);                      printf STDERR "NULL key\n" unless ($quiet);
724                        return(-2);
725                    }
726                    $vals[$i] = 'null';
727                } else {
728                    $vals[$i] = "'" . $vals[$i] . "'";
729                    next if $i == $Stables{$tabname}->[2];
730                }
731                $ocnt++;
732                $sql .= ', ' if $ocnt > 1;
733                $sql .= "\"$anames[$i]\" = $vals[$i]";
734            } if ($oidkey) {
735                $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
736            } else {
737                $sql .= " where \"$Stables{$tabname}->[1]\" = ".
738                    $vals[$Stables{$tabname}->[2]];
739          }          }
740            
741            printf "$sql\n" if $debug;
742            
743            $result = $sconn->exec($sql);
744            
745            if ($result->resultStatus ne PGRES_COMMAND_OK) {
746                print STDERR $sconn->errorMessage unless ($quiet);
747                return(-1);
748            }
749            next if $result->cmdTuples == 1;        # updated
750            
751            if ($result->cmdTuples > 1) {
752                printf STDERR "Duplicate keys\n" unless ($quiet);
753                return(-2);
754            }
755    
756            # no key - copy
757            push @CopyBuf, "$istring\n";
758            $CBufLen += length($istring);
759            
760            if ($CBufLen >= $CBufMax) {
761                $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
762                return($result) if $result;
763                @CopyBuf = ();
764                $CBufLen = 0;
765            }
766        }
767        
768        if (! $ok) {
769            printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
770            return(-2);
771        }
772        
773        if ($CBufLen) {
774            print STDERR "@CopyBuf\n" if $debug;
775            $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
776            return($result) if $result;
777        }
778    
779          return(1);      return(0);
780  }  }
781    
782  sub DoDelete  sub DoInsert
783  {  {
784          my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);      my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
785    
786          my $ok = 0;      # only insert rows into tables that the slave wants
787          while(<$inpf>)      if (! defined($Stables{$tabname})) {
788          {          print STDERR "Not configured to insert rows from table $tabname\n" unless $quiet;
789                  if ($_ !~ /\n$/)          while (<$inpf>) {
790                  {              my $istring = $_;
791                          printf STDERR "Invalid format\n" unless ($quiet);              $istring =~ s/\n//;
792                          return(-2);              last if ($istring eq '\.');
                 }  
                 my $key = $_;  
                 $key =~ s/\n//;  
                 if ($key eq '\.')  
                 {  
                         $ok = 1;  
                         last;  
                 }  
   
                 my $sql = "delete from $tabname where $Stables{$tabname}->[1] = '$key'";  
   
                 printf "$sql\n" if $debug;  
   
                 my $result = $conn->exec($sql);  
                 if ($result->resultStatus ne PGRES_COMMAND_OK)  
                 {  
                         print STDERR $conn->errorMessage unless ($quiet);  
                         return(-1);  
                 }  
793          }          }
   
         if (! $ok)  
         {  
                 printf STDERR "No end of input in DELETE section\n" unless ($quiet);  
                 return(-2);  
         }  
   
794          return(0);          return(0);
795  }      }
796    
797        my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
798        
799        my @CopyBuf = ();
800        my $CBufLen = 0;
801        my $CBufMax = 16 * 1024 * 1024;     # max size of buf for copy
802        
803        my $istring;
804        my $ok = 0;
805        while(<$inpf>) {
806            if ($_ !~ /\n$/) {
807                printf STDERR "Invalid format\n" unless ($quiet);
808                return(-2);
809            }
810            $istring = $_;
811            $istring =~ s/\n//;
812            if ($istring eq '\.') {
813                $ok = 1;
814                last;
815            }
816    
817            # no key - copy
818            push @CopyBuf, "$istring\n";
819            $CBufLen += length($istring);
820            
821            if ($CBufLen >= $CBufMax) {
822                my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
823                return($result) if $result;
824                @CopyBuf = ();
825                $CBufLen = 0;
826            }
827        }
828        
829        if (! $ok) {
830            printf STDERR "No end of input in INSERT section\n" unless ($quiet);
831            return(-2);
832        }
833        
834        if ($CBufLen) {
835            print STDERR "@CopyBuf\n" if $debug;
836            my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
837            return($result) if $result;
838        }
839        
840        return(0);
841    }
842    
 sub DoUpdate  
 {  
         my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);  
         my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;  
   
         my @CopyBuf = ();  
         my $CBufLen = 0;  
         my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy  
843    
844          my $sql = "select attnum, attname from pg_attribute" .  sub DoCopy {
845                  " where attrelid = $Stables{$tabname}->[0] and attnum > 0";      my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
846        
847        my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
848            "FROM STDIN";
849        my $result = $sconn->exec($sql);
850        if ($result->resultStatus ne PGRES_COPY_IN) {
851            print STDERR $sconn->errorMessage unless ($quiet);
852            return(-1);
853        }
854        
855        foreach my $str (@{$CBuf}) {
856            $sconn->putline($str);
857        }
858        
859        $sconn->putline("\\.\n");
860        
861        if ($sconn->endcopy) {
862            print STDERR $sconn->errorMessage unless ($quiet);
863            return(-1);
864        }
865        
866        return(0);
867    }
868    
         my $result = $conn->exec($sql);  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 return(-1);  
         }  
869    
870          my @anames = ();  #
871          while (@row = $result->fetchrow)  # Returns last SyncID applied on Slave
872          {  #
873                  $anames[$row[0]] = $row[1];  sub GetSyncID {
874          }      my ($sconn) = @_; # (@_[0]);
875        
876        my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
877        if ($result->resultStatus ne PGRES_TUPLES_OK) {
878            print STDERR $sconn->errorMessage unless ($quiet);
879            return(-1);
880        }
881        my @row = $result->fetchrow;
882        print STDERR "GetSyncID: ",($row[0] || 'null'),"\n" if ($debug);
883        return(undef) unless defined $row[0];       # null
884        return($row[0]);
885    }
886    
887          my $istring;  #
888          my $ok = 0;  # Updates _RSERV_SYNC_ on Master with Slave SyncID
889          while(<$inpf>)  #
890          {  sub SyncSyncID {
891                  if ($_ !~ /\n$/)      my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
892                  {      
893                          printf STDERR "Invalid format\n" unless ($quiet);      my $result = $mconn->exec("BEGIN");
894                          return(-2);      if ($result->resultStatus ne PGRES_COMMAND_OK) {
895                  }          print STDERR $mconn->errorMessage unless ($quiet);
896                  $istring = $_;          $mconn->exec("ROLLBACK");
897                  $istring =~ s/\n//;          return(-1);
898                  if ($istring eq '\.')      }
899                  {      
900                          $ok = 1;      $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
901                          last;                            " where server = $sserver AND syncid = $syncid" .
902                  }                            " for update");
903                  my @vals = split(/      /, $istring);      if ($result->resultStatus ne PGRES_TUPLES_OK) {
904                  if ($oidkey)          print STDERR $mconn->errorMessage unless ($quiet);
905                  {          $mconn->exec("ROLLBACK");
906                          if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)          return(-1);
907                          {      }
908                                  printf STDERR "Invalid OID\n" unless ($quiet);      my @row = $result->fetchrow;
909                                  return(-2);      if (! defined $row[0]) {
910                          }          printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
911                          $oidkey = $vals[0];          $mconn->exec("ROLLBACK");
912                  }          return(0);
913                  else      }
914                  {      if ($row[1] > 0) {
915                          unshift @vals, '';          printf STDERR "SyncID $syncid for server ".
916                  }              "$sserver already updated\n" unless ($quiet);
917            $mconn->exec("ROLLBACK");
918            return(0);
919        }
920        $result = $mconn->exec("update _RSERV_SYNC_" .
921                              " set synctime = now(), status = 1" .
922                              " where server = $sserver AND syncid = $syncid");
923        if ($result->resultStatus ne PGRES_COMMAND_OK) {
924            print STDERR $mconn->errorMessage unless ($quiet);
925            $mconn->exec("ROLLBACK");
926            return(-1);
927        }
928        $result = $mconn->exec("delete from _RSERV_SYNC_" .
929                              " where server = $sserver AND syncid < $syncid");
930        if ($result->resultStatus ne PGRES_COMMAND_OK) {
931            print STDERR $mconn->errorMessage unless ($quiet);
932            $mconn->exec("ROLLBACK");
933            return(-1);
934        }
935        
936        $result = $mconn->exec("COMMIT");
937        if ($result->resultStatus ne PGRES_COMMAND_OK) {
938            print STDERR $mconn->errorMessage unless ($quiet);
939            $mconn->exec("ROLLBACK");
940            return(-1);
941        }
942        
943        return(1);
944    }
945    
946                  $sql = "update $tabname set ";  # stuff moved from perl scripts for better re-use
                 my $ocnt = 0;  
                 for (my $i = 1; $i <= $#anames; $i++)  
                 {  
                         if ($vals[$i] eq '\N')  
                         {  
                                 if ($i == $Stables{$tabname}->[2])  
                                 {  
                                         printf STDERR "NULL key\n" unless ($quiet);  
                                         return(-2);  
                                 }  
                                 $vals[$i] = 'null';  
                         }  
                         else  
                         {  
                                 $vals[$i] = "'" . $vals[$i] . "'";  
                                 next if $i == $Stables{$tabname}->[2];  
                         }  
                         $ocnt++;  
                         $sql .= ', ' if $ocnt > 1;  
                         $sql .= "$anames[$i] = $vals[$i]";  
                 }  
                 if ($oidkey)  
                 {  
                         $sql .= " where $Stables{$tabname}->[1] = $oidkey";  
                 }  
                 else  
                 {  
                         $sql .= " where $Stables{$tabname}->[1] = $vals[$Stables{$tabname}->[2]]";  
                 }  
947    
948                  printf "$sql\n" if $debug;  sub Rollback {
949        my $conn = shift @_;
950    
951                  $result = $conn->exec($sql);      print STDERR $conn->errorMessage unless ($quiet);
952                  if ($result->resultStatus ne PGRES_COMMAND_OK)      $conn->exec("ROLLBACK");
953                  {  }
                         print STDERR $conn->errorMessage unless ($quiet);  
                         return(-1);  
                 }  
                 next if $result->cmdTuples == 1;        # updated  
954    
955                  if ($result->cmdTuples > 1)  sub RollbackAndQuit {
956                  {      my $conn = shift @_;
                         printf STDERR "Duplicate keys\n" unless ($quiet);  
                         return(-2);  
                 }  
957    
958                  # no key - copy      Rollback($conn);
959                  push @CopyBuf, "$istring\n";      exit (-1);
960                  $CBufLen += length($istring);  }
   
                 if ($CBufLen >= $CBufMax)  
                 {  
                         $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);  
                         return($result) if $result;  
                         @CopyBuf = ();  
                         $CBufLen = 0;  
                 }  
         }  
961    
962          if (! $ok)  sub Connect {
963          {          my $info = shift @_;
                 printf STDERR "No end of input in UPDATE section\n" unless ($quiet);  
                 return(-2);  
         }  
964    
965          if ($CBufLen)          print("Connecting to $info\n") if ($debug || $verbose);
966          {          my $conn = Pg::connectdb($info);
967                  $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);          if ($conn->status != PGRES_CONNECTION_OK) {
968                  return($result) if $result;              die "Failed opening $info";
969          }          }
970            return $conn;
         return(0);  
971  }  }
972    
973    sub Exec {
974  sub DoCopy          my $conn = shift || die "Exec needs connection!";
975  {          my $sql = shift || die "Exec needs SQL statement!";
976          my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);          # used to return error code if no tuples are retured
977            my $return_code = shift;
978          my $sql = "COPY $tabname " . (($withoids) ? "WITH OIDS " : '') .  
979  "FROM STDIN";          if ($debug) {
980          my $result = $conn->exec($sql);                  # re-format SQL in one line (for nicer output)
981          if ($result->resultStatus ne PGRES_COPY_IN)                  $sql =~ s/[\s\n\r]+/ /gs;
982          {                  print STDERR "Exec: $sql\n";
                 print STDERR $conn->errorMessage unless ($quiet);  
                 return(-1);  
983          }          }
984            my $result = $conn->exec($sql);
985          foreach $str (@{$CBuf})          if ($result->resultStatus eq PGRES_COMMAND_OK) {
986          {                  return $result;
987                  $conn->putline($str);          } elsif ($result->resultStatus eq PGRES_TUPLES_OK) {
988                    print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
989                    return $result;
990            } else {
991                    if (defined($return_code)) {
992                            print STDERR "ERROR: ",$conn->errorMessage,"\n" unless ($quiet);
993                            return($return_code);
994                    } else {
995                            RollbackAndQuit($conn)
996                    }
997          }          }
998    }
999    
1000          $conn->putline("\\.\n");  sub Exec2 {
1001            my $mconn = shift @_;
1002            my $sconn = shift @_;
1003            my $sql = shift @_;
1004    
1005            my $result = $mconn->exec($sql);
1006            RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1007            $result = $sconn->exec($sql);
1008            RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1009            # XXX TODO: return results?!
1010    }
1011    
1012          if ($conn->endcopy)  # exec sql query and return one row from it
1013          {  sub ExecFatch {
1014                  print STDERR $conn->errorMessage unless ($quiet);          my $conn = shift || die "ExecFatch need conn!";
1015                  return(-1);          my $sql = shift || die "ExecFatch need SQL!";
1016    
1017            if ($debug) {
1018                    # re-format SQL in one line (for nicer output)
1019                    $sql =~ s/[\s\n\r]+/ /gs;
1020                    print STDERR "Exec: $sql\n";
1021          }          }
1022    
1023          return(0);          my $result = $conn->exec($sql);
1024            RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);
 }  
1025    
1026            print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
1027    
 #  
 # Returns last SyncID applied on Slave  
 #  
 sub GetSyncID  
 {  
         my ($conn) = @_; # (@_[0]);  
           
         my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 return(-1);  
         }  
1028          my @row = $result->fetchrow;          my @row = $result->fetchrow;
1029          return(undef) unless defined $row[0];   # null          print STDERR "DATA: ",join(",",@row),"\n" if ($debug);
1030          return($row[0]);          return @row;
1031  }  }
1032    
1033  #  # exec sql query and dump all rows retured to STDERR (great for debugging)
1034  # Updates _RSERV_SYNC_ on Master with Slave SyncID  sub ExecDebug {
1035  #          return if (! $debug);
1036  sub SyncSyncID  
1037  {          my $conn = shift || die "ExecDebug need conn!";
1038          my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);          my $sql = shift || die "ExecDebug need SQL!";
1039            
1040          my $result = $conn->exec("BEGIN");          if ($debug) {
1041          if ($result->resultStatus ne PGRES_COMMAND_OK)                  # re-format SQL in one line (for nicer output)
1042          {                  $sql =~ s/[\s\n\r]+/ /gs;
1043                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR "Exec: $sql\n";
                 $conn->exec("ROLLBACK");  
                 return(-1);  
1044          }          }
1045    
1046          $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .          my $result = $conn->exec($sql);
1047                                                    " where server = $server and syncid = $syncid" .          RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);
                                                   " for update");  
         if ($result->resultStatus ne PGRES_TUPLES_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
         my @row = $result->fetchrow;  
         if (! defined $row[0])  
         {  
                 printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(0);  
         }  
         if ($row[1] > 0)  
         {  
                 printf STDERR "SyncID $syncid for server $server already updated\n" unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(0);  
         }  
         $result = $conn->exec("update _RSERV_SYNC_" .  
                                                   " set synctime = now(), status = 1" .  
                                                   " where server = $server and syncid = $syncid");  
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
         $result = $conn->exec("delete from _RSERV_SYNC_" .  
                                                   " where server = $server and syncid < $syncid");  
         if ($result->resultStatus ne PGRES_COMMAND_OK)  
         {  
                 print STDERR $conn->errorMessage unless ($quiet);  
                 $conn->exec("ROLLBACK");  
                 return(-1);  
         }  
1048    
1049          $result = $conn->exec("COMMIT");          print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
1050          if ($result->resultStatus ne PGRES_COMMAND_OK)  
1051          {          while (my @row = $result->fetchrow) {
1052                  print STDERR $conn->errorMessage unless ($quiet);                  print STDERR "DATA: ",join(",",@row),"\n" if ($debug);
                 $conn->exec("ROLLBACK");  
                 return(-1);  
1053          }          }
1054            return $result->ntuples;
1055    }
1056    sub MkInfo {
1057            my $db = shift || die "need database name!";
1058            my $host = shift;
1059            my $port = shift;
1060            my $user = shift;
1061            my $password = shift;
1062    
1063            my $info = "dbname=$db";
1064            $info = "$info host=$host" if (defined($host));
1065            $info = "$info port=$port" if (defined($port));
1066            $info = "$info user=$user" if (defined($user));
1067            $info = "$info password=$password" if (defined($password));
1068    
1069          return(1);          return $info;
1070  }  }
1071    
1072  1;  1;

Legend:
Removed from v.1.1.1.1  
changed lines
  Added in v.1.18

  ViewVC Help
Powered by ViewVC 1.1.26