/[rserv]/share/RServ.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /share/RServ.pm

Parent Directory | Revision Log


Revision 1.15 - (show annotations)
Sun Nov 2 13:21:12 2003 UTC (20 years, 7 months ago) by dpavlin
Branch: MAIN
Changes since 1.14: +50 -53 lines
Exec now supports returning tuples; rewrote more code to use Exec.

# -*- perl -*-
# RServ.pm
# Vadim Mikheev, (c) 2000, PostgreSQL Inc.

package RServ;

require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
    Rollback RollbackAndQuit Connect Exec Exec2 MkInfo
    $debug $quiet $verbose
);
@EXPORT_OK = qw();
use strict;
use Pg;

# Diagnostic switches.  They are listed in @EXPORT, and Exporter can only
# export package variables, so they are declared with "our".  A file-scoped
# "my" variable is invisible to Exporter: the importer would receive an
# unrelated package variable that this module never reads.
our $debug = 0;     # extra trace output
our $quiet = 1;     # when true, database error messages are suppressed
our $verbose = 0;   # progress messages (see Connect)

# NOTE(review): these unconditional assignments override the defaults above,
# so tracing is always on when the module is loaded.  This looks like a
# debugging leftover -- confirm before removing.
$debug = 1;
$quiet = 0;
$verbose = 1;

# Table maps shared by the snapshot routines in this file.
#   %Mtables: master table oid => [ table name, key column, key type ]
#   %Stables: filled by PrepareSnapshot as  table name => 1
#             filled by ApplySnapshot   as  table name => [ oid, key column, key attnum ]
my %Mtables = ();
my %Stables = ();
27
# GetServerId($mconn, $DB, $Host)
#
# Looks up the id stored in _RSERV_SERVERS_ for the given host / database
# pair, using the connection $mconn.
#
# Returns the value of the "server" column (undef when no row matches),
# -1 when the query fails, or -2 when more than one row matches.
sub GetServerId
{
    my ($mconn, $DB, $Host) = @_;

    print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);

    # Both values are placed inside single-quoted SQL literals, so quotes and
    # backslashes must not be allowed to terminate or extend the literal.
    # NOTE(review): doubling backslashes assumes the server treats backslash
    # as an escape character inside ordinary string literals, as the rest of
    # this module does -- confirm for the target server configuration.
    my ($host_lit, $db_lit) = ($Host, $DB);
    for my $lit ($host_lit, $db_lit) {
        next unless defined $lit;
        $lit =~ s/\\/\\\\/g;
        $lit =~ s/'/''/g;
    }
    $host_lit = '' unless defined $host_lit;
    $db_lit   = '' unless defined $db_lit;

    my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
        " host='$host_lit' AND dbase='$db_lit'");

    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $mconn->errorMessage unless ($quiet);
        return(-1);
    }

    # ntuples is the number of rows returned by a query; cmdTuples reports
    # rows affected by a data-modifying command and is not a reliable row
    # count for a SELECT, so the duplicate check must use ntuples.
    if ($result->ntuples > 1) {
        printf STDERR "Duplicate host definitions.\n" unless ($quiet);
        return(-2);
    }

    my @row = $result->fetchrow;

    print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);

    return $row[0];
}
55
# PrepareSnapshot($mconn, $sconn, $outf, $mserver, $sserver, $onlytables)
#
# Writes to the handle $outf a snapshot of the rows logged in _RSERV_LOG_ on
# the master ($mconn) since the last successful sync of slave $sserver, for
# the tables listed in the slave's _RSERV_SLAVE_TABLES_ ($sconn).
#
# Snapshot layout, as produced below:
#   -- SYNCID <id>           written once, before the first section
#   -- DELETE <table>        one key per line, closed by "\."
#   -- UPDATE <table>        one row per line, closed by "\."
#   -- INSERT <table>        one row per line, closed by "\."
#   -- OK                    written after COMMIT ("-- ERROR" on failure)
#
# $onlytables, when it is a hash reference, restricts the master tables to
# its keys; keys with a false value are filled in with the table oid.
#
# Returns 1 when a snapshot was written, 0 when there was nothing to send,
# -1 on a database error (or equal server ids), -2 on a NULL key, or the
# negative value returned by GetSYNCID.
sub PrepareSnapshot
{
    my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;

    if ($mserver == $sserver) {
        print STDERR "master and slave numbers are same [$mserver] !\n";
        return(-1);
    }

    print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);

    # Both maps are file-level state.  Start from empty maps so that tables
    # seen by an earlier call in the same process cannot leak into this
    # snapshot (ApplySnapshot clears %Stables the same way).
    %Mtables = ();
    %Stables = ();

    # first, we must know for which tables the slave subscribed
    my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);
    return(-1) unless (ref($result));   # Exec hands back -1 on failure

    my @row;
    while (@row = $result->fetchrow) {
        $Stables{$row[0]} = 1;
    }

    print "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);

    # Exec without a return code rolls back and exits the process on error.
    Exec($mconn,"BEGIN");
    Exec($mconn,"set transaction isolation level serializable");

    # MAP oid --> tabname, keyname, key_type
    my $sql = qq{
        select pgc.oid, pgc.relname, pga.attname, pgt.typname
        from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga,
            pg_type pgt
        where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
            AND pga.attnum = rt.key AND pga.atttypid=pgt.oid
    };
    $result = Exec($mconn,$sql);

    while (@row = $result->fetchrow) {
        if (ref($onlytables) eq 'HASH') {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
    }

    print "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
    if (! %Mtables) {
        print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";
        RollbackAndQuit($mconn);
    }

    # Read last succeeded sync
    $sql = qq{
        select syncid, synctime, minid, maxid, active from _RSERV_SYNC_
        where server = $sserver AND syncid =
            (select max(syncid) from _RSERV_SYNC_
                where server = $sserver AND status > 0)
    };

    $result = Exec($mconn,$sql);

    my @lastsync = $result->fetchrow;
    print "lastsync: ",join(",",@lastsync),"\n" if ($debug);

    # Restrict every log query to entries whose server column is $mserver.
    # NOTE(review): the original comment here read "exclude data which
    # originated from master server" -- confirm which reading is intended.
    my $sel_server = " and l.server = $mserver ";

    # Limit the log range using the last sync: everything from its maxid on,
    # plus the ids recorded in its "active" list.  NULL columns arrive as
    # undef, hence the defined() guards.
    my $sinfo = "";
    if (@lastsync && defined($lastsync[3]) && $lastsync[3] ne '') {
        $sinfo = "and (l.logid >= $lastsync[3]";
        $sinfo .= " or l.logid in ($lastsync[4])"
            if (defined($lastsync[4]) && $lastsync[4] ne '');
        $sinfo .= ")";
    }

    # Set once the "-- SYNCID" header has been written to $outf.
    my $havedeal = 0;

    # DELETED rows
    $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
        " where l.delete = 1 $sinfo $sel_server order by l.reloid";

    # The SQL is passed as an argument, never as the printf format, so a "%"
    # inside it cannot be taken for a conversion.
    printf "DELETED: %s\n", $sql if $debug;

    $result = $mconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    # Rows are ordered by reloid; a change of oid starts a new section.
    my $lastoid = -1;
    while (@row = $result->fetchrow) {
        next unless exists $Mtables{$row[0]};
        next unless exists $Stables{$Mtables{$row[0]}[0]};

        if ($lastoid != $row[0]) {
            if ($lastoid == -1) {
                my $syncid = GetSYNCID($mconn, $outf);
                return($syncid) if $syncid < 0;
                $havedeal = 1;
            }
            else {
                printf $outf "\\.\n";   # close the previous table's section
            }
            printf $outf "-- DELETE %s\n", $Mtables{$row[0]}[0];
            $lastoid = $row[0];
        }
        if (! defined $row[1]) {
            print STDERR "NULL key\n" unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-2);
        }
        printf $outf "%s\n", OutputValue($row[1]);
    }
    printf $outf "\\.\n" if ($lastoid != -1);

    # UPDATED rows, then INSERTED rows: identical processing, only the log
    # flag column and the section tag differ.
    foreach my $pass (['update', 'UPDATE', 'UPDATED'],
                      ['insert', 'INSERT', 'INSERTED']) {
        my $rc = _DumpChangedRows($mconn, $outf, @{$pass},
            $sinfo, $sel_server, \$havedeal);
        return($rc) if ($rc);
    }

    unless ($havedeal) {
        # nothing was written at all
        $mconn->exec("ROLLBACK");
        return(0);
    }

    # Remember this snapshot info
    $result = $mconn->exec("select _rserv_sync_($sserver)");
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        printf $outf "-- ERROR\n";
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    $result = $mconn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        printf $outf "-- ERROR\n";
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }
    printf $outf "-- OK\n";
    printf "-- OK\n" if $debug;

    return(1);
}

# _DumpChangedRows($mconn, $outf, $logcol, $tag, $label, $sinfo, $sel_server, $havedeal)
#
# Private helper of PrepareSnapshot.  For every table in %Mtables that the
# slave subscribed to, writes one "-- $tag <table>" section holding the
# current rows whose key is logged with l.$logcol = 1.
#
#   $logcol     _RSERV_LOG_ flag column ('update' or 'insert')
#   $tag        section tag written to $outf
#   $label      prefix of the debug trace line
#   $havedeal   reference to the caller's "header written" flag
#
# Returns 0 on success, -1 on a query error (after ROLLBACK), or the
# negative value returned by GetSYNCID.
sub _DumpChangedRows
{
    my ($mconn, $outf, $logcol, $tag, $label, $sinfo, $sel_server, $havedeal) = @_;

    foreach my $taboid (keys %Mtables) {
        my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
        next unless exists $Stables{$tabname};

        # When the key is the oid it is not part of "*", so it is selected
        # explicitly.  The alias is quoted exactly like its declaration in
        # the FROM list so that mixed-case table names resolve.
        my $oidkey = ($tabkey eq 'oid') ? "\"_$tabname\".oid," : '';

        # Plain concatenation: the text contains table and column names and
        # must not pass through sprintf as a format string.
        my $sql = "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.$logcol = 1".
            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
            $sel_server;

        printf "%s: %s\n", $label, $sql if $debug;

        my $result = $mconn->exec($sql);
        if ($result->resultStatus ne PGRES_TUPLES_OK) {
            printf $outf "-- ERROR\n" if ${$havedeal};
            print STDERR $mconn->errorMessage unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-1);
        }
        next if $result->ntuples <= 0;

        if (! ${$havedeal}) {
            my $syncid = GetSYNCID($mconn, $outf);
            return($syncid) if $syncid < 0;
            ${$havedeal} = 1;
        }

        printf $outf "-- %s %s\n", $tag, $tabname;
        printf "-- %s %s\n", $tag, $tabname if $debug;
        while (my @row = $result->fetchrow) {
            # NOTE(review): the field separator is a single space as found in
            # this copy of the file.  COPY's default text delimiter is a TAB,
            # so this may originally have been a literal tab -- confirm.
            my $line = join(" ", map { OutputValue($_) } @row);
            printf $outf "%s\n", $line;
            printf "%s\n", $line if $debug;
        }
        printf $outf "\\.\n";
        printf "\\.\n" if $debug;
    }

    return(0);
}
308
# OutputValue($val)
#
# Encodes one column value for a snapshot line.  undef becomes \N; the
# characters that would break the line/field structure or a quoted SQL
# literal are replaced by backslash escapes with their own octal code:
#   backslash -> \\     TAB -> \011     space -> \040
#   newline   -> \012   CR  -> \015     '     -> \047
#
# Each character is written with its own code so that decoding restores the
# original byte; in particular a space must not be written as \011, which
# is the code of TAB.  Escaping both space and TAB keeps the value free of
# either candidate field separator.
#
# Returns the encoded string.
sub OutputValue
{
    my ($val) = @_;

    return("\\N") unless defined $val;

    $val =~ s/\\/\\\\/g;    # must run first: later steps add backslashes
    $val =~ s/\t/\\011/g;
    $val =~ s/ /\\040/g;
    $val =~ s/\n/\\012/g;
    $val =~ s/\r/\\015/g;
    $val =~ s/\'/\\047/g;

    return($val);
}
322
# GetSYNCID($conn, $outf)
#
# Draws the next value of the sequence _rserv_sync_seq_ and writes it to
# $outf as the "-- SYNCID <id>" header line of a new snapshot.
#
# Returns the new id, or -1 (after issuing ROLLBACK) when the query fails.
sub GetSYNCID
{
    my ($conn, $outf) = @_;

    my $res = $conn->exec("select nextval('_rserv_sync_seq_')");
    unless ($res->resultStatus eq PGRES_TUPLES_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my ($newid) = $res->fetchrow;

    printf $outf "-- SYNCID %s\n", $newid;
    printf "-- SYNCID %s\n", $newid if $debug;   # echo to stdout when tracing
    return($newid);
}
342
343
# CleanLog($conn, $howold, $onlytables)
#
# Deletes from _RSERV_LOG_ the entries that are older than $howold seconds
# and are no longer needed by any server: logid below the smallest maxid
# among the servers' latest successful syncs, and not in the "active" lists
# recorded for those syncs.
#
# $onlytables is accepted for interface compatibility and is not used.
#
# Returns the number of deleted rows (cmdTuples of the DELETE), 0 when there
# is no sync information, or -1 on a database error.  The transaction opened
# here is always ended before returning.
sub CleanLog
{
    my ($conn, $howold, $onlytables) = @_;

    my $result = $conn->exec("BEGIN");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    # latest successful sync of every server, smallest maxid first
    my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
        " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
        " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";

    # SQL is an argument, not the format, so "%" in it is printed verbatim.
    printf "%s\n", $sql if $debug;

    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");    # do not leave the transaction open
        return(-1);
    }

    my $maxid = '';
    my %active = ();
    while (my @row = $result->fetchrow) {
        $maxid = $row[0] if $maxid eq '';
        last if $row[0] > $maxid;   # only rows sharing the smallest maxid
        next unless defined $row[1];

        # "active" is a comma separated id list; accept any spacing around
        # the commas and ignore empty items.
        (my $list = $row[1]) =~ s/^\s+|\s+$//g;
        foreach my $aid (grep { $_ ne '' } split(/\s*,\s*/, $list)) {
            $active{$aid} = 1;
        }
    }
    if ($maxid eq '') {
        print STDERR "No Sync IDs\n" unless ($quiet);
        $conn->exec("ROLLBACK");    # nothing to do; end the transaction
        return(0);
    }

    my $alist = join(',', keys %active);
    my $sinfo = "logid < $maxid";
    $sinfo .= " AND logid not in ($alist)" if $alist ne '';

    # $howold is placed inside a quoted interval literal; keep it from
    # terminating that literal.
    my $age = defined($howold) ? $howold : '';
    $age =~ s/\\/\\\\/g;
    $age =~ s/'/''/g;

    $sql = "delete from _RSERV_LOG_ where " .
        "logtime < now() - '$age second'::interval AND $sinfo";

    printf "%s\n", $sql if $debug;

    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }
    my $deleted = $result->cmdTuples;

    $result = $conn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    return($deleted);
}
417
# ApplySnapshot($sconn, $inpf, $onlytables)
#
# Reads a snapshot from the handle $inpf and applies it to the slave
# database ($sconn) inside one transaction with all constraints deferred.
#
# Every control line has the form "-- COMMAND [parameter]":
#   SYNCID <n>                     must come first and only once; recorded in
#                                  _RSERV_SLAVE_SYNC_
#   DELETE|INSERT|UPDATE <table>   the section body is consumed by DoDelete /
#                                  DoInsert / DoUpdate
#   OK                             end of snapshot, commit
#   ERROR                          producer reported a failure, roll back
#
# $onlytables, when it is a hash reference, restricts the slave tables to
# its keys; keys with a false value are filled in with the table oid.
#
# Returns 1 when the snapshot was applied, 0 when the slave is already at or
# past this sync id, -1 on a database error, -2 on malformed input, or the
# non-zero code of a section handler.  Dies (after ROLLBACK) on a line with
# no leading field.
sub ApplySnapshot
{
    my ($sconn, $inpf, $onlytables) = @_;

    # Shared failure exits: report, undo the transaction, return the code.
    my $db_error = sub {
        print STDERR $sconn->errorMessage unless ($quiet);
        $sconn->exec("ROLLBACK");
        return(-1);
    };
    # The message is always a fixed format plus arguments; text read from the
    # snapshot is never used as the printf format itself.
    my $bad_input = sub {
        my ($format, @args) = @_;
        printf STDERR $format, @args unless ($quiet);
        $sconn->exec("ROLLBACK");
        return(-2);
    };

    my $result = $sconn->exec("BEGIN");
    return $db_error->() if ($result->resultStatus ne PGRES_COMMAND_OK);

    $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
    return $db_error->() if ($result->resultStatus ne PGRES_COMMAND_OK);

    # MAP name --> oid, keyname, keynum
    my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
        " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
        " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
        " AND pga.attnum = rt.key";

    $result = $sconn->exec($sql);
    return $db_error->() if ($result->resultStatus ne PGRES_TUPLES_OK);

    %Stables = ();
    while (my @row = $result->fetchrow) {
        if (ref($onlytables) eq 'HASH') {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
    }

    print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);

    # Section commands and the routine that consumes each section body.
    my %section_handler = (
        DELETE => \&DoDelete,
        INSERT => \&DoInsert,
        UPDATE => \&DoUpdate,
    );

    my $ok = 0;
    my $syncid = -1;
    # A lexical line variable keeps the caller's $_ untouched.
    while (defined(my $line = <$inpf>)) {
        $line =~ s/\n//;
        my ($cmt, $cmd, $prm) = split (/[ ]+/, $line, 3);
        if (! $cmt) {
            # end the transaction before giving up
            $sconn->exec("ROLLBACK");
            die "FATAL: snapshot format unknown or snapshot corrupted!";
        }
        return $bad_input->("Invalid format\n") if ($cmt ne '--');

        $cmd = '' unless defined $cmd;

        if (my $handler = $section_handler{$cmd}) {
            return $bad_input->("Sync ID unspecified\n") if ($syncid == -1);

            my $rc = $handler->($sconn, $inpf, $prm);
            if ($rc) {
                $sconn->exec("ROLLBACK");
                return($rc);
            }
        }
        elsif ($cmd eq 'SYNCID') {
            return $bad_input->("Second Sync ID ?!\n") if ($syncid != -1);

            if (! defined($prm) || $prm !~ /^\d+$/) {
                return $bad_input->("Invalid Sync ID %s\n",
                    defined($prm) ? $prm : '');
            }
            $syncid = $prm;

            printf STDERR "Sync ID %s\n", $syncid unless ($quiet);

            $result = $sconn->exec("select syncid, synctime from " .
                "_RSERV_SLAVE_SYNC_ where syncid = " .
                "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
            return $db_error->() if ($result->resultStatus ne PGRES_TUPLES_OK);

            my @row = $result->fetchrow;
            if (! defined $row[0]) {
                # first snapshot ever applied on this slave
                $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
                    "(syncid, synctime) values ($syncid, now())");
            }
            elsif ($row[0] >= $syncid) {
                # this snapshot (or a newer one) was applied already
                printf STDERR "Sync-ed to ID %s (%s)\n", $row[0], $row[1]
                    unless ($quiet);
                $sconn->exec("ROLLBACK");
                return(0);
            }
            else {
                $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
                    " set syncid = $syncid, synctime = now()");
            }
            return $db_error->() if ($result->resultStatus ne PGRES_COMMAND_OK);
        }
        elsif ($cmd eq 'OK') {
            $ok = 1;
            last;
        }
        elsif ($cmd eq 'ERROR') {
            return $bad_input->("ERROR signaled\n");
        }
        else {
            return $bad_input->("Unknown command %s\n", $cmd);
        }
    }

    return $bad_input->("No OK flag in input\n") if (! $ok);

    $result = $sconn->exec("COMMIT");
    return $db_error->() if ($result->resultStatus ne PGRES_COMMAND_OK);

    return(1);
}
609
# DoDelete($sconn, $inpf, $tabname)
#
# Consumes one DELETE section from $inpf: one key per line, closed by a
# line containing only "\.".  Each key is deleted from "$tabname" using the
# key column recorded in %Stables.  When the slave has no entry for the
# table the section is read and discarded.
#
# Returns 0 on success (or when skipped), -1 on a database error, -2 on
# malformed input.  The caller is responsible for ROLLBACK.
sub DoDelete
{
    my ($sconn, $inpf, $tabname) = @_;

    # only delete tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to delete rows from table $tabname\n" unless $quiet;
        while (defined(my $skipped = <$inpf>)) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my $keyname = $Stables{$tabname}->[1];

    my $ok = 0;
    # A lexical line variable keeps the caller's $_ untouched.
    while (defined(my $key = <$inpf>)) {
        if ($key !~ /\n$/) {
            # the last line was cut off
            printf STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $key =~ s/\n//;
        if ($key eq '\.') {
            $ok = 1;
            last;
        }

        # The key goes inside a single-quoted literal.  OutputValue writes a
        # quote as \047, so a raw quote cannot come from a well-formed
        # snapshot and would end the literal early.
        if ($key =~ /'/) {
            printf STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }

        my $sql = "delete from \"$tabname\" where ".
            "\"$keyname\" = '$key'";

        # SQL is an argument, not the format: "%" in a key is shown as is.
        printf "%s\n", $sql if $debug;

        my $result = $sconn->exec($sql);
        if ($result->resultStatus ne PGRES_COMMAND_OK) {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
    }

    if (! $ok) {
        printf STDERR "No end of input in DELETE section\n" unless ($quiet);
        return(-2);
    }

    return(0);
}
662
663
# DoUpdate($sconn, $inpf, $tabname)
#
# Consumes one UPDATE section from $inpf: one row per line, closed by a
# line containing only "\.".  Every row is first applied with an UPDATE on
# the key column; rows that match no existing row are collected and loaded
# with DoCopy.  When the key is the oid (key attnum < 0) the first field of
# each row is the oid.  When the slave has no entry for the table the
# section is read and discarded.
#
# Returns 0 on success (or when skipped), -1 on a database error, -2 on
# malformed input or duplicate keys, or the non-zero code of DoCopy.
sub DoUpdate
{
    my ($sconn, $inpf, $tabname) = @_;

    # only update the tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to update rows from table $tabname\n" unless $quiet;
        while (defined(my $skipped = <$inpf>)) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my ($reloid, $keyname, $keynum) = @{$Stables{$tabname}};

    # true when the key is the oid; later holds the oid of the current row
    my $oidkey = ($keynum < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    # column names indexed by attribute number
    my $sql = "select attnum, attname from pg_attribute" .
        " where attrelid = $reloid AND attnum > 0";

    my $result = $sconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    my @anames = ();
    while (my @row = $result->fetchrow) {
        $anames[$row[0]] = $row[1];
    }

    my $ok = 0;
    # A lexical line variable keeps the caller's $_ untouched.
    while (defined(my $istring = <$inpf>)) {
        if ($istring !~ /\n$/) {
            printf STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $istring =~ s/\n//;
        if ($istring eq '\.') {
            $ok = 1;
            last;
        }

        # Values are wrapped in single quotes below.  OutputValue writes a
        # quote as \047, so a raw quote cannot come from a well-formed
        # snapshot and would end a literal early.
        if ($istring =~ /'/) {
            printf STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }

        # NOTE(review): the field separator is a single space as found in
        # this copy of the file.  COPY's default text delimiter is a TAB, so
        # this may originally have been a literal tab -- confirm.
        my @vals = split(/ /, $istring);
        if ($oidkey) {
            if (! defined($vals[0]) || $vals[0] !~ /^\d+$/ || $vals[0] <= 0) {
                printf STDERR "Invalid OID\n" unless ($quiet);
                return(-2);
            }
            $oidkey = $vals[0];
        }
        else {
            # shift the fields so that $vals[N] belongs to attnum N
            unshift @vals, '';
        }

        $sql = "update \"$tabname\" set ";
        my $ocnt = 0;
        for (my $i = 1; $i <= $#anames; $i++) {
            # split drops trailing empty fields; a missing field is an
            # empty string
            my $val = defined($vals[$i]) ? $vals[$i] : '';
            if ($val eq '\N') {
                if ($i == $keynum) {
                    printf STDERR "NULL key\n" unless ($quiet);
                    return(-2);
                }
                $vals[$i] = 'null';
            }
            else {
                $vals[$i] = "'" . $val . "'";
                next if $i == $keynum;    # the key is used in WHERE only
            }
            $ocnt++;
            $sql .= ', ' if $ocnt > 1;
            $sql .= "\"$anames[$i]\" = $vals[$i]";
        }
        if ($oidkey) {
            $sql .= " where \"$keyname\" = $oidkey";
        }
        else {
            $sql .= " where \"$keyname\" = " . $vals[$keynum];
        }

        # SQL is an argument, not the format: "%" in a value is shown as is.
        printf "%s\n", $sql if $debug;

        $result = $sconn->exec($sql);

        if ($result->resultStatus ne PGRES_COMMAND_OK) {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
        next if $result->cmdTuples == 1;    # updated

        if ($result->cmdTuples > 1) {
            printf STDERR "Duplicate keys\n" unless ($quiet);
            return(-2);
        }

        # no key - copy
        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax) {
            $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok) {
        printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
        return(-2);
    }

    # Flush on "rows buffered", not on byte count: rows that are empty
    # strings add nothing to $CBufLen but still have to be loaded.
    if (@CopyBuf) {
        print "@CopyBuf\n" if $debug;
        $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
809
# DoInsert($sconn, $inpf, $tabname)
#
# Consumes one INSERT section from $inpf: one row per line, closed by a
# line containing only "\.".  Rows are buffered and loaded into "$tabname"
# with DoCopy, in batches once the buffered text reaches $CBufMax bytes.
# When the slave has no entry for the table the section is read and
# discarded.
#
# Returns 0 on success (or when skipped), -2 on malformed input, or the
# non-zero code of DoCopy.
sub DoInsert
{
    my ($sconn, $inpf, $tabname) = @_;

    # only insert rows into tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to insert rows from table $tabname\n" unless $quiet;
        while (defined(my $skipped = <$inpf>)) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    # key attnum < 0 means the key is the oid, so rows carry their oid
    my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $ok = 0;
    # A lexical line variable keeps the caller's $_ untouched.
    while (defined(my $istring = <$inpf>)) {
        if ($istring !~ /\n$/) {
            # the last line was cut off
            printf STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $istring =~ s/\n//;
        if ($istring eq '\.') {
            $ok = 1;
            last;
        }

        # no key - copy
        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax) {
            my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok) {
        printf STDERR "No end of input in INSERT section\n" unless ($quiet);
        return(-2);
    }

    # Flush on "rows buffered", not on byte count: rows that are empty
    # strings add nothing to $CBufLen but still have to be loaded.
    if (@CopyBuf) {
        print "@CopyBuf\n" if $debug;
        my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
876
877
# DoCopy($sconn, $tabname, $withoids, $CBuf)
#
# Loads the lines referenced by $CBuf into "$tabname" through
# COPY ... FROM STDIN, adding WITH OIDS when $withoids is true, and sends
# the closing "\." line itself.
#
# Returns 0 on success, -1 when COPY cannot be started or endcopy reports
# an error.
sub DoCopy
{
    my ($sconn, $tabname, $withoids, $CBuf) = @_;

    my $oids_clause = $withoids ? "WITH OIDS " : '';
    my $copy_sql = join('', "COPY \"$tabname\" ", $oids_clause, "FROM STDIN");

    my $started = $sconn->exec($copy_sql);
    unless ($started->resultStatus eq PGRES_COPY_IN) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # data lines first, then the end-of-data marker
    for my $data_line (@{$CBuf}, "\\.\n") {
        $sconn->putline($data_line);
    }

    # endcopy yields a true value on failure
    my $copy_failed = $sconn->endcopy;
    return(0) unless ($copy_failed);

    print STDERR $sconn->errorMessage unless ($quiet);
    return(-1);
}
906
907
#
# GetSyncID($sconn)
#
# Reads the highest syncid stored in _RSERV_SLAVE_SYNC_ on the slave.
# Returns that id, undef when the table holds none (NULL), or -1 when the
# query fails.
#
sub GetSyncID
{
    my ($sconn) = @_;

    my $res = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
    unless ($res->resultStatus eq PGRES_TUPLES_OK) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # a NULL max() arrives as undef and is handed back unchanged
    my ($latest) = $res->fetchrow;
    return($latest);
}
925
#
# SyncSyncID($mconn, $sserver, $syncid)
#
# Marks sync $syncid of server $sserver as applied in _RSERV_SYNC_ on the
# master (status = 1, synctime = now()) and removes that server's older
# sync rows, all in one transaction.
#
# Returns 1 when the row was updated, 0 when the sync id is unknown or was
# already marked, -1 on a database error.
#
sub SyncSyncID
{
    my ($mconn, $sserver, $syncid) = @_;

    # common exit for a failed statement: report, undo, signal the error
    my $db_failed = sub {
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    };

    my $res = $mconn->exec("BEGIN");
    return $db_failed->() unless ($res->resultStatus eq PGRES_COMMAND_OK);

    # lock the sync row while it is examined and changed
    $res = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
        " where server = $sserver AND syncid = $syncid" .
        " for update");
    return $db_failed->() unless ($res->resultStatus eq PGRES_TUPLES_OK);

    my ($synctime, $status) = $res->fetchrow;
    unless (defined $synctime) {
        printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(0);
    }
    if ($status > 0) {
        printf STDERR "SyncID $syncid for server ".
            "$sserver already updated\n" unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(0);
    }

    $res = $mconn->exec("update _RSERV_SYNC_" .
        " set synctime = now(), status = 1" .
        " where server = $sserver AND syncid = $syncid");
    return $db_failed->() unless ($res->resultStatus eq PGRES_COMMAND_OK);

    $res = $mconn->exec("delete from _RSERV_SYNC_" .
        " where server = $sserver AND syncid < $syncid");
    return $db_failed->() unless ($res->resultStatus eq PGRES_COMMAND_OK);

    $res = $mconn->exec("COMMIT");
    return $db_failed->() unless ($res->resultStatus eq PGRES_COMMAND_OK);

    return(1);
}
992
993 # stuff moved from perl scripts for better re-use
994
# Rollback($conn)
#
# Prints the connection's current error message to STDERR (unless $quiet)
# and issues ROLLBACK on it.  The value of the ROLLBACK call is the value
# of the sub.
sub Rollback {
    my ($conn) = @_;

    unless ($quiet) {
        print STDERR $conn->errorMessage;
    }
    $conn->exec("ROLLBACK");
}
1001
# RollbackAndQuit($conn)
#
# Reports the connection's error message (unless $quiet), rolls the
# transaction back and terminates the process via exit(-1).  Never returns.
sub RollbackAndQuit {
    my ($conn) = @_;

    # same steps as Rollback(), written out in place
    print STDERR $conn->errorMessage unless ($quiet);
    $conn->exec("ROLLBACK");

    exit (-1);
}
1008
# Connect($info)
#
# Opens a database connection from the conninfo string $info (see MkInfo).
#
# Returns the connection object.  When the connection cannot be
# established a message is printed to STDERR and the process exits with
# status 1.
sub Connect {
    my $info = shift @_;

    # The conninfo string may carry a password; show a masked copy in all
    # messages so credentials do not end up in terminals or log files.
    # The pattern covers both a bare value and a single-quoted one.
    my $shown = defined($info) ? $info : '';
    $shown =~ s/(\bpassword\s*=\s*)(?:'(?:[^'\\]|\\.)*'|\S*)/${1}********/g;

    print("Connecting to $shown\n") if ($debug || $verbose);
    my $conn = Pg::connectdb($info);
    if ($conn->status != PGRES_CONNECTION_OK) {
        print STDERR "Failed opening $shown\n";
        # say why, following the convention used throughout this file
        print STDERR $conn->errorMessage unless ($quiet);
        exit 1;
    }
    return $conn;
}
1020
# Exec($conn, $sql [, $return_code])
#
# Runs $sql on $conn.
#
# Returns nothing (empty list / undef) for a command that completed without
# producing rows, or the result object when rows were returned.  On any
# other status: when $return_code is defined the error is printed (unless
# $quiet) and $return_code is returned; otherwise the transaction is rolled
# back and the process exits (RollbackAndQuit).
#
# Dies when called without a connection or without a statement.
sub Exec {
    my $conn = shift || die "Exec needs connection!";
    my $sql = shift || die "Exec needs SQL statement!";
    # used to return error code if no tuples are returned
    my $return_code = shift;

    if ($debug) {
        # Re-format a COPY of the SQL on one line (for nicer output).  The
        # statement that is executed must stay untouched: collapsing
        # whitespace in it would also alter text inside string literals.
        (my $one_line = $sql) =~ s/\s+/ /g;
        print STDERR "Exec: $one_line\n";
    }

    my $result = $conn->exec($sql);
    my $status = $result->resultStatus;

    if ($status eq PGRES_COMMAND_OK) {
        return;
    }
    if ($status eq PGRES_TUPLES_OK) {
        print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
        return $result;
    }

    # anything else is treated as a failure
    if (defined($return_code)) {
        print STDERR "ERROR: ",$conn->errorMessage,"\n" unless ($quiet);
        return($return_code);
    }
    RollbackAndQuit($conn);
}
1047
# Exec2($mconn, $sconn, $sql)
#
# Runs the same statement first on $mconn, then on $sconn.  If either run
# does not end with PGRES_COMMAND_OK, that connection is rolled back and
# the process exits (RollbackAndQuit).  No result is handed back.
sub Exec2 {
    my ($mconn, $sconn, $sql) = @_;

    my $master_res = $mconn->exec($sql);
    unless ($master_res->resultStatus eq PGRES_COMMAND_OK) {
        RollbackAndQuit($mconn);
    }

    my $slave_res = $sconn->exec($sql);
    RollbackAndQuit($sconn) if ($slave_res->resultStatus ne PGRES_COMMAND_OK);
    # XXX TODO: return results?!
}
1059
# MkInfo($db [, $host [, $port [, $user [, $password]]]])
#
# Builds a "keyword=value ..." connection string.  Only defined arguments
# are included, in the order dbname, host, port, user, password.
#
# A value that is empty or contains whitespace, a single quote or a
# backslash is wrapped in single quotes, with quotes and backslashes
# escaped by a backslash, as the conninfo syntax requires; any other value
# is written unchanged.
#
# Returns the connection string.  Dies when $db is missing or false.
sub MkInfo {
    my $db = shift || die "need database name!";
    my ($host, $port, $user, $password) = @_;

    my @settings = (
        [ dbname   => $db ],
        [ host     => $host ],
        [ port     => $port ],
        [ user     => $user ],
        [ password => $password ],
    );

    my @parts;
    foreach my $setting (@settings) {
        my ($keyword, $value) = @{$setting};
        next unless defined($value);

        # Without quoting, a space would end the value early and an empty
        # value would swallow the next keyword.
        if ($value eq '' || $value =~ /[\s'\\]/) {
            $value =~ s/([\\'])/\\$1/g;
            $value = "'$value'";
        }
        push @parts, "$keyword=$value";
    }

    return join(' ', @parts);
}
1075
1076 1;

  ViewVC Help
Powered by ViewVC 1.1.26