/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.16 - (show annotations)
Sun Nov 2 15:43:08 2003 UTC (20 years, 6 months ago) by dpavlin
Branch: MAIN
Changes since 1.15: +102 -73 lines
added SERVER (id of source server) and KEYS (which keys are transfered in
this snapshot) in snapshot format

1 # -*- perl -*-
2 # RServ.pm
3 # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4
5 package RServ;
6
7 require Exporter;
8 @ISA = qw(Exporter);
9 @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10 Rollback RollbackAndQuit Connect Exec Exec2 MkInfo
11 $debug $quiet $verbose
12 );
13 @EXPORT_OK = qw();
14 use strict;
15 use Pg;
16
17 my $debug = 0;
18 my $quiet = 1;
19 my $verbose = 0;
20
21 $debug = 1;
22 $quiet = 0;
23 $verbose = 1;
24
25 my %Mtables = ();
26 my %Stables = ();
27
28 sub GetServerId
29 {
30 my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);
31
32 print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);
33
34 my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
35 " host='$Host' AND dbase='$DB'");
36
37 if ($result->resultStatus ne PGRES_TUPLES_OK)
38 {
39 print STDERR $mconn->errorMessage unless ($quiet);
40 return(-1);
41 }
42
43 if ($result->cmdTuples && $result->cmdTuples > 1)
44 {
45 printf STDERR "Duplicate host definitions.\n" unless ($quiet);
46 return(-2);
47 }
48
49 my @row = $result->fetchrow;
50
51 print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
52
53 return $row[0];
54 }
55
56 sub PrepareSnapshot
57 {
58 my ($mconn, $sconn, $outf, $mserver, $sserver, $multiplemaster, $onlytables) = @_;
59
60 if ($mserver == $sserver) {
61 print STDERR "master and slave numbers are same [$mserver] !\n";
62 return(-1);
63 }
64
65 print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
66
67 # dump master server ID into snapshot file (to prevent replication
68 # of colums from master back to slave)
69 print $outf "-- SERVER $mserver\n";
70
71 # first, we must know for wich tables the slave subscribed
72 my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);
73 return (-1) if ($result == -1);
74
75 my @row;
76 while (@row = $result->fetchrow) {
77 $Stables{$row[0]} = 1;
78 }
79
80 print "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);
81
82 Exec($mconn,"BEGIN");
83 Exec($mconn,"set transaction isolation level serializable");
84
85 # MAP oid --> tabname, keyname, key_type
86 my $sql = qq{
87 select pgc.oid, pgc.relname, pga.attname, pgt.typname
88 from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga,
89 pg_type pgt
90 where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
91 AND pga.attnum = rt.key AND pga.atttypid=pgt.oid
92 };
93 $result = Exec($mconn,$sql);
94
95 while (@row = $result->fetchrow)
96 {
97 printf "$row[0], $row[1], $row[2]\n" if ($debug);
98 if (ref($onlytables) eq 'HASH') {
99 next unless (exists $onlytables->{$row[1]});
100 $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
101 }
102 push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
103 }
104
105 print "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
106 if (! %Mtables) {
107 print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";
108 RollbackAndQuit($mconn);
109 }
110
111 # Read last succeeded sync
112 $sql = qq{
113 select syncid, synctime, minid, maxid, active from _RSERV_SYNC_
114 where server = $sserver AND syncid =
115 (select max(syncid) from _RSERV_SYNC_
116 where server = $sserver AND status > 0)
117 };
118
119 $result = Exec($mconn,$sql);
120
121 my @lastsync = $result->fetchrow;
122 print "lastsync: ",join(",",@lastsync),"\n" if ($debug);
123
124 # exclude data which originated from master server
125 my $sel_server = " and l.server = $mserver ";
126
127 my $sinfo = "";
128 if (@lastsync && $lastsync[3] ne '') # sync info
129 {
130 $sinfo = "and (l.logid >= $lastsync[3]";
131 $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
132 $sinfo .= ")";
133 }
134
135 my @keys; # keys in this snapshot
136
137 my $havedeal = 0;
138
139 # DELETED rows
140 $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
141 " where l.delete = 1 $sinfo $sel_server order by l.reloid";
142
143 printf "DELETED: $sql\n" if $debug;
144
145 $result = $mconn->exec($sql);
146 if ($result->resultStatus ne PGRES_TUPLES_OK)
147 {
148 print STDERR $mconn->errorMessage unless ($quiet);
149 $mconn->exec("ROLLBACK");
150 return(-1);
151 }
152
153 my $lastoid = -1;
154 while (@row = $result->fetchrow)
155 {
156 next unless exists $Mtables{$row[0]};
157 next unless exists $Stables{$Mtables{$row[0]}[0]};
158
159 if ($lastoid != $row[0])
160 {
161 if ($lastoid == -1)
162 {
163 my $syncid = GetSYNCID($mconn, $outf);
164 return($syncid) if $syncid < 0;
165 $havedeal = 1;
166 }
167 else
168 {
169 printf $outf "\\.\n";
170 }
171 printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
172 $lastoid = $row[0];
173 }
174 if (! defined $row[1])
175 {
176 print STDERR "NULL key\n" unless ($quiet);
177 $mconn->exec("ROLLBACK");
178 return(-2);
179 }
180 printf $outf "%s\n", OutputValue($row[1]);
181 push @keys,OutputKey($row[2]);
182 }
183 printf $outf "\\.\n" if ($lastoid != -1);
184
185 # UPDATED rows
186
187 my ($taboid, $tabname, $tabkey);
188 foreach $taboid (keys %Mtables)
189 {
190 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
191 next unless exists $Stables{$tabname};
192
193 my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
194
195 $sql = sprintf "SELECT \"_$tabname\".\"${tabkey}\",$oidkey \"_$tabname\".* FROM \"$tabname\" ".
196 "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
197 " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
198 $sel_server;
199
200 printf "UPDATED: $sql\n" if $debug;
201
202 $result = $mconn->exec($sql);
203 if ($result->resultStatus ne PGRES_TUPLES_OK)
204 {
205 printf $outf "-- ERROR\n" if $havedeal;
206 print STDERR $mconn->errorMessage unless ($quiet);
207 $mconn->exec("ROLLBACK");
208 return(-1);
209 }
210 next if $result->ntuples <= 0;
211 if (! $havedeal)
212 {
213 my $syncid = GetSYNCID($mconn, $outf);
214 return($syncid) if $syncid < 0;
215 $havedeal = 1;
216 }
217 printf $outf "-- UPDATE $tabname\n";
218 printf "-- UPDATE $tabname\n" if $debug;
219 while (@row = $result->fetchrow)
220 {
221 push @keys,OutputKey(shift @row);
222 for (my $i = 0; $i <= $#row; $i++)
223 {
224 printf $outf " " if $i;
225 printf " " if $i && $debug;
226 printf $outf "%s", OutputValue($row[$i]);
227 printf "%s", OutputValue($row[$i]) if $debug;;
228 }
229 printf $outf "\n";
230 printf "\n" if $debug;
231 }
232 printf $outf "\\.\n";
233 printf "\\.\n" if $debug;;
234 }
235
236 # INSERTED rows
237
238 foreach $taboid (keys %Mtables)
239 {
240 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
241 next unless exists $Stables{$tabname};
242
243 my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
244
245 $sql = sprintf "SELECT \"_$tabname\".\"${tabkey}\", $oidkey \"_$tabname\".* FROM \"$tabname\" ".
246 "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
247 " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
248 $sel_server;
249
250 printf "INSERTED: $sql\n" if $debug;
251
252 $result = $mconn->exec($sql);
253 if ($result->resultStatus ne PGRES_TUPLES_OK)
254 {
255 printf $outf "-- ERROR\n" if $havedeal;
256 print STDERR $mconn->errorMessage unless ($quiet);
257 $mconn->exec("ROLLBACK");
258 return(-1);
259 }
260 next if $result->ntuples <= 0;
261 if (! $havedeal)
262 {
263 my $syncid = GetSYNCID($mconn, $outf);
264 return($syncid) if $syncid < 0;
265 $havedeal = 1;
266 }
267 printf $outf "-- INSERT $tabname\n";
268 printf "-- INSERT $tabname\n" if $debug;
269 while (@row = $result->fetchrow)
270 {
271 push @keys,OutputKey(shift @row);
272 for (my $i = 0; $i <= $#row; $i++)
273 {
274 printf $outf " " if $i;
275 printf " " if $i && $debug;
276 printf $outf "%s", OutputValue($row[$i]);
277 printf "%s", OutputValue($row[$i]) if $debug;;
278 }
279 printf $outf "\n";
280 printf "\n" if $debug;
281 }
282 printf $outf "\\.\n";
283 printf "\\.\n" if $debug;;
284 }
285
286
287 unless ($havedeal)
288 {
289 $mconn->exec("ROLLBACK");
290 return(0);
291 }
292
293 # Remember this snapshot info
294 $result = $mconn->exec("select _rserv_sync_($sserver)");
295 if ($result->resultStatus ne PGRES_TUPLES_OK)
296 {
297 printf $outf "-- ERROR\n";
298 print STDERR $mconn->errorMessage unless ($quiet);
299 $mconn->exec("ROLLBACK");
300 return(-1);
301 }
302
303 $result = $mconn->exec("COMMIT");
304 if ($result->resultStatus ne PGRES_COMMAND_OK)
305 {
306 printf $outf "-- ERROR\n";
307 print STDERR $mconn->errorMessage unless ($quiet);
308 $mconn->exec("ROLLBACK");
309 return(-1);
310 }
311
312 if ($multiplemaster) {
313 # save keys
314 my $key_out = "-- KEYS ".($#keys+1)."\n".join(",",@keys)."\n";
315 print $outf $key_out;
316 print $key_out if ($debug);
317 }
318
319 printf $outf "-- OK\n";
320 printf "-- OK\n" if $debug;
321
322 return(1);
323
324 }
325
326 sub OutputValue
327 {
328 my ($val) = @_; # @_[0];
329
330 return("\\N") unless defined $val;
331
332 $val =~ s/\\/\\\\/g;
333 $val =~ s/ /\\011/g;
334 $val =~ s/\n/\\012/g;
335 $val =~ s/\'/\\047/g;
336
337 return($val);
338 }
339
340 sub OutputKey {
341 my $val = shift;
342
343 return if (! defined($val));
344
345 if ($val =~ m/^\d+$/) {
346 return $val;
347 } else {
348 return "'$val'";
349 }
350 }
351
352 # Get syncid for new snapshot
353 sub GetSYNCID
354 {
355 my ($conn, $outf) = @_; # (@_[0], @_[1]);
356
357 my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
358 if ($result->resultStatus ne PGRES_TUPLES_OK)
359 {
360 print STDERR $conn->errorMessage unless ($quiet);
361 $conn->exec("ROLLBACK");
362 return(-1);
363 }
364
365 my @row = $result->fetchrow;
366
367 printf $outf "-- SYNCID $row[0]\n";
368 printf "-- SYNCID $row[0]\n" if $debug;
369 return($row[0]);
370 }
371
372
373 sub CleanLog
374 {
375 my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
376
377 my $result = $conn->exec("BEGIN");
378 if ($result->resultStatus ne PGRES_COMMAND_OK)
379 {
380 print STDERR $conn->errorMessage unless ($quiet);
381 $conn->exec("ROLLBACK");
382 return(-1);
383 }
384
385 my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
386 " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
387 " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
388
389 printf "$sql\n" if $debug;
390
391 $result = $conn->exec($sql);
392 if ($result->resultStatus ne PGRES_TUPLES_OK)
393 {
394 print STDERR $conn->errorMessage unless ($quiet);
395 return(-1);
396 }
397 my $maxid = '';
398 my %active = ();
399 while (my @row = $result->fetchrow)
400 {
401 $maxid = $row[0] if $maxid eq '';
402 last if $row[0] > $maxid;
403 my @ids = split(/[ ]+,[ ]+/, $row[1]);
404 foreach my $aid (@ids)
405 {
406 $active{$aid} = 1 unless exists $active{$aid};
407 }
408 }
409 if ($maxid eq '')
410 {
411 print STDERR "No Sync IDs\n" unless ($quiet);
412 return(0);
413 }
414 my $alist = join(',', keys %active);
415 my $sinfo = "logid < $maxid";
416 $sinfo .= " AND logid not in ($alist)" if $alist ne '';
417 #if (ref($onlytables) eq 'HASH') {
418 # foreach my $onlytable (keys %{$onlytables}) {
419 # $sinfo
420 # }
421 #}
422 $sql = "delete from _RSERV_LOG_ where " .
423 "logtime < now() - '$howold second'::interval AND $sinfo";
424
425 printf "$sql\n" if $debug;
426
427 $result = $conn->exec($sql);
428 if ($result->resultStatus ne PGRES_COMMAND_OK)
429 {
430 print STDERR $conn->errorMessage unless ($quiet);
431 $conn->exec("ROLLBACK");
432 return(-1);
433 }
434 $maxid = $result->cmdTuples;
435
436 $result = $conn->exec("COMMIT");
437 if ($result->resultStatus ne PGRES_COMMAND_OK)
438 {
439 print STDERR $conn->errorMessage unless ($quiet);
440 $conn->exec("ROLLBACK");
441 return(-1);
442 }
443
444 return($maxid);
445 }
446
447 sub ApplySnapshot
448 {
449 my ($sconn, $inpf, $multiplemaster, $onlytables) = @_; # (@_[0], @_[1]);
450
451 my $serverId;
452
453 my $result = $sconn->exec("BEGIN");
454 if ($result->resultStatus ne PGRES_COMMAND_OK) {
455 print STDERR $sconn->errorMessage unless ($quiet);
456 $sconn->exec("ROLLBACK");
457 return(-1);
458 }
459
460 $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
461 if ($result->resultStatus ne PGRES_COMMAND_OK) {
462 print STDERR $sconn->errorMessage unless ($quiet);
463 $sconn->exec("ROLLBACK");
464 return(-1);
465 }
466
467 # MAP name --> oid, keyname, keynum
468 my $sql = qq{
469 select pgc.oid, pgc.relname, pga.attname, rt.key
470 from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga
471 where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
472 AND pga.attnum = rt.key
473 };
474
475 $result = $sconn->exec($sql);
476 if ($result->resultStatus ne PGRES_TUPLES_OK) {
477 print STDERR $sconn->errorMessage unless ($quiet);
478 $sconn->exec("ROLLBACK");
479 return(-1);
480 }
481 %Stables = ();
482 while (my @row = $result->fetchrow) {
483 # printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
484 if (ref($onlytables) eq 'HASH') {
485 next unless (exists $onlytables->{$row[1]});
486 $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
487 }
488 push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
489 }
490
491 print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);
492
493 # save keys from snapshot because we want to update _rserv_log_ with
494 # correct source server later...
495 my @keys;
496
497 my $ok = 0;
498 my $syncid = -1;
499 while(<$inpf>) {
500 $_ =~ s/\n//;
501 my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3);
502 die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
503 if ($cmt ne '--') {
504 printf STDERR "Invalid format\n" unless ($quiet);
505 $sconn->exec("ROLLBACK");
506 return(-2);
507 }
508 if ($cmd eq 'DELETE') {
509 if ($syncid == -1) {
510 printf STDERR "Sync ID unspecified\n" unless ($quiet);
511 $sconn->exec("ROLLBACK");
512 return(-2);
513 }
514 $result = DoDelete($sconn, $inpf, $prm);
515 if ($result) {
516 $sconn->exec("ROLLBACK");
517 return($result);
518 }
519 } elsif ($cmd eq 'INSERT') {
520 if ($syncid == -1) {
521 printf STDERR "Sync ID unspecified\n" unless ($quiet);
522 $sconn->exec("ROLLBACK");
523 return(-2);
524 }
525 $result = DoInsert($sconn, $inpf, $prm);
526 if ($result) {
527 $sconn->exec("ROLLBACK");
528 return($result);
529 }
530 } elsif ($cmd eq 'UPDATE') {
531 if ($syncid == -1) {
532 printf STDERR "Sync ID unspecified\n" unless ($quiet);
533 $sconn->exec("ROLLBACK");
534 return(-2);
535 }
536 $result = DoUpdate($sconn, $inpf, $prm);
537 if ($result) {
538 $sconn->exec("ROLLBACK");
539 return($result);
540 }
541 } elsif ($cmd eq 'SYNCID') {
542 if ($syncid != -1) {
543 printf STDERR "Second Sync ID ?!\n" unless ($quiet);
544 $sconn->exec("ROLLBACK");
545 return(-2);
546 }
547 if ($prm !~ /^\d+$/) {
548 printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
549 $sconn->exec("ROLLBACK");
550 return(-2);
551 }
552 $syncid = $prm;
553
554 printf STDERR "Sync ID $syncid\n" unless ($quiet);
555
556 $result = $sconn->exec("select syncid, synctime from " .
557 "_RSERV_SLAVE_SYNC_ where syncid = " .
558 "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
559 if ($result->resultStatus ne PGRES_TUPLES_OK) {
560 print STDERR $sconn->errorMessage unless ($quiet);
561 $sconn->exec("ROLLBACK");
562 return(-1);
563 }
564 my @row = $result->fetchrow;
565 if (! defined $row[0]) {
566 $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
567 "(syncid, synctime) values ($syncid, now())");
568 } elsif ($row[0] >= $prm) {
569 printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
570 $sconn->exec("ROLLBACK");
571 return(0);
572 } else {
573 $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
574 " set syncid = $syncid, synctime = now()");
575 }
576 if ($result->resultStatus ne PGRES_COMMAND_OK) {
577 print STDERR $sconn->errorMessage unless ($quiet);
578 $sconn->exec("ROLLBACK");
579 return(-1);
580 }
581 } elsif ($cmd eq 'OK') {
582 $ok = 1;
583 last;
584 } elsif ($cmd eq 'ERROR') {
585 printf STDERR "ERROR signaled\n" unless ($quiet);
586 $sconn->exec("ROLLBACK");
587 return(-2);
588 } elsif ($cmd eq 'SERVER') {
589 if ($prm !~ /^\d+$/) {
590 printf STDERR "Invalid Server ID $prm\n" unless ($quiet);
591 $sconn->exec("ROLLBACK");
592 return(-2);
593 }
594 $serverId = $prm;
595 print STDERR "Server ID $serverId\n" unless ($quiet);
596 } elsif ($cmd eq 'KEYS') {
597 if ($prm !~ /^\d+$/) {
598 printf STDERR "Invalid numer of keys $prm\n" unless ($quiet);
599 $sconn->exec("ROLLBACK");
600 return(-2);
601 }
602 my $keys = <$inpf>;
603 chomp($keys);
604 if ($multiplemaster) {
605 $result = $sconn->exec("update _rserv_log_ set server=$serverId where key in ($keys)");
606 if ($result->resultStatus ne PGRES_COMMAND_OK || $result->ntuples != $prm) {
607 print STDERR "FATAL: Cannot update source server in _rserv_log_.\n";
608 print STDERR "expected $prm updates, got ",$result->ntuples,"\n" if ($result->ntuples != $prm);
609 print STDERR $sconn->errorMessage unless ($quiet);
610 $sconn->exec("ROLLBACK");
611 return(-1);
612 }
613 }
614 } else {
615 printf STDERR "Unknown command $cmd\n" unless ($quiet);
616 $sconn->exec("ROLLBACK");
617 return(-2);
618 }
619 }
620
621 if (! $ok)
622 {
623 printf STDERR "No OK flag in input\n" unless ($quiet);
624 $sconn->exec("ROLLBACK");
625 return(-2);
626 }
627
628 $result = $sconn->exec("COMMIT");
629 if ($result->resultStatus ne PGRES_COMMAND_OK)
630 {
631 print STDERR $sconn->errorMessage unless ($quiet);
632 $sconn->exec("ROLLBACK");
633 return(-1);
634 }
635
636 return(1);
637 }
638
639 sub DoDelete
640 {
641 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
642
643 # only delete tables that the slave wants
644 if (! defined($Stables{$tabname})) {
645 print "Not configured to delete rows from table $tabname\n" unless $quiet;
646 while (<$inpf>) {
647 my $istring = $_;
648 $istring =~ s/\n//;
649 last if ($istring eq '\.');
650 }
651 return(0);
652 }
653
654 my $ok = 0;
655 while(<$inpf>)
656 {
657 if ($_ !~ /\n$/)
658 {
659 printf STDERR "Invalid format\n" unless ($quiet);
660 return(-2);
661 }
662 my $key = $_;
663 $key =~ s/\n//;
664 if ($key eq '\.')
665 {
666 $ok = 1;
667 last;
668 }
669
670 my $sql = "delete from \"$tabname\" where ".
671 "\"$Stables{$tabname}->[1]\" = '$key'";
672
673 printf "$sql\n" if $debug;
674
675 my $result = $sconn->exec($sql);
676 if ($result->resultStatus ne PGRES_COMMAND_OK)
677 {
678 print STDERR $sconn->errorMessage unless ($quiet);
679 return(-1);
680 }
681 }
682
683 if (! $ok)
684 {
685 printf STDERR "No end of input in DELETE section\n" unless ($quiet);
686 return(-2);
687 }
688
689 return(0);
690 }
691
692
693 sub DoUpdate
694 {
695 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
696
697 # only update the tables that the slave wants
698 if (! defined($Stables{$tabname})) {
699 print "Not configured to update rows from table $tabname\n" unless $quiet;
700 while (<$inpf>) {
701 my $istring = $_;
702 $istring =~ s/\n//;
703 last if ($istring eq '\.');
704 }
705 return(0);
706 }
707
708 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
709
710 my @CopyBuf = ();
711 my $CBufLen = 0;
712 my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
713
714 my $sql = "select attnum, attname from pg_attribute" .
715 " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
716
717 my $result = $sconn->exec($sql);
718 if ($result->resultStatus ne PGRES_TUPLES_OK)
719 {
720 print STDERR $sconn->errorMessage unless ($quiet);
721 return(-1);
722 }
723
724 my @anames = ();
725 while (my @row = $result->fetchrow)
726 {
727 $anames[$row[0]] = $row[1];
728 }
729
730 my $istring;
731 my $ok = 0;
732 while(<$inpf>)
733 {
734 if ($_ !~ /\n$/)
735 {
736 printf STDERR "Invalid format\n" unless ($quiet);
737 return(-2);
738 }
739 $istring = $_;
740 $istring =~ s/\n//;
741 if ($istring eq '\.')
742 {
743 $ok = 1;
744 last;
745 }
746 my @vals = split(/ /, $istring);
747 if ($oidkey)
748 {
749 if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
750 {
751 printf STDERR "Invalid OID\n" unless ($quiet);
752 return(-2);
753 }
754 $oidkey = $vals[0];
755 }
756 else
757 {
758 unshift @vals, '';
759 }
760
761 $sql = "update \"$tabname\" set ";
762 my $ocnt = 0;
763 for (my $i = 1; $i <= $#anames; $i++)
764 {
765 if ($vals[$i] eq '\N')
766 {
767 if ($i == $Stables{$tabname}->[2])
768 {
769 printf STDERR "NULL key\n" unless ($quiet);
770 return(-2);
771 }
772 $vals[$i] = 'null';
773 }
774 else
775 {
776 $vals[$i] = "'" . $vals[$i] . "'";
777 next if $i == $Stables{$tabname}->[2];
778 }
779 $ocnt++;
780 $sql .= ', ' if $ocnt > 1;
781 $sql .= "\"$anames[$i]\" = $vals[$i]";
782 }
783 if ($oidkey)
784 {
785 $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
786 }
787 else
788 {
789 $sql .= " where \"$Stables{$tabname}->[1]\" = ".
790 $vals[$Stables{$tabname}->[2]];
791 }
792
793 printf "$sql\n" if $debug;
794
795 $result = $sconn->exec($sql);
796
797 if ($result->resultStatus ne PGRES_COMMAND_OK)
798 {
799 print STDERR $sconn->errorMessage unless ($quiet);
800 return(-1);
801 }
802 next if $result->cmdTuples == 1; # updated
803
804 if ($result->cmdTuples > 1)
805 {
806 printf STDERR "Duplicate keys\n" unless ($quiet);
807 return(-2);
808 }
809
810 # no key - copy
811 push @CopyBuf, "$istring\n";
812 $CBufLen += length($istring);
813
814 if ($CBufLen >= $CBufMax)
815 {
816 $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
817 return($result) if $result;
818 @CopyBuf = ();
819 $CBufLen = 0;
820 }
821 }
822
823 if (! $ok)
824 {
825 printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
826 return(-2);
827 }
828
829 if ($CBufLen)
830 {
831 print "@CopyBuf\n" if $debug;
832 $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
833 return($result) if $result;
834 }
835
836 return(0);
837 }
838
839 sub DoInsert
840 {
841 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
842
843 # only insert rows into tables that the slave wants
844 if (! defined($Stables{$tabname})) {
845 print "Not configured to insert rows from table $tabname\n" unless $quiet;
846 while (<$inpf>) {
847 my $istring = $_;
848 $istring =~ s/\n//;
849 last if ($istring eq '\.');
850 }
851 return(0);
852 }
853
854 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
855
856 my @CopyBuf = ();
857 my $CBufLen = 0;
858 my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
859
860 my $istring;
861 my $ok = 0;
862 while(<$inpf>)
863 {
864 if ($_ !~ /\n$/)
865 {
866 printf STDERR "Invalid format\n" unless ($quiet);
867 return(-2);
868 }
869 $istring = $_;
870 $istring =~ s/\n//;
871 if ($istring eq '\.')
872 {
873 $ok = 1;
874 last;
875 }
876
877 # no key - copy
878 push @CopyBuf, "$istring\n";
879 $CBufLen += length($istring);
880
881 if ($CBufLen >= $CBufMax)
882 {
883 my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
884 return($result) if $result;
885 @CopyBuf = ();
886 $CBufLen = 0;
887 }
888 }
889
890 if (! $ok)
891 {
892 printf STDERR "No end of input in INSERT section\n" unless ($quiet);
893 return(-2);
894 }
895
896 if ($CBufLen)
897 {
898 print "@CopyBuf\n" if $debug;
899 my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
900 return($result) if $result;
901 }
902
903 return(0);
904 }
905
906
907 sub DoCopy
908 {
909 my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
910
911 my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
912 "FROM STDIN";
913 my $result = $sconn->exec($sql);
914 if ($result->resultStatus ne PGRES_COPY_IN)
915 {
916 print STDERR $sconn->errorMessage unless ($quiet);
917 return(-1);
918 }
919
920 foreach my $str (@{$CBuf})
921 {
922 $sconn->putline($str);
923 }
924
925 $sconn->putline("\\.\n");
926
927 if ($sconn->endcopy)
928 {
929 print STDERR $sconn->errorMessage unless ($quiet);
930 return(-1);
931 }
932
933 return(0);
934 }
935
936
937 #
938 # Returns last SyncID applied on Slave
939 #
940 sub GetSyncID
941 {
942 my ($sconn) = @_; # (@_[0]);
943
944 my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
945 if ($result->resultStatus ne PGRES_TUPLES_OK)
946 {
947 print STDERR $sconn->errorMessage unless ($quiet);
948 return(-1);
949 }
950 my @row = $result->fetchrow;
951 return(undef) unless defined $row[0]; # null
952 return($row[0]);
953 }
954
955 #
956 # Updates _RSERV_SYNC_ on Master with Slave SyncID
957 #
958 sub SyncSyncID
959 {
960 my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
961
962 my $result = $mconn->exec("BEGIN");
963 if ($result->resultStatus ne PGRES_COMMAND_OK)
964 {
965 print STDERR $mconn->errorMessage unless ($quiet);
966 $mconn->exec("ROLLBACK");
967 return(-1);
968 }
969
970 $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
971 " where server = $sserver AND syncid = $syncid" .
972 " for update");
973 if ($result->resultStatus ne PGRES_TUPLES_OK)
974 {
975 print STDERR $mconn->errorMessage unless ($quiet);
976 $mconn->exec("ROLLBACK");
977 return(-1);
978 }
979 my @row = $result->fetchrow;
980 if (! defined $row[0])
981 {
982 printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
983 $mconn->exec("ROLLBACK");
984 return(0);
985 }
986 if ($row[1] > 0)
987 {
988 printf STDERR "SyncID $syncid for server ".
989 "$sserver already updated\n" unless ($quiet);
990 $mconn->exec("ROLLBACK");
991 return(0);
992 }
993 $result = $mconn->exec("update _RSERV_SYNC_" .
994 " set synctime = now(), status = 1" .
995 " where server = $sserver AND syncid = $syncid");
996 if ($result->resultStatus ne PGRES_COMMAND_OK)
997 {
998 print STDERR $mconn->errorMessage unless ($quiet);
999 $mconn->exec("ROLLBACK");
1000 return(-1);
1001 }
1002 $result = $mconn->exec("delete from _RSERV_SYNC_" .
1003 " where server = $sserver AND syncid < $syncid");
1004 if ($result->resultStatus ne PGRES_COMMAND_OK)
1005 {
1006 print STDERR $mconn->errorMessage unless ($quiet);
1007 $mconn->exec("ROLLBACK");
1008 return(-1);
1009 }
1010
1011 $result = $mconn->exec("COMMIT");
1012 if ($result->resultStatus ne PGRES_COMMAND_OK)
1013 {
1014 print STDERR $mconn->errorMessage unless ($quiet);
1015 $mconn->exec("ROLLBACK");
1016 return(-1);
1017 }
1018
1019 return(1);
1020 }
1021
1022 # stuff moved from perl scripts for better re-use
1023
1024 sub Rollback {
1025 my $conn = shift @_;
1026
1027 print STDERR $conn->errorMessage unless ($quiet);
1028 $conn->exec("ROLLBACK");
1029 }
1030
1031 sub RollbackAndQuit {
1032 my $conn = shift @_;
1033
1034 Rollback($conn);
1035 exit (-1);
1036 }
1037
1038 sub Connect {
1039 my $info = shift @_;
1040
1041 print("Connecting to $info\n") if ($debug || $verbose);
1042 my $conn = Pg::connectdb($info);
1043 if ($conn->status != PGRES_CONNECTION_OK) {
1044 print STDERR "Failed opening $info\n";
1045 exit 1;
1046 }
1047 return $conn;
1048 }
1049
1050 sub Exec {
1051 my $conn = shift || die "Exec needs connection!";
1052 my $sql = shift || die "Exec needs SQL statement!";
1053 # used to return error code if no tuples are retured
1054 my $return_code = shift;
1055
1056 if ($debug) {
1057 # re-format SQL in one line (for nicer output)
1058 $sql =~ s/[\s\n\r]+/ /gs;
1059 print STDERR "Exec: $sql\n";
1060 }
1061 my $result = $conn->exec($sql);
1062 if ($result->resultStatus eq PGRES_COMMAND_OK) {
1063 return;
1064 } elsif ($result->resultStatus eq PGRES_TUPLES_OK) {
1065 print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
1066 return $result;
1067 } else {
1068 if (defined($return_code)) {
1069 print STDERR "ERROR: ",$conn->errorMessage,"\n" unless ($quiet);
1070 return($return_code);
1071 } else {
1072 RollbackAndQuit($conn)
1073 }
1074 }
1075 }
1076
1077 sub Exec2 {
1078 my $mconn = shift @_;
1079 my $sconn = shift @_;
1080 my $sql = shift @_;
1081
1082 my $result = $mconn->exec($sql);
1083 RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1084 $result = $sconn->exec($sql);
1085 RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1086 # XXX TODO: return results?!
1087 }
1088
1089 sub MkInfo {
1090 my $db = shift || die "need database name!";
1091 my $host = shift;
1092 my $port = shift;
1093 my $user = shift;
1094 my $password = shift;
1095
1096 my $info = "dbname=$db";
1097 $info = "$info host=$host" if (defined($host));
1098 $info = "$info port=$port" if (defined($port));
1099 $info = "$info user=$user" if (defined($user));
1100 $info = "$info password=$password" if (defined($password));
1101
1102 return $info;
1103 }
1104
1105 1;

  ViewVC Help
Powered by ViewVC 1.1.26