/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.16 - (hide annotations)
Sun Nov 2 15:43:08 2003 UTC (20 years, 6 months ago) by dpavlin
Branch: MAIN
Changes since 1.15: +102 -73 lines
added SERVER (id of source server) and KEYS (which keys are transfered in
this snapshot) in snapshot format

1 dpavlin 1.1 # -*- perl -*-
2     # RServ.pm
3     # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4    
5     package RServ;
6    
7     require Exporter;
8     @ISA = qw(Exporter);
9 dpavlin 1.11 @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10 dpavlin 1.13 Rollback RollbackAndQuit Connect Exec Exec2 MkInfo
11 dpavlin 1.11 $debug $quiet $verbose
12     );
13 dpavlin 1.1 @EXPORT_OK = qw();
14 dpavlin 1.3 use strict;
15 dpavlin 1.1 use Pg;
16    
17 dpavlin 1.11 my $debug = 0;
18     my $quiet = 1;
19     my $verbose = 0;
20 dpavlin 1.1
21 dpavlin 1.12 $debug = 1;
22     $quiet = 0;
23     $verbose = 1;
24    
25 dpavlin 1.1 my %Mtables = ();
26     my %Stables = ();
27    
28 dpavlin 1.11 sub GetServerId
29 dpavlin 1.1 {
30 dpavlin 1.11 my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);
31    
32     print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);
33 dpavlin 1.1
34 dpavlin 1.6 my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
35 dpavlin 1.11 " host='$Host' AND dbase='$DB'");
36 dpavlin 1.3
37 dpavlin 1.2 if ($result->resultStatus ne PGRES_TUPLES_OK)
38     {
39 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
40 dpavlin 1.2 return(-1);
41     }
42    
43 dpavlin 1.3 if ($result->cmdTuples && $result->cmdTuples > 1)
44 dpavlin 1.2 {
45 dpavlin 1.11 printf STDERR "Duplicate host definitions.\n" unless ($quiet);
46 dpavlin 1.2 return(-2);
47     }
48    
49     my @row = $result->fetchrow;
50 dpavlin 1.10
51 dpavlin 1.11 print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
52 dpavlin 1.10
53 dpavlin 1.2 return $row[0];
54     }
55 dpavlin 1.1
56 dpavlin 1.2 sub PrepareSnapshot
57     {
58 dpavlin 1.16 my ($mconn, $sconn, $outf, $mserver, $sserver, $multiplemaster, $onlytables) = @_;
59 dpavlin 1.9
60     if ($mserver == $sserver) {
61 dpavlin 1.10 print STDERR "master and slave numbers are same [$mserver] !\n";
62 dpavlin 1.9 return(-1);
63     }
64    
65     print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
66 dpavlin 1.1
67 dpavlin 1.16 # dump master server ID into snapshot file (to prevent replication
68     # of colums from master back to slave)
69     print $outf "-- SERVER $mserver\n";
70    
71 dpavlin 1.2 # first, we must know for wich tables the slave subscribed
72 dpavlin 1.15 my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);
73     return (-1) if ($result == -1);
74    
75 dpavlin 1.2 my @row;
76     while (@row = $result->fetchrow) {
77     $Stables{$row[0]} = 1;
78     }
79    
80 dpavlin 1.14 print "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);
81    
82 dpavlin 1.15 Exec($mconn,"BEGIN");
83     Exec($mconn,"set transaction isolation level serializable");
84 dpavlin 1.2
85     # MAP oid --> tabname, keyname, key_type
86 dpavlin 1.15 my $sql = qq{
87     select pgc.oid, pgc.relname, pga.attname, pgt.typname
88     from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga,
89     pg_type pgt
90     where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
91     AND pga.attnum = rt.key AND pga.atttypid=pgt.oid
92     };
93     $result = Exec($mconn,$sql);
94    
95 dpavlin 1.2 while (@row = $result->fetchrow)
96     {
97 dpavlin 1.16 printf "$row[0], $row[1], $row[2]\n" if ($debug);
98 dpavlin 1.5 if (ref($onlytables) eq 'HASH') {
99     next unless (exists $onlytables->{$row[1]});
100     $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
101     }
102 dpavlin 1.2 push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
103     }
104 dpavlin 1.12
105 dpavlin 1.14 print "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
106 dpavlin 1.15 if (! %Mtables) {
107     print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";
108     RollbackAndQuit($mconn);
109     }
110 dpavlin 1.12
111 dpavlin 1.2 # Read last succeeded sync
112 dpavlin 1.15 $sql = qq{
113     select syncid, synctime, minid, maxid, active from _RSERV_SYNC_
114     where server = $sserver AND syncid =
115     (select max(syncid) from _RSERV_SYNC_
116     where server = $sserver AND status > 0)
117     };
118 dpavlin 1.2
119 dpavlin 1.15 $result = Exec($mconn,$sql);
120 dpavlin 1.16
121 dpavlin 1.2 my @lastsync = $result->fetchrow;
122 dpavlin 1.14 print "lastsync: ",join(",",@lastsync),"\n" if ($debug);
123 dpavlin 1.9
124     # exclude data which originated from master server
125     my $sel_server = " and l.server = $mserver ";
126    
127 dpavlin 1.2 my $sinfo = "";
128 dpavlin 1.4 if (@lastsync && $lastsync[3] ne '') # sync info
129 dpavlin 1.2 {
130     $sinfo = "and (l.logid >= $lastsync[3]";
131     $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
132     $sinfo .= ")";
133     }
134 dpavlin 1.16
135     my @keys; # keys in this snapshot
136    
137 dpavlin 1.2 my $havedeal = 0;
138    
139     # DELETED rows
140     $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
141 dpavlin 1.9 " where l.delete = 1 $sinfo $sel_server order by l.reloid";
142 dpavlin 1.2
143 dpavlin 1.9 printf "DELETED: $sql\n" if $debug;
144 dpavlin 1.2
145 dpavlin 1.6 $result = $mconn->exec($sql);
146 dpavlin 1.2 if ($result->resultStatus ne PGRES_TUPLES_OK)
147     {
148 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
149     $mconn->exec("ROLLBACK");
150 dpavlin 1.2 return(-1);
151     }
152    
153 dpavlin 1.7 my $lastoid = -1;
154 dpavlin 1.2 while (@row = $result->fetchrow)
155     {
156     next unless exists $Mtables{$row[0]};
157     next unless exists $Stables{$Mtables{$row[0]}[0]};
158    
159     if ($lastoid != $row[0])
160     {
161 dpavlin 1.7 if ($lastoid == -1)
162 dpavlin 1.2 {
163 dpavlin 1.6 my $syncid = GetSYNCID($mconn, $outf);
164 dpavlin 1.2 return($syncid) if $syncid < 0;
165     $havedeal = 1;
166     }
167     else
168     {
169     printf $outf "\\.\n";
170     }
171     printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
172     $lastoid = $row[0];
173     }
174     if (! defined $row[1])
175     {
176     print STDERR "NULL key\n" unless ($quiet);
177 dpavlin 1.6 $mconn->exec("ROLLBACK");
178 dpavlin 1.2 return(-2);
179     }
180     printf $outf "%s\n", OutputValue($row[1]);
181 dpavlin 1.16 push @keys,OutputKey($row[2]);
182 dpavlin 1.2 }
183 dpavlin 1.7 printf $outf "\\.\n" if ($lastoid != -1);
184 dpavlin 1.2
185     # UPDATED rows
186    
187     my ($taboid, $tabname, $tabkey);
188     foreach $taboid (keys %Mtables)
189     {
190 dpavlin 1.3 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
191 dpavlin 1.2 next unless exists $Stables{$tabname};
192    
193     my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
194    
195 dpavlin 1.16 $sql = sprintf "SELECT \"_$tabname\".\"${tabkey}\",$oidkey \"_$tabname\".* FROM \"$tabname\" ".
196 dpavlin 1.2 "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
197 dpavlin 1.9 " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
198     $sel_server;
199 dpavlin 1.2
200 dpavlin 1.9 printf "UPDATED: $sql\n" if $debug;
201 dpavlin 1.2
202 dpavlin 1.6 $result = $mconn->exec($sql);
203 dpavlin 1.1 if ($result->resultStatus ne PGRES_TUPLES_OK)
204     {
205 dpavlin 1.2 printf $outf "-- ERROR\n" if $havedeal;
206 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
207     $mconn->exec("ROLLBACK");
208 dpavlin 1.2 return(-1);
209     }
210     next if $result->ntuples <= 0;
211     if (! $havedeal)
212     {
213 dpavlin 1.6 my $syncid = GetSYNCID($mconn, $outf);
214 dpavlin 1.2 return($syncid) if $syncid < 0;
215     $havedeal = 1;
216 dpavlin 1.1 }
217 dpavlin 1.2 printf $outf "-- UPDATE $tabname\n";
218     printf "-- UPDATE $tabname\n" if $debug;
219     while (@row = $result->fetchrow)
220 dpavlin 1.1 {
221 dpavlin 1.16 push @keys,OutputKey(shift @row);
222 dpavlin 1.3 for (my $i = 0; $i <= $#row; $i++)
223 dpavlin 1.2 {
224     printf $outf " " if $i;
225     printf " " if $i && $debug;
226     printf $outf "%s", OutputValue($row[$i]);
227     printf "%s", OutputValue($row[$i]) if $debug;;
228     }
229     printf $outf "\n";
230     printf "\n" if $debug;
231     }
232     printf $outf "\\.\n";
233     printf "\\.\n" if $debug;;
234     }
235    
236     # INSERTED rows
237    
238     foreach $taboid (keys %Mtables)
239     {
240 dpavlin 1.3 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
241 dpavlin 1.2 next unless exists $Stables{$tabname};
242    
243     my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
244    
245 dpavlin 1.16 $sql = sprintf "SELECT \"_$tabname\".\"${tabkey}\", $oidkey \"_$tabname\".* FROM \"$tabname\" ".
246 dpavlin 1.2 "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
247 dpavlin 1.9 " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
248     $sel_server;
249 dpavlin 1.2
250 dpavlin 1.9 printf "INSERTED: $sql\n" if $debug;
251 dpavlin 1.2
252 dpavlin 1.6 $result = $mconn->exec($sql);
253 dpavlin 1.1 if ($result->resultStatus ne PGRES_TUPLES_OK)
254     {
255 dpavlin 1.2 printf $outf "-- ERROR\n" if $havedeal;
256 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
257     $mconn->exec("ROLLBACK");
258 dpavlin 1.2 return(-1);
259     }
260     next if $result->ntuples <= 0;
261     if (! $havedeal)
262     {
263 dpavlin 1.6 my $syncid = GetSYNCID($mconn, $outf);
264 dpavlin 1.2 return($syncid) if $syncid < 0;
265     $havedeal = 1;
266 dpavlin 1.1 }
267 dpavlin 1.2 printf $outf "-- INSERT $tabname\n";
268     printf "-- INSERT $tabname\n" if $debug;
269 dpavlin 1.1 while (@row = $result->fetchrow)
270     {
271 dpavlin 1.16 push @keys,OutputKey(shift @row);
272 dpavlin 1.3 for (my $i = 0; $i <= $#row; $i++)
273 dpavlin 1.2 {
274     printf $outf " " if $i;
275     printf " " if $i && $debug;
276     printf $outf "%s", OutputValue($row[$i]);
277     printf "%s", OutputValue($row[$i]) if $debug;;
278     }
279     printf $outf "\n";
280     printf "\n" if $debug;
281     }
282     printf $outf "\\.\n";
283     printf "\\.\n" if $debug;;
284     }
285    
286    
287     unless ($havedeal)
288     {
289 dpavlin 1.6 $mconn->exec("ROLLBACK");
290 dpavlin 1.2 return(0);
291     }
292    
293     # Remember this snapshot info
294 dpavlin 1.8 $result = $mconn->exec("select _rserv_sync_($sserver)");
295 dpavlin 1.2 if ($result->resultStatus ne PGRES_TUPLES_OK)
296     {
297     printf $outf "-- ERROR\n";
298 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
299     $mconn->exec("ROLLBACK");
300 dpavlin 1.2 return(-1);
301     }
302    
303 dpavlin 1.6 $result = $mconn->exec("COMMIT");
304 dpavlin 1.2 if ($result->resultStatus ne PGRES_COMMAND_OK)
305     {
306     printf $outf "-- ERROR\n";
307 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
308     $mconn->exec("ROLLBACK");
309 dpavlin 1.2 return(-1);
310     }
311 dpavlin 1.16
312     if ($multiplemaster) {
313     # save keys
314     my $key_out = "-- KEYS ".($#keys+1)."\n".join(",",@keys)."\n";
315     print $outf $key_out;
316     print $key_out if ($debug);
317     }
318    
319 dpavlin 1.2 printf $outf "-- OK\n";
320     printf "-- OK\n" if $debug;
321    
322     return(1);
323    
324 dpavlin 1.1 }
325    
326     sub OutputValue
327     {
328     my ($val) = @_; # @_[0];
329    
330     return("\\N") unless defined $val;
331    
332     $val =~ s/\\/\\\\/g;
333     $val =~ s/ /\\011/g;
334     $val =~ s/\n/\\012/g;
335     $val =~ s/\'/\\047/g;
336    
337     return($val);
338     }
339    
340 dpavlin 1.16 sub OutputKey {
341     my $val = shift;
342    
343     return if (! defined($val));
344    
345     if ($val =~ m/^\d+$/) {
346     return $val;
347     } else {
348     return "'$val'";
349     }
350     }
351    
352 dpavlin 1.1 # Get syncid for new snapshot
353     sub GetSYNCID
354     {
355 dpavlin 1.2 my ($conn, $outf) = @_; # (@_[0], @_[1]);
356    
357     my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
358     if ($result->resultStatus ne PGRES_TUPLES_OK)
359     {
360     print STDERR $conn->errorMessage unless ($quiet);
361     $conn->exec("ROLLBACK");
362     return(-1);
363     }
364 dpavlin 1.16
365 dpavlin 1.2 my @row = $result->fetchrow;
366    
367     printf $outf "-- SYNCID $row[0]\n";
368     printf "-- SYNCID $row[0]\n" if $debug;
369     return($row[0]);
370 dpavlin 1.1 }
371    
372    
373     sub CleanLog
374     {
375 dpavlin 1.5 my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
376 dpavlin 1.2
377     my $result = $conn->exec("BEGIN");
378     if ($result->resultStatus ne PGRES_COMMAND_OK)
379     {
380     print STDERR $conn->errorMessage unless ($quiet);
381     $conn->exec("ROLLBACK");
382     return(-1);
383     }
384    
385     my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
386     " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
387     " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
388    
389     printf "$sql\n" if $debug;
390    
391     $result = $conn->exec($sql);
392     if ($result->resultStatus ne PGRES_TUPLES_OK)
393     {
394     print STDERR $conn->errorMessage unless ($quiet);
395     return(-1);
396     }
397     my $maxid = '';
398     my %active = ();
399     while (my @row = $result->fetchrow)
400     {
401     $maxid = $row[0] if $maxid eq '';
402     last if $row[0] > $maxid;
403     my @ids = split(/[ ]+,[ ]+/, $row[1]);
404 dpavlin 1.3 foreach my $aid (@ids)
405 dpavlin 1.2 {
406     $active{$aid} = 1 unless exists $active{$aid};
407     }
408     }
409     if ($maxid eq '')
410     {
411     print STDERR "No Sync IDs\n" unless ($quiet);
412     return(0);
413     }
414     my $alist = join(',', keys %active);
415     my $sinfo = "logid < $maxid";
416     $sinfo .= " AND logid not in ($alist)" if $alist ne '';
417 dpavlin 1.5 #if (ref($onlytables) eq 'HASH') {
418     # foreach my $onlytable (keys %{$onlytables}) {
419     # $sinfo
420     # }
421     #}
422 dpavlin 1.2 $sql = "delete from _RSERV_LOG_ where " .
423     "logtime < now() - '$howold second'::interval AND $sinfo";
424    
425     printf "$sql\n" if $debug;
426    
427     $result = $conn->exec($sql);
428     if ($result->resultStatus ne PGRES_COMMAND_OK)
429     {
430     print STDERR $conn->errorMessage unless ($quiet);
431     $conn->exec("ROLLBACK");
432     return(-1);
433     }
434     $maxid = $result->cmdTuples;
435    
436     $result = $conn->exec("COMMIT");
437     if ($result->resultStatus ne PGRES_COMMAND_OK)
438     {
439     print STDERR $conn->errorMessage unless ($quiet);
440     $conn->exec("ROLLBACK");
441     return(-1);
442     }
443    
444     return($maxid);
445     }
446 dpavlin 1.1
447 dpavlin 1.2 sub ApplySnapshot
448     {
449 dpavlin 1.16 my ($sconn, $inpf, $multiplemaster, $onlytables) = @_; # (@_[0], @_[1]);
450    
451     my $serverId;
452    
453 dpavlin 1.6 my $result = $sconn->exec("BEGIN");
454 dpavlin 1.16 if ($result->resultStatus ne PGRES_COMMAND_OK) {
455 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
456     $sconn->exec("ROLLBACK");
457 dpavlin 1.2 return(-1);
458     }
459    
460 dpavlin 1.6 $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
461 dpavlin 1.16 if ($result->resultStatus ne PGRES_COMMAND_OK) {
462 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
463     $sconn->exec("ROLLBACK");
464 dpavlin 1.2 return(-1);
465     }
466    
467     # MAP name --> oid, keyname, keynum
468 dpavlin 1.16 my $sql = qq{
469     select pgc.oid, pgc.relname, pga.attname, rt.key
470     from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga
471     where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
472     AND pga.attnum = rt.key
473     };
474 dpavlin 1.2
475 dpavlin 1.6 $result = $sconn->exec($sql);
476 dpavlin 1.16 if ($result->resultStatus ne PGRES_TUPLES_OK) {
477 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
478     $sconn->exec("ROLLBACK");
479 dpavlin 1.2 return(-1);
480     }
481     %Stables = ();
482 dpavlin 1.16 while (my @row = $result->fetchrow) {
483 dpavlin 1.2 # printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
484 dpavlin 1.5 if (ref($onlytables) eq 'HASH') {
485     next unless (exists $onlytables->{$row[1]});
486     $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
487     }
488 dpavlin 1.2 push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
489     }
490    
491 dpavlin 1.12 print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);
492    
493 dpavlin 1.16 # save keys from snapshot because we want to update _rserv_log_ with
494     # correct source server later...
495     my @keys;
496    
497 dpavlin 1.2 my $ok = 0;
498 dpavlin 1.9 my $syncid = -1;
499 dpavlin 1.16 while(<$inpf>) {
500 dpavlin 1.2 $_ =~ s/\n//;
501     my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3);
502 dpavlin 1.11 die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
503 dpavlin 1.16 if ($cmt ne '--') {
504 dpavlin 1.2 printf STDERR "Invalid format\n" unless ($quiet);
505 dpavlin 1.6 $sconn->exec("ROLLBACK");
506 dpavlin 1.2 return(-2);
507     }
508 dpavlin 1.16 if ($cmd eq 'DELETE') {
509     if ($syncid == -1) {
510 dpavlin 1.2 printf STDERR "Sync ID unspecified\n" unless ($quiet);
511 dpavlin 1.6 $sconn->exec("ROLLBACK");
512 dpavlin 1.2 return(-2);
513     }
514 dpavlin 1.6 $result = DoDelete($sconn, $inpf, $prm);
515 dpavlin 1.16 if ($result) {
516 dpavlin 1.6 $sconn->exec("ROLLBACK");
517 dpavlin 1.2 return($result);
518     }
519 dpavlin 1.16 } elsif ($cmd eq 'INSERT') {
520     if ($syncid == -1) {
521 dpavlin 1.2 printf STDERR "Sync ID unspecified\n" unless ($quiet);
522 dpavlin 1.6 $sconn->exec("ROLLBACK");
523 dpavlin 1.2 return(-2);
524     }
525 dpavlin 1.6 $result = DoInsert($sconn, $inpf, $prm);
526 dpavlin 1.16 if ($result) {
527 dpavlin 1.6 $sconn->exec("ROLLBACK");
528 dpavlin 1.2 return($result);
529     }
530 dpavlin 1.16 } elsif ($cmd eq 'UPDATE') {
531     if ($syncid == -1) {
532 dpavlin 1.2 printf STDERR "Sync ID unspecified\n" unless ($quiet);
533 dpavlin 1.6 $sconn->exec("ROLLBACK");
534 dpavlin 1.2 return(-2);
535     }
536 dpavlin 1.6 $result = DoUpdate($sconn, $inpf, $prm);
537 dpavlin 1.16 if ($result) {
538 dpavlin 1.6 $sconn->exec("ROLLBACK");
539 dpavlin 1.2 return($result);
540     }
541 dpavlin 1.16 } elsif ($cmd eq 'SYNCID') {
542     if ($syncid != -1) {
543 dpavlin 1.2 printf STDERR "Second Sync ID ?!\n" unless ($quiet);
544 dpavlin 1.6 $sconn->exec("ROLLBACK");
545 dpavlin 1.2 return(-2);
546     }
547 dpavlin 1.16 if ($prm !~ /^\d+$/) {
548 dpavlin 1.2 printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
549 dpavlin 1.6 $sconn->exec("ROLLBACK");
550 dpavlin 1.2 return(-2);
551     }
552     $syncid = $prm;
553    
554     printf STDERR "Sync ID $syncid\n" unless ($quiet);
555    
556 dpavlin 1.6 $result = $sconn->exec("select syncid, synctime from " .
557 dpavlin 1.2 "_RSERV_SLAVE_SYNC_ where syncid = " .
558     "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
559 dpavlin 1.16 if ($result->resultStatus ne PGRES_TUPLES_OK) {
560 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
561     $sconn->exec("ROLLBACK");
562 dpavlin 1.1 return(-1);
563 dpavlin 1.2 }
564     my @row = $result->fetchrow;
565 dpavlin 1.16 if (! defined $row[0]) {
566 dpavlin 1.6 $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
567 dpavlin 1.2 "(syncid, synctime) values ($syncid, now())");
568 dpavlin 1.16 } elsif ($row[0] >= $prm) {
569 dpavlin 1.2 printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
570 dpavlin 1.6 $sconn->exec("ROLLBACK");
571 dpavlin 1.2 return(0);
572 dpavlin 1.16 } else {
573 dpavlin 1.6 $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
574 dpavlin 1.2 " set syncid = $syncid, synctime = now()");
575     }
576 dpavlin 1.16 if ($result->resultStatus ne PGRES_COMMAND_OK) {
577 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
578     $sconn->exec("ROLLBACK");
579 dpavlin 1.1 return(-1);
580 dpavlin 1.2 }
581 dpavlin 1.16 } elsif ($cmd eq 'OK') {
582 dpavlin 1.2 $ok = 1;
583     last;
584 dpavlin 1.16 } elsif ($cmd eq 'ERROR') {
585 dpavlin 1.2 printf STDERR "ERROR signaled\n" unless ($quiet);
586 dpavlin 1.6 $sconn->exec("ROLLBACK");
587 dpavlin 1.2 return(-2);
588 dpavlin 1.16 } elsif ($cmd eq 'SERVER') {
589     if ($prm !~ /^\d+$/) {
590     printf STDERR "Invalid Server ID $prm\n" unless ($quiet);
591     $sconn->exec("ROLLBACK");
592     return(-2);
593     }
594     $serverId = $prm;
595     print STDERR "Server ID $serverId\n" unless ($quiet);
596     } elsif ($cmd eq 'KEYS') {
597     if ($prm !~ /^\d+$/) {
598     printf STDERR "Invalid numer of keys $prm\n" unless ($quiet);
599     $sconn->exec("ROLLBACK");
600     return(-2);
601     }
602     my $keys = <$inpf>;
603     chomp($keys);
604     if ($multiplemaster) {
605     $result = $sconn->exec("update _rserv_log_ set server=$serverId where key in ($keys)");
606     if ($result->resultStatus ne PGRES_COMMAND_OK || $result->ntuples != $prm) {
607     print STDERR "FATAL: Cannot update source server in _rserv_log_.\n";
608     print STDERR "expected $prm updates, got ",$result->ntuples,"\n" if ($result->ntuples != $prm);
609     print STDERR $sconn->errorMessage unless ($quiet);
610     $sconn->exec("ROLLBACK");
611     return(-1);
612     }
613     }
614     } else {
615 dpavlin 1.2 printf STDERR "Unknown command $cmd\n" unless ($quiet);
616 dpavlin 1.6 $sconn->exec("ROLLBACK");
617 dpavlin 1.2 return(-2);
618     }
619     }
620    
621     if (! $ok)
622     {
623     printf STDERR "No OK flag in input\n" unless ($quiet);
624 dpavlin 1.6 $sconn->exec("ROLLBACK");
625 dpavlin 1.2 return(-2);
626     }
627    
628 dpavlin 1.6 $result = $sconn->exec("COMMIT");
629 dpavlin 1.2 if ($result->resultStatus ne PGRES_COMMAND_OK)
630     {
631 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
632     $sconn->exec("ROLLBACK");
633 dpavlin 1.2 return(-1);
634     }
635    
636     return(1);
637 dpavlin 1.1 }
638    
639     sub DoDelete
640     {
641 dpavlin 1.6 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
642 dpavlin 1.2
643     # only delete tables that the slave wants
644     if (! defined($Stables{$tabname})) {
645     print "Not configured to delete rows from table $tabname\n" unless $quiet;
646     while (<$inpf>) {
647 dpavlin 1.3 my $istring = $_;
648 dpavlin 1.2 $istring =~ s/\n//;
649     last if ($istring eq '\.');
650     }
651     return(0);
652     }
653 dpavlin 1.1
654 dpavlin 1.2 my $ok = 0;
655     while(<$inpf>)
656     {
657     if ($_ !~ /\n$/)
658     {
659     printf STDERR "Invalid format\n" unless ($quiet);
660     return(-2);
661     }
662     my $key = $_;
663     $key =~ s/\n//;
664     if ($key eq '\.')
665 dpavlin 1.1 {
666 dpavlin 1.2 $ok = 1;
667     last;
668 dpavlin 1.1 }
669 dpavlin 1.2
670     my $sql = "delete from \"$tabname\" where ".
671     "\"$Stables{$tabname}->[1]\" = '$key'";
672    
673     printf "$sql\n" if $debug;
674    
675 dpavlin 1.6 my $result = $sconn->exec($sql);
676 dpavlin 1.2 if ($result->resultStatus ne PGRES_COMMAND_OK)
677 dpavlin 1.1 {
678 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
679 dpavlin 1.2 return(-1);
680 dpavlin 1.1 }
681 dpavlin 1.2 }
682    
683     if (! $ok)
684     {
685     printf STDERR "No end of input in DELETE section\n" unless ($quiet);
686     return(-2);
687     }
688    
689     return(0);
690 dpavlin 1.1 }
691    
692    
693     sub DoUpdate
694     {
695 dpavlin 1.6 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
696 dpavlin 1.1
697 dpavlin 1.2 # only update the tables that the slave wants
698     if (! defined($Stables{$tabname})) {
699     print "Not configured to update rows from table $tabname\n" unless $quiet;
700     while (<$inpf>) {
701 dpavlin 1.3 my $istring = $_;
702 dpavlin 1.2 $istring =~ s/\n//;
703     last if ($istring eq '\.');
704     }
705     return(0);
706     }
707 dpavlin 1.1
708 dpavlin 1.2 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
709    
710     my @CopyBuf = ();
711     my $CBufLen = 0;
712     my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
713    
714     my $sql = "select attnum, attname from pg_attribute" .
715     " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
716    
717 dpavlin 1.6 my $result = $sconn->exec($sql);
718 dpavlin 1.2 if ($result->resultStatus ne PGRES_TUPLES_OK)
719     {
720 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
721 dpavlin 1.2 return(-1);
722     }
723    
724     my @anames = ();
725 dpavlin 1.3 while (my @row = $result->fetchrow)
726 dpavlin 1.2 {
727     $anames[$row[0]] = $row[1];
728     }
729    
730     my $istring;
731     my $ok = 0;
732     while(<$inpf>)
733     {
734     if ($_ !~ /\n$/)
735     {
736     printf STDERR "Invalid format\n" unless ($quiet);
737     return(-2);
738     }
739     $istring = $_;
740     $istring =~ s/\n//;
741     if ($istring eq '\.')
742     {
743     $ok = 1;
744     last;
745     }
746     my @vals = split(/ /, $istring);
747     if ($oidkey)
748     {
749     if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
750     {
751     printf STDERR "Invalid OID\n" unless ($quiet);
752     return(-2);
753     }
754     $oidkey = $vals[0];
755     }
756     else
757 dpavlin 1.1 {
758 dpavlin 1.2 unshift @vals, '';
759     }
760    
761     $sql = "update \"$tabname\" set ";
762     my $ocnt = 0;
763     for (my $i = 1; $i <= $#anames; $i++)
764     {
765     if ($vals[$i] eq '\N')
766     {
767     if ($i == $Stables{$tabname}->[2])
768     {
769     printf STDERR "NULL key\n" unless ($quiet);
770     return(-2);
771     }
772     $vals[$i] = 'null';
773     }
774     else
775     {
776     $vals[$i] = "'" . $vals[$i] . "'";
777     next if $i == $Stables{$tabname}->[2];
778     }
779     $ocnt++;
780     $sql .= ', ' if $ocnt > 1;
781     $sql .= "\"$anames[$i]\" = $vals[$i]";
782 dpavlin 1.1 }
783 dpavlin 1.2 if ($oidkey)
784 dpavlin 1.1 {
785 dpavlin 1.2 $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
786 dpavlin 1.1 }
787 dpavlin 1.2 else
788 dpavlin 1.1 {
789 dpavlin 1.2 $sql .= " where \"$Stables{$tabname}->[1]\" = ".
790     $vals[$Stables{$tabname}->[2]];
791 dpavlin 1.1 }
792 dpavlin 1.2
793     printf "$sql\n" if $debug;
794    
795 dpavlin 1.6 $result = $sconn->exec($sql);
796 dpavlin 1.2
797     if ($result->resultStatus ne PGRES_COMMAND_OK)
798 dpavlin 1.1 {
799 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
800 dpavlin 1.2 return(-1);
801 dpavlin 1.1 }
802 dpavlin 1.2 next if $result->cmdTuples == 1; # updated
803    
804     if ($result->cmdTuples > 1)
805     {
806     printf STDERR "Duplicate keys\n" unless ($quiet);
807     return(-2);
808     }
809    
810     # no key - copy
811     push @CopyBuf, "$istring\n";
812     $CBufLen += length($istring);
813    
814     if ($CBufLen >= $CBufMax)
815 dpavlin 1.1 {
816 dpavlin 1.6 $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
817 dpavlin 1.2 return($result) if $result;
818     @CopyBuf = ();
819     $CBufLen = 0;
820     }
821     }
822    
823     if (! $ok)
824     {
825     printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
826     return(-2);
827     }
828    
829     if ($CBufLen)
830     {
831     print "@CopyBuf\n" if $debug;
832 dpavlin 1.6 $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
833 dpavlin 1.2 return($result) if $result;
834     }
835 dpavlin 1.1
836 dpavlin 1.2 return(0);
837 dpavlin 1.1 }
838    
839 dpavlin 1.2 sub DoInsert
840 dpavlin 1.1 {
841 dpavlin 1.6 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
842 dpavlin 1.1
843 dpavlin 1.2 # only insert rows into tables that the slave wants
844     if (! defined($Stables{$tabname})) {
845     print "Not configured to insert rows from table $tabname\n" unless $quiet;
846     while (<$inpf>) {
847 dpavlin 1.3 my $istring = $_;
848 dpavlin 1.2 $istring =~ s/\n//;
849     last if ($istring eq '\.');
850 dpavlin 1.1 }
851 dpavlin 1.2 return(0);
852     }
853 dpavlin 1.1
854 dpavlin 1.2 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
855    
856     my @CopyBuf = ();
857     my $CBufLen = 0;
858     my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
859    
860     my $istring;
861     my $ok = 0;
862     while(<$inpf>)
863     {
864     if ($_ !~ /\n$/)
865     {
866     printf STDERR "Invalid format\n" unless ($quiet);
867     return(-2);
868     }
869     $istring = $_;
870     $istring =~ s/\n//;
871     if ($istring eq '\.')
872     {
873     $ok = 1;
874     last;
875     }
876    
877     # no key - copy
878     push @CopyBuf, "$istring\n";
879     $CBufLen += length($istring);
880    
881     if ($CBufLen >= $CBufMax)
882 dpavlin 1.1 {
883 dpavlin 1.6 my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
884 dpavlin 1.2 return($result) if $result;
885     @CopyBuf = ();
886     $CBufLen = 0;
887     }
888     }
889    
890     if (! $ok)
891     {
892     printf STDERR "No end of input in INSERT section\n" unless ($quiet);
893     return(-2);
894     }
895    
896     if ($CBufLen)
897     {
898     print "@CopyBuf\n" if $debug;
899 dpavlin 1.6 my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
900 dpavlin 1.2 return($result) if $result;
901     }
902    
903     return(0);
904     }
905 dpavlin 1.1
906    
907 dpavlin 1.2 sub DoCopy
908     {
909 dpavlin 1.6 my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
910 dpavlin 1.2
911     my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
912     "FROM STDIN";
913 dpavlin 1.6 my $result = $sconn->exec($sql);
914 dpavlin 1.2 if ($result->resultStatus ne PGRES_COPY_IN)
915     {
916 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
917 dpavlin 1.2 return(-1);
918     }
919    
920 dpavlin 1.3 foreach my $str (@{$CBuf})
921 dpavlin 1.2 {
922 dpavlin 1.6 $sconn->putline($str);
923 dpavlin 1.2 }
924    
925 dpavlin 1.6 $sconn->putline("\\.\n");
926 dpavlin 1.2
927 dpavlin 1.6 if ($sconn->endcopy)
928 dpavlin 1.2 {
929 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
930 dpavlin 1.2 return(-1);
931     }
932    
933     return(0);
934 dpavlin 1.1 }
935    
936    
937     #
938     # Returns last SyncID applied on Slave
939     #
940     sub GetSyncID
941     {
942 dpavlin 1.6 my ($sconn) = @_; # (@_[0]);
943 dpavlin 1.2
944 dpavlin 1.6 my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
945 dpavlin 1.2 if ($result->resultStatus ne PGRES_TUPLES_OK)
946     {
947 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
948 dpavlin 1.2 return(-1);
949     }
950     my @row = $result->fetchrow;
951     return(undef) unless defined $row[0]; # null
952     return($row[0]);
953 dpavlin 1.1 }
954    
955     #
956     # Updates _RSERV_SYNC_ on Master with Slave SyncID
957     #
958     sub SyncSyncID
959     {
960 dpavlin 1.8 my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
961 dpavlin 1.2
962 dpavlin 1.6 my $result = $mconn->exec("BEGIN");
963 dpavlin 1.2 if ($result->resultStatus ne PGRES_COMMAND_OK)
964     {
965 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
966     $mconn->exec("ROLLBACK");
967 dpavlin 1.2 return(-1);
968     }
969    
970 dpavlin 1.6 $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
971 dpavlin 1.8 " where server = $sserver AND syncid = $syncid" .
972 dpavlin 1.2 " for update");
973     if ($result->resultStatus ne PGRES_TUPLES_OK)
974     {
975 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
976     $mconn->exec("ROLLBACK");
977 dpavlin 1.2 return(-1);
978     }
979     my @row = $result->fetchrow;
980     if (! defined $row[0])
981     {
982 dpavlin 1.8 printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
983 dpavlin 1.6 $mconn->exec("ROLLBACK");
984 dpavlin 1.2 return(0);
985     }
986     if ($row[1] > 0)
987     {
988     printf STDERR "SyncID $syncid for server ".
989 dpavlin 1.8 "$sserver already updated\n" unless ($quiet);
990 dpavlin 1.6 $mconn->exec("ROLLBACK");
991 dpavlin 1.2 return(0);
992     }
993 dpavlin 1.6 $result = $mconn->exec("update _RSERV_SYNC_" .
994 dpavlin 1.2 " set synctime = now(), status = 1" .
995 dpavlin 1.8 " where server = $sserver AND syncid = $syncid");
996 dpavlin 1.2 if ($result->resultStatus ne PGRES_COMMAND_OK)
997     {
998 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
999     $mconn->exec("ROLLBACK");
1000 dpavlin 1.2 return(-1);
1001     }
1002 dpavlin 1.6 $result = $mconn->exec("delete from _RSERV_SYNC_" .
1003 dpavlin 1.8 " where server = $sserver AND syncid < $syncid");
1004 dpavlin 1.2 if ($result->resultStatus ne PGRES_COMMAND_OK)
1005     {
1006 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
1007     $mconn->exec("ROLLBACK");
1008 dpavlin 1.2 return(-1);
1009     }
1010    
1011 dpavlin 1.6 $result = $mconn->exec("COMMIT");
1012 dpavlin 1.2 if ($result->resultStatus ne PGRES_COMMAND_OK)
1013     {
1014 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
1015     $mconn->exec("ROLLBACK");
1016 dpavlin 1.2 return(-1);
1017     }
1018    
1019     return(1);
1020 dpavlin 1.1 }
1021    
1022 dpavlin 1.11 # stuff moved from perl scripts for better re-use
1023    
1024     sub Rollback {
1025     my $conn = shift @_;
1026    
1027 dpavlin 1.15 print STDERR $conn->errorMessage unless ($quiet);
1028 dpavlin 1.11 $conn->exec("ROLLBACK");
1029     }
1030    
1031     sub RollbackAndQuit {
1032     my $conn = shift @_;
1033    
1034     Rollback($conn);
1035     exit (-1);
1036     }
1037    
1038     sub Connect {
1039     my $info = shift @_;
1040    
1041     print("Connecting to $info\n") if ($debug || $verbose);
1042     my $conn = Pg::connectdb($info);
1043     if ($conn->status != PGRES_CONNECTION_OK) {
1044     print STDERR "Failed opening $info\n";
1045     exit 1;
1046     }
1047     return $conn;
1048     }
1049    
1050     sub Exec {
1051 dpavlin 1.15 my $conn = shift || die "Exec needs connection!";
1052     my $sql = shift || die "Exec needs SQL statement!";
1053     # used to return error code if no tuples are retured
1054     my $return_code = shift;
1055    
1056     if ($debug) {
1057     # re-format SQL in one line (for nicer output)
1058     $sql =~ s/[\s\n\r]+/ /gs;
1059     print STDERR "Exec: $sql\n";
1060     }
1061 dpavlin 1.11 my $result = $conn->exec($sql);
1062 dpavlin 1.15 if ($result->resultStatus eq PGRES_COMMAND_OK) {
1063     return;
1064     } elsif ($result->resultStatus eq PGRES_TUPLES_OK) {
1065     print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
1066     return $result;
1067     } else {
1068     if (defined($return_code)) {
1069     print STDERR "ERROR: ",$conn->errorMessage,"\n" unless ($quiet);
1070     return($return_code);
1071     } else {
1072     RollbackAndQuit($conn)
1073     }
1074     }
1075 dpavlin 1.11 }
1076    
1077     sub Exec2 {
1078     my $mconn = shift @_;
1079     my $sconn = shift @_;
1080     my $sql = shift @_;
1081    
1082     my $result = $mconn->exec($sql);
1083     RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1084     $result = $sconn->exec($sql);
1085     RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1086 dpavlin 1.15 # XXX TODO: return results?!
1087 dpavlin 1.12 }
1088    
1089     sub MkInfo {
1090     my $db = shift || die "need database name!";
1091     my $host = shift;
1092     my $port = shift;
1093     my $user = shift;
1094     my $password = shift;
1095    
1096     my $info = "dbname=$db";
1097     $info = "$info host=$host" if (defined($host));
1098     $info = "$info port=$port" if (defined($port));
1099     $info = "$info user=$user" if (defined($user));
1100     $info = "$info password=$password" if (defined($password));
1101    
1102     return $info;
1103 dpavlin 1.11 }
1104    
1105 dpavlin 1.1 1;

  ViewVC Help
Powered by ViewVC 1.1.26