/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.5 - (show annotations)
Sun Oct 26 23:43:54 2003 UTC (20 years, 7 months ago) by dpavlin
Branch: MAIN
CVS Tags: before_multmaster, r_0_3
Changes since 1.4: +16 -4 lines
applied patch from Michael A Nachbaur to select tables which are replicating

1 # -*- perl -*-
2 # RServ.pm
3 # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4
5 package RServ;
6
7 require Exporter;
8 @ISA = qw(Exporter);
9 @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);
10 @EXPORT_OK = qw();
11 use strict;
12 use Pg;
13
14 my $debug = 0;
15 my $quiet = 1;
16
17 my %Mtables = ();
18 my %Stables = ();
19
20 sub GetSlaveId
21 {
22 my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);
23
24 my $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
25 " host='$slaveHost' AND dbase='$slaveDB'");
26
27 if ($result->resultStatus ne PGRES_TUPLES_OK)
28 {
29 print STDERR $conn->errorMessage unless ($quiet);
30 return(-1);
31 }
32
33 if ($result->cmdTuples && $result->cmdTuples > 1)
34 {
35 printf STDERR "Duplicate slave definitions.\n" unless ($quiet);
36 return(-2);
37 }
38
39 my @row = $result->fetchrow;
40
41 return $row[0];
42 }
43
44 sub PrepareSnapshot
45 {
46 my ($conn, $sconn, $outf, $server, $onlytables) = @_;
47
48 # first, we must know for wich tables the slave subscribed
49 my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
50 if ($result->resultStatus ne PGRES_TUPLES_OK)
51 {
52 print STDERR $conn->errorMessage unless ($quiet);
53 return(-1);
54 }
55
56 my @row;
57 while (@row = $result->fetchrow) {
58 $Stables{$row[0]} = 1;
59 }
60
61 $result = $conn->exec("BEGIN");
62 if ($result->resultStatus ne PGRES_COMMAND_OK)
63 {
64 print STDERR $conn->errorMessage unless ($quiet);
65 $conn->exec("ROLLBACK");
66 return(-1);
67 }
68 $result = $conn->exec("set transaction isolation level serializable");
69 if ($result->resultStatus ne PGRES_COMMAND_OK)
70 {
71 print STDERR $conn->errorMessage unless ($quiet);
72 $conn->exec("ROLLBACK");
73 return(-1);
74 }
75
76 # MAP oid --> tabname, keyname, key_type
77 $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
78 " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
79 ", pg_type pgt".
80 " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
81 " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
82 if ($result->resultStatus ne PGRES_TUPLES_OK)
83 {
84 print STDERR $conn->errorMessage unless ($quiet);
85 $conn->exec("ROLLBACK");
86 return(-1);
87 }
88
89 while (@row = $result->fetchrow)
90 {
91 # printf "$row[0], $row[1], $row[2]\n";
92 if (ref($onlytables) eq 'HASH') {
93 next unless (exists $onlytables->{$row[1]});
94 $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
95 }
96 push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
97 }
98
99 # Read last succeeded sync
100 my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
101 " where server = $server AND syncid = (select max(syncid) from" .
102 " _RSERV_SYNC_ where server = $server AND status > 0)";
103
104 printf "$sql\n" if $debug;
105
106 $result = $conn->exec($sql);
107 if ($result->resultStatus ne PGRES_TUPLES_OK)
108 {
109 print STDERR $conn->errorMessage unless ($quiet);
110 $conn->exec("ROLLBACK");
111 return(-1);
112 }
113
114 my @lastsync = $result->fetchrow;
115
116 my $sinfo = "";
117 if (@lastsync && $lastsync[3] ne '') # sync info
118 {
119 $sinfo = "and (l.logid >= $lastsync[3]";
120 $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
121 $sinfo .= ")";
122 }
123
124 my $havedeal = 0;
125
126 # DELETED rows
127 $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
128 " where l.delete = 1 $sinfo order by l.reloid";
129
130 printf "$sql\n" if $debug;
131
132 $result = $conn->exec($sql);
133 if ($result->resultStatus ne PGRES_TUPLES_OK)
134 {
135 print STDERR $conn->errorMessage unless ($quiet);
136 $conn->exec("ROLLBACK");
137 return(-1);
138 }
139
140 my $lastoid = '';
141 while (@row = $result->fetchrow)
142 {
143 next unless exists $Mtables{$row[0]};
144 next unless exists $Stables{$Mtables{$row[0]}[0]};
145
146 if ($lastoid != $row[0])
147 {
148 if ($lastoid eq '')
149 {
150 my $syncid = GetSYNCID($conn, $outf);
151 return($syncid) if $syncid < 0;
152 $havedeal = 1;
153 }
154 else
155 {
156 printf $outf "\\.\n";
157 }
158 printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
159 $lastoid = $row[0];
160 }
161 if (! defined $row[1])
162 {
163 print STDERR "NULL key\n" unless ($quiet);
164 $conn->exec("ROLLBACK");
165 return(-2);
166 }
167 printf $outf "%s\n", OutputValue($row[1]);
168 }
169 printf $outf "\\.\n" if $lastoid ne '';
170
171 # UPDATED rows
172
173 my ($taboid, $tabname, $tabkey);
174 foreach $taboid (keys %Mtables)
175 {
176 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
177 next unless exists $Stables{$tabname};
178
179 my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
180
181 $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
182 "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
183 " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";
184
185 printf "$sql\n" if $debug;
186
187 $result = $conn->exec($sql);
188 if ($result->resultStatus ne PGRES_TUPLES_OK)
189 {
190 printf $outf "-- ERROR\n" if $havedeal;
191 print STDERR $conn->errorMessage unless ($quiet);
192 $conn->exec("ROLLBACK");
193 return(-1);
194 }
195 next if $result->ntuples <= 0;
196 if (! $havedeal)
197 {
198 my $syncid = GetSYNCID($conn, $outf);
199 return($syncid) if $syncid < 0;
200 $havedeal = 1;
201 }
202 printf $outf "-- UPDATE $tabname\n";
203 printf "-- UPDATE $tabname\n" if $debug;
204 while (@row = $result->fetchrow)
205 {
206 for (my $i = 0; $i <= $#row; $i++)
207 {
208 printf $outf " " if $i;
209 printf " " if $i && $debug;
210 printf $outf "%s", OutputValue($row[$i]);
211 printf "%s", OutputValue($row[$i]) if $debug;;
212 }
213 printf $outf "\n";
214 printf "\n" if $debug;
215 }
216 printf $outf "\\.\n";
217 printf "\\.\n" if $debug;;
218 }
219
220 # INSERTED rows
221
222 foreach $taboid (keys %Mtables)
223 {
224 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
225 next unless exists $Stables{$tabname};
226
227 my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
228
229 $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
230 "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
231 " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";
232
233 printf "$sql\n" if $debug;
234
235 $result = $conn->exec($sql);
236 if ($result->resultStatus ne PGRES_TUPLES_OK)
237 {
238 printf $outf "-- ERROR\n" if $havedeal;
239 print STDERR $conn->errorMessage unless ($quiet);
240 $conn->exec("ROLLBACK");
241 return(-1);
242 }
243 next if $result->ntuples <= 0;
244 if (! $havedeal)
245 {
246 my $syncid = GetSYNCID($conn, $outf);
247 return($syncid) if $syncid < 0;
248 $havedeal = 1;
249 }
250 printf $outf "-- INSERT $tabname\n";
251 printf "-- INSERT $tabname\n" if $debug;
252 while (@row = $result->fetchrow)
253 {
254 for (my $i = 0; $i <= $#row; $i++)
255 {
256 printf $outf " " if $i;
257 printf " " if $i && $debug;
258 printf $outf "%s", OutputValue($row[$i]);
259 printf "%s", OutputValue($row[$i]) if $debug;;
260 }
261 printf $outf "\n";
262 printf "\n" if $debug;
263 }
264 printf $outf "\\.\n";
265 printf "\\.\n" if $debug;;
266 }
267
268
269 unless ($havedeal)
270 {
271 $conn->exec("ROLLBACK");
272 return(0);
273 }
274
275 # Remember this snapshot info
276 $result = $conn->exec("select _rserv_sync_($server)");
277 if ($result->resultStatus ne PGRES_TUPLES_OK)
278 {
279 printf $outf "-- ERROR\n";
280 print STDERR $conn->errorMessage unless ($quiet);
281 $conn->exec("ROLLBACK");
282 return(-1);
283 }
284
285 $result = $conn->exec("COMMIT");
286 if ($result->resultStatus ne PGRES_COMMAND_OK)
287 {
288 printf $outf "-- ERROR\n";
289 print STDERR $conn->errorMessage unless ($quiet);
290 $conn->exec("ROLLBACK");
291 return(-1);
292 }
293 printf $outf "-- OK\n";
294 printf "-- OK\n" if $debug;
295
296 return(1);
297
298 }
299
300 sub OutputValue
301 {
302 my ($val) = @_; # @_[0];
303
304 return("\\N") unless defined $val;
305
306 $val =~ s/\\/\\\\/g;
307 $val =~ s/ /\\011/g;
308 $val =~ s/\n/\\012/g;
309 $val =~ s/\'/\\047/g;
310
311 return($val);
312 }
313
314 # Get syncid for new snapshot
315 sub GetSYNCID
316 {
317 my ($conn, $outf) = @_; # (@_[0], @_[1]);
318
319 my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
320 if ($result->resultStatus ne PGRES_TUPLES_OK)
321 {
322 print STDERR $conn->errorMessage unless ($quiet);
323 $conn->exec("ROLLBACK");
324 return(-1);
325 }
326
327 my @row = $result->fetchrow;
328
329 printf $outf "-- SYNCID $row[0]\n";
330 printf "-- SYNCID $row[0]\n" if $debug;
331 return($row[0]);
332 }
333
334
335 sub CleanLog
336 {
337 my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
338
339 my $result = $conn->exec("BEGIN");
340 if ($result->resultStatus ne PGRES_COMMAND_OK)
341 {
342 print STDERR $conn->errorMessage unless ($quiet);
343 $conn->exec("ROLLBACK");
344 return(-1);
345 }
346
347 my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
348 " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
349 " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
350
351 printf "$sql\n" if $debug;
352
353 $result = $conn->exec($sql);
354 if ($result->resultStatus ne PGRES_TUPLES_OK)
355 {
356 print STDERR $conn->errorMessage unless ($quiet);
357 return(-1);
358 }
359 my $maxid = '';
360 my %active = ();
361 while (my @row = $result->fetchrow)
362 {
363 $maxid = $row[0] if $maxid eq '';
364 last if $row[0] > $maxid;
365 my @ids = split(/[ ]+,[ ]+/, $row[1]);
366 foreach my $aid (@ids)
367 {
368 $active{$aid} = 1 unless exists $active{$aid};
369 }
370 }
371 if ($maxid eq '')
372 {
373 print STDERR "No Sync IDs\n" unless ($quiet);
374 return(0);
375 }
376 my $alist = join(',', keys %active);
377 my $sinfo = "logid < $maxid";
378 $sinfo .= " AND logid not in ($alist)" if $alist ne '';
379 #if (ref($onlytables) eq 'HASH') {
380 # foreach my $onlytable (keys %{$onlytables}) {
381 # $sinfo
382 # }
383 #}
384 $sql = "delete from _RSERV_LOG_ where " .
385 "logtime < now() - '$howold second'::interval AND $sinfo";
386
387 printf "$sql\n" if $debug;
388
389 $result = $conn->exec($sql);
390 if ($result->resultStatus ne PGRES_COMMAND_OK)
391 {
392 print STDERR $conn->errorMessage unless ($quiet);
393 $conn->exec("ROLLBACK");
394 return(-1);
395 }
396 $maxid = $result->cmdTuples;
397
398 $result = $conn->exec("COMMIT");
399 if ($result->resultStatus ne PGRES_COMMAND_OK)
400 {
401 print STDERR $conn->errorMessage unless ($quiet);
402 $conn->exec("ROLLBACK");
403 return(-1);
404 }
405
406 return($maxid);
407 }
408
409 sub ApplySnapshot
410 {
411 my ($conn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);
412
413 my $result = $conn->exec("BEGIN");
414 if ($result->resultStatus ne PGRES_COMMAND_OK)
415 {
416 print STDERR $conn->errorMessage unless ($quiet);
417 $conn->exec("ROLLBACK");
418 return(-1);
419 }
420
421 $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");
422 if ($result->resultStatus ne PGRES_COMMAND_OK)
423 {
424 print STDERR $conn->errorMessage unless ($quiet);
425 $conn->exec("ROLLBACK");
426 return(-1);
427 }
428
429 # MAP name --> oid, keyname, keynum
430 my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
431 " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
432 " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
433 " AND pga.attnum = rt.key";
434
435 $result = $conn->exec($sql);
436 if ($result->resultStatus ne PGRES_TUPLES_OK)
437 {
438 print STDERR $conn->errorMessage unless ($quiet);
439 $conn->exec("ROLLBACK");
440 return(-1);
441 }
442 %Stables = ();
443 while (my @row = $result->fetchrow)
444 {
445 # printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
446 if (ref($onlytables) eq 'HASH') {
447 next unless (exists $onlytables->{$row[1]});
448 $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
449 }
450 push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
451 }
452
453 my $ok = 0;
454 my $syncid = '';
455 while(<$inpf>)
456 {
457 $_ =~ s/\n//;
458 my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3);
459 if ($cmt ne '--')
460 {
461 printf STDERR "Invalid format\n" unless ($quiet);
462 $conn->exec("ROLLBACK");
463 return(-2);
464 }
465 if ($cmd eq 'DELETE')
466 {
467 if ($syncid eq '')
468 {
469 printf STDERR "Sync ID unspecified\n" unless ($quiet);
470 $conn->exec("ROLLBACK");
471 return(-2);
472 }
473 $result = DoDelete($conn, $inpf, $prm);
474 if ($result)
475 {
476 $conn->exec("ROLLBACK");
477 return($result);
478 }
479 }
480 elsif ($cmd eq 'INSERT')
481 {
482 if ($syncid eq '')
483 {
484 printf STDERR "Sync ID unspecified\n" unless ($quiet);
485 $conn->exec("ROLLBACK");
486 return(-2);
487 }
488 $result = DoInsert($conn, $inpf, $prm);
489 if ($result)
490 {
491 $conn->exec("ROLLBACK");
492 return($result);
493 }
494 }
495 elsif ($cmd eq 'UPDATE')
496 {
497 if ($syncid eq '')
498 {
499 printf STDERR "Sync ID unspecified\n" unless ($quiet);
500 $conn->exec("ROLLBACK");
501 return(-2);
502 }
503 $result = DoUpdate($conn, $inpf, $prm);
504 if ($result)
505 {
506 $conn->exec("ROLLBACK");
507 return($result);
508 }
509 }
510 elsif ($cmd eq 'SYNCID')
511 {
512 if ($syncid ne '')
513 {
514 printf STDERR "Second Sync ID ?!\n" unless ($quiet);
515 $conn->exec("ROLLBACK");
516 return(-2);
517 }
518 if ($prm !~ /^\d+$/)
519 {
520 printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
521 $conn->exec("ROLLBACK");
522 return(-2);
523 }
524 $syncid = $prm;
525
526 printf STDERR "Sync ID $syncid\n" unless ($quiet);
527
528 $result = $conn->exec("select syncid, synctime from " .
529 "_RSERV_SLAVE_SYNC_ where syncid = " .
530 "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
531 if ($result->resultStatus ne PGRES_TUPLES_OK)
532 {
533 print STDERR $conn->errorMessage unless ($quiet);
534 $conn->exec("ROLLBACK");
535 return(-1);
536 }
537 my @row = $result->fetchrow;
538 if (! defined $row[0])
539 {
540 $result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ".
541 "(syncid, synctime) values ($syncid, now())");
542 }
543 elsif ($row[0] >= $prm)
544 {
545 printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
546 $conn->exec("ROLLBACK");
547 return(0);
548 }
549 else
550 {
551 $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .
552 " set syncid = $syncid, synctime = now()");
553 }
554 if ($result->resultStatus ne PGRES_COMMAND_OK)
555 {
556 print STDERR $conn->errorMessage unless ($quiet);
557 $conn->exec("ROLLBACK");
558 return(-1);
559 }
560 }
561 elsif ($cmd eq 'OK')
562 {
563 $ok = 1;
564 last;
565 }
566 elsif ($cmd eq 'ERROR')
567 {
568 printf STDERR "ERROR signaled\n" unless ($quiet);
569 $conn->exec("ROLLBACK");
570 return(-2);
571 }
572 else
573 {
574 printf STDERR "Unknown command $cmd\n" unless ($quiet);
575 $conn->exec("ROLLBACK");
576 return(-2);
577 }
578 }
579
580 if (! $ok)
581 {
582 printf STDERR "No OK flag in input\n" unless ($quiet);
583 $conn->exec("ROLLBACK");
584 return(-2);
585 }
586
587 $result = $conn->exec("COMMIT");
588 if ($result->resultStatus ne PGRES_COMMAND_OK)
589 {
590 print STDERR $conn->errorMessage unless ($quiet);
591 $conn->exec("ROLLBACK");
592 return(-1);
593 }
594
595 return(1);
596 }
597
598 sub DoDelete
599 {
600 my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
601
602 # only delete tables that the slave wants
603 if (! defined($Stables{$tabname})) {
604 print "Not configured to delete rows from table $tabname\n" unless $quiet;
605 while (<$inpf>) {
606 my $istring = $_;
607 $istring =~ s/\n//;
608 last if ($istring eq '\.');
609 }
610 return(0);
611 }
612
613 my $ok = 0;
614 while(<$inpf>)
615 {
616 if ($_ !~ /\n$/)
617 {
618 printf STDERR "Invalid format\n" unless ($quiet);
619 return(-2);
620 }
621 my $key = $_;
622 $key =~ s/\n//;
623 if ($key eq '\.')
624 {
625 $ok = 1;
626 last;
627 }
628
629 my $sql = "delete from \"$tabname\" where ".
630 "\"$Stables{$tabname}->[1]\" = '$key'";
631
632 printf "$sql\n" if $debug;
633
634 my $result = $conn->exec($sql);
635 if ($result->resultStatus ne PGRES_COMMAND_OK)
636 {
637 print STDERR $conn->errorMessage unless ($quiet);
638 return(-1);
639 }
640 }
641
642 if (! $ok)
643 {
644 printf STDERR "No end of input in DELETE section\n" unless ($quiet);
645 return(-2);
646 }
647
648 return(0);
649 }
650
651
652 sub DoUpdate
653 {
654 my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
655
656 # only update the tables that the slave wants
657 if (! defined($Stables{$tabname})) {
658 print "Not configured to update rows from table $tabname\n" unless $quiet;
659 while (<$inpf>) {
660 my $istring = $_;
661 $istring =~ s/\n//;
662 last if ($istring eq '\.');
663 }
664 return(0);
665 }
666
667 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
668
669 my @CopyBuf = ();
670 my $CBufLen = 0;
671 my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
672
673 my $sql = "select attnum, attname from pg_attribute" .
674 " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
675
676 my $result = $conn->exec($sql);
677 if ($result->resultStatus ne PGRES_TUPLES_OK)
678 {
679 print STDERR $conn->errorMessage unless ($quiet);
680 return(-1);
681 }
682
683 my @anames = ();
684 while (my @row = $result->fetchrow)
685 {
686 $anames[$row[0]] = $row[1];
687 }
688
689 my $istring;
690 my $ok = 0;
691 while(<$inpf>)
692 {
693 if ($_ !~ /\n$/)
694 {
695 printf STDERR "Invalid format\n" unless ($quiet);
696 return(-2);
697 }
698 $istring = $_;
699 $istring =~ s/\n//;
700 if ($istring eq '\.')
701 {
702 $ok = 1;
703 last;
704 }
705 my @vals = split(/ /, $istring);
706 if ($oidkey)
707 {
708 if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
709 {
710 printf STDERR "Invalid OID\n" unless ($quiet);
711 return(-2);
712 }
713 $oidkey = $vals[0];
714 }
715 else
716 {
717 unshift @vals, '';
718 }
719
720 $sql = "update \"$tabname\" set ";
721 my $ocnt = 0;
722 for (my $i = 1; $i <= $#anames; $i++)
723 {
724 if ($vals[$i] eq '\N')
725 {
726 if ($i == $Stables{$tabname}->[2])
727 {
728 printf STDERR "NULL key\n" unless ($quiet);
729 return(-2);
730 }
731 $vals[$i] = 'null';
732 }
733 else
734 {
735 $vals[$i] = "'" . $vals[$i] . "'";
736 next if $i == $Stables{$tabname}->[2];
737 }
738 $ocnt++;
739 $sql .= ', ' if $ocnt > 1;
740 $sql .= "\"$anames[$i]\" = $vals[$i]";
741 }
742 if ($oidkey)
743 {
744 $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
745 }
746 else
747 {
748 $sql .= " where \"$Stables{$tabname}->[1]\" = ".
749 $vals[$Stables{$tabname}->[2]];
750 }
751
752 printf "$sql\n" if $debug;
753
754 $result = $conn->exec($sql);
755
756 if ($result->resultStatus ne PGRES_COMMAND_OK)
757 {
758 print STDERR $conn->errorMessage unless ($quiet);
759 return(-1);
760 }
761 next if $result->cmdTuples == 1; # updated
762
763 if ($result->cmdTuples > 1)
764 {
765 printf STDERR "Duplicate keys\n" unless ($quiet);
766 return(-2);
767 }
768
769 # no key - copy
770 push @CopyBuf, "$istring\n";
771 $CBufLen += length($istring);
772
773 if ($CBufLen >= $CBufMax)
774 {
775 $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
776 return($result) if $result;
777 @CopyBuf = ();
778 $CBufLen = 0;
779 }
780 }
781
782 if (! $ok)
783 {
784 printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
785 return(-2);
786 }
787
788 if ($CBufLen)
789 {
790 print "@CopyBuf\n" if $debug;
791 $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
792 return($result) if $result;
793 }
794
795 return(0);
796 }
797
798 sub DoInsert
799 {
800 my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
801
802 # only insert rows into tables that the slave wants
803 if (! defined($Stables{$tabname})) {
804 print "Not configured to insert rows from table $tabname\n" unless $quiet;
805 while (<$inpf>) {
806 my $istring = $_;
807 $istring =~ s/\n//;
808 last if ($istring eq '\.');
809 }
810 return(0);
811 }
812
813 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
814
815 my @CopyBuf = ();
816 my $CBufLen = 0;
817 my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
818
819 my $istring;
820 my $ok = 0;
821 while(<$inpf>)
822 {
823 if ($_ !~ /\n$/)
824 {
825 printf STDERR "Invalid format\n" unless ($quiet);
826 return(-2);
827 }
828 $istring = $_;
829 $istring =~ s/\n//;
830 if ($istring eq '\.')
831 {
832 $ok = 1;
833 last;
834 }
835
836 # no key - copy
837 push @CopyBuf, "$istring\n";
838 $CBufLen += length($istring);
839
840 if ($CBufLen >= $CBufMax)
841 {
842 my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
843 return($result) if $result;
844 @CopyBuf = ();
845 $CBufLen = 0;
846 }
847 }
848
849 if (! $ok)
850 {
851 printf STDERR "No end of input in INSERT section\n" unless ($quiet);
852 return(-2);
853 }
854
855 if ($CBufLen)
856 {
857 print "@CopyBuf\n" if $debug;
858 my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
859 return($result) if $result;
860 }
861
862 return(0);
863 }
864
865
866 sub DoCopy
867 {
868 my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
869
870 my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
871 "FROM STDIN";
872 my $result = $conn->exec($sql);
873 if ($result->resultStatus ne PGRES_COPY_IN)
874 {
875 print STDERR $conn->errorMessage unless ($quiet);
876 return(-1);
877 }
878
879 foreach my $str (@{$CBuf})
880 {
881 $conn->putline($str);
882 }
883
884 $conn->putline("\\.\n");
885
886 if ($conn->endcopy)
887 {
888 print STDERR $conn->errorMessage unless ($quiet);
889 return(-1);
890 }
891
892 return(0);
893 }
894
895
896 #
897 # Returns last SyncID applied on Slave
898 #
899 sub GetSyncID
900 {
901 my ($conn) = @_; # (@_[0]);
902
903 my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
904 if ($result->resultStatus ne PGRES_TUPLES_OK)
905 {
906 print STDERR $conn->errorMessage unless ($quiet);
907 return(-1);
908 }
909 my @row = $result->fetchrow;
910 return(undef) unless defined $row[0]; # null
911 return($row[0]);
912 }
913
914 #
915 # Updates _RSERV_SYNC_ on Master with Slave SyncID
916 #
917 sub SyncSyncID
918 {
919 my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);
920
921 my $result = $conn->exec("BEGIN");
922 if ($result->resultStatus ne PGRES_COMMAND_OK)
923 {
924 print STDERR $conn->errorMessage unless ($quiet);
925 $conn->exec("ROLLBACK");
926 return(-1);
927 }
928
929 $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .
930 " where server = $server AND syncid = $syncid" .
931 " for update");
932 if ($result->resultStatus ne PGRES_TUPLES_OK)
933 {
934 print STDERR $conn->errorMessage unless ($quiet);
935 $conn->exec("ROLLBACK");
936 return(-1);
937 }
938 my @row = $result->fetchrow;
939 if (! defined $row[0])
940 {
941 printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);
942 $conn->exec("ROLLBACK");
943 return(0);
944 }
945 if ($row[1] > 0)
946 {
947 printf STDERR "SyncID $syncid for server ".
948 "$server already updated\n" unless ($quiet);
949 $conn->exec("ROLLBACK");
950 return(0);
951 }
952 $result = $conn->exec("update _RSERV_SYNC_" .
953 " set synctime = now(), status = 1" .
954 " where server = $server AND syncid = $syncid");
955 if ($result->resultStatus ne PGRES_COMMAND_OK)
956 {
957 print STDERR $conn->errorMessage unless ($quiet);
958 $conn->exec("ROLLBACK");
959 return(-1);
960 }
961 $result = $conn->exec("delete from _RSERV_SYNC_" .
962 " where server = $server AND syncid < $syncid");
963 if ($result->resultStatus ne PGRES_COMMAND_OK)
964 {
965 print STDERR $conn->errorMessage unless ($quiet);
966 $conn->exec("ROLLBACK");
967 return(-1);
968 }
969
970 $result = $conn->exec("COMMIT");
971 if ($result->resultStatus ne PGRES_COMMAND_OK)
972 {
973 print STDERR $conn->errorMessage unless ($quiet);
974 $conn->exec("ROLLBACK");
975 return(-1);
976 }
977
978 return(1);
979 }
980
981 1;

  ViewVC Help
Powered by ViewVC 1.1.26