/[rserv]/share/RServ.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /share/RServ.pm

Parent Directory | Revision Log


Revision 1.4 - (show annotations)
Wed Aug 6 00:28:29 2003 UTC (20 years, 9 months ago) by dpavlin
Branch: MAIN
CVS Tags: before_onlytables
Changes since 1.3: +1 -1 lines
fix warnings

# -*- perl -*-
# RServ.pm
# Vadim Mikheev, (c) 2000, PostgreSQL Inc.

package RServ;

use strict;
use Pg;

# Exporter wiring: the six public entry points are exported by default.
require Exporter;
our @ISA       = qw(Exporter);
our @EXPORT    = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);
our @EXPORT_OK = qw();

# When true, the subs echo the SQL they run and the snapshot lines to STDOUT.
my $debug = 0;
# When true, diagnostics that would go to STDERR are suppressed.
my $quiet = 1;

# Filled by PrepareSnapshot: table oid => [relname, key attname, key type name].
my %Mtables = ();
# Filled by PrepareSnapshot (table name => 1) and by ApplySnapshot
# (table name => [oid, key attname, key attnum]).
my %Stables = ();
19
# GetSlaveId($conn, $slaveDB, $slaveHost)
#
# Looks up the "server" value registered in _RSERV_SERVERS_ for the given
# slave host / database pair.
#
# Returns:
#   the server value  when exactly one row matches
#   undef             when no row matches
#   -1                when the query fails
#   -2                when more than one row matches
sub GetSlaveId
{
    my ($conn, $slaveDB, $slaveHost) = @_;

    # NOTE(review): both values are interpolated into the statement verbatim;
    # callers must not pass values containing quotes.
    my $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
        " host='$slaveHost' AND dbase='$slaveDB'");

    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        return(-1);
    }

    # ntuples is the row count of a query result.  The previous check used
    # cmdTuples, which reports rows affected by a data-modifying command and
    # is empty for SELECT on older libpq, so duplicates went undetected there.
    if ($result->ntuples > 1)
    {
        printf STDERR "Duplicate slave definitions.\n" unless ($quiet);
        return(-2);
    }

    my @row = $result->fetchrow;

    return $row[0];
}
43
# PrepareSnapshot($conn, $sconn, $outf, $server)
#
# Writes to $outf a snapshot of the rows logged in _RSERV_LOG_ on the master
# ($conn) since the last successful sync of $server, restricted to the tables
# listed in _RSERV_SLAVE_TABLES_ on the slave ($sconn).
#
# Stream layout: "-- SYNCID n", then "-- DELETE tab" / "-- UPDATE tab" /
# "-- INSERT tab" sections each terminated by a "\." line, and finally
# "-- OK" (or "-- ERROR" if a failure happens after output has started).
#
# Returns:
#   1   snapshot written and recorded via _rserv_sync_()
#   0   nothing to replicate (transaction rolled back, nothing written)
#   -1  database error
#   -2  a logged DELETE had a NULL key
sub PrepareSnapshot
{
    my ($conn, $sconn, $outf, $server) = @_;

    # Both maps are file-level state.  Start empty so tables dropped from the
    # configuration (or entries left by ApplySnapshot) do not leak into this
    # run, and so the per-table arrays do not grow on every call.
    %Stables = ();
    %Mtables = ();

    # Common failure path on the master connection: optionally flag the
    # output stream, report, roll back, and hand back the return code.
    my $abort = sub {
        my ($code, $mark) = @_;
        print $outf "-- ERROR\n" if $mark;
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return($code);
    };

    # first, we must know for which tables the slave subscribed
    my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        # The statement ran on the slave connection, so its error is there.
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }
    while (my @row = $result->fetchrow)
    {
        $Stables{$row[0]} = 1;
    }

    $result = $conn->exec("BEGIN");
    return $abort->(-1) if $result->resultStatus ne PGRES_COMMAND_OK;

    $result = $conn->exec("set transaction isolation level serializable");
    return $abort->(-1) if $result->resultStatus ne PGRES_COMMAND_OK;

    # MAP oid --> tabname, keyname, key_type
    $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
        " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
        ", pg_type pgt".
        " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
        " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
    return $abort->(-1) if $result->resultStatus ne PGRES_TUPLES_OK;

    while (my @row = $result->fetchrow)
    {
        push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
    }

    # Read last succeeded sync
    my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
        " where server = $server AND syncid = (select max(syncid) from" .
        " _RSERV_SYNC_ where server = $server AND status > 0)";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    return $abort->(-1) if $result->resultStatus ne PGRES_TUPLES_OK;

    my @lastsync = $result->fetchrow;

    # Restrict the log scan to entries at/after the last synced maxid plus
    # the ids listed as active at that time.
    my $sinfo = "";
    if (@lastsync && defined $lastsync[3] && $lastsync[3] ne '')
    {
        $sinfo = "and (l.logid >= $lastsync[3]";
        $sinfo .= " or l.logid in ($lastsync[4])"
            if defined $lastsync[4] && $lastsync[4] ne '';
        $sinfo .= ")";
    }

    # Becomes true once "-- SYNCID" has been written, i.e. output has begun.
    my $havedeal = 0;

    # DELETED rows
    $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
        " where l.delete = 1 $sinfo order by l.reloid";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    return $abort->(-1) if $result->resultStatus ne PGRES_TUPLES_OK;

    my $lastoid = '';
    while (my @row = $result->fetchrow)
    {
        next unless exists $Mtables{$row[0]};
        next unless exists $Stables{$Mtables{$row[0]}[0]};

        # String comparison: $lastoid starts as '' which is not a number.
        if ($lastoid ne $row[0])
        {
            if ($lastoid eq '')
            {
                my $syncid = GetSYNCID($conn, $outf);
                return($syncid) if $syncid < 0;
                $havedeal = 1;
            }
            else
            {
                # close the previous table's DELETE section
                print $outf "\\.\n";
            }
            print $outf "-- DELETE $Mtables{$row[0]}[0]\n";
            $lastoid = $row[0];
        }
        if (! defined $row[1])
        {
            print STDERR "NULL key\n" unless ($quiet);
            $conn->exec("ROLLBACK");
            return(-2);
        }
        print $outf OutputValue($row[1]), "\n";
    }
    print $outf "\\.\n" if $lastoid ne '';

    # UPDATED rows, then INSERTED rows
    foreach my $pass (['update', 'UPDATE'], ['insert', 'INSERT'])
    {
        my $rc = _DumpLoggedRows($conn, $outf, $pass->[0], $pass->[1],
                                 $sinfo, \$havedeal);
        return($rc) if $rc;
    }

    unless ($havedeal)
    {
        $conn->exec("ROLLBACK");
        return(0);
    }

    # Remember this snapshot info
    $result = $conn->exec("select _rserv_sync_($server)");
    return $abort->(-1, 1) if $result->resultStatus ne PGRES_TUPLES_OK;

    $result = $conn->exec("COMMIT");
    return $abort->(-1, 1) if $result->resultStatus ne PGRES_COMMAND_OK;

    print $outf "-- OK\n";
    print "-- OK\n" if $debug;

    return(1);
}

# Helper for PrepareSnapshot: for every subscribed table, writes one
# "-- $section table" block holding the current rows whose key is logged in
# _RSERV_LOG_ with l.$column = 1.  $$havedeal is set (and "-- SYNCID" emitted
# through GetSYNCID) the first time anything is written.
#
# Returns 0 on success, -1 on a database error (after ROLLBACK), or the
# negative value returned by GetSYNCID.
sub _DumpLoggedRows
{
    my ($conn, $outf, $column, $section, $sinfo, $havedeal) = @_;

    foreach my $taboid (keys %Mtables)
    {
        my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
        next unless exists $Stables{$tabname};

        # The alias is quoted in the FROM list, so quote it here as well;
        # unquoted it would be case-folded and miss a mixed-case table name.
        my $oidkey = ($tabkey eq 'oid') ? "\"_$tabname\".oid," : '';

        my $sql = "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.$column = 1".
            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";

        print "$sql\n" if $debug;

        my $result = $conn->exec($sql);
        if ($result->resultStatus ne PGRES_TUPLES_OK)
        {
            print $outf "-- ERROR\n" if $$havedeal;
            print STDERR $conn->errorMessage unless ($quiet);
            $conn->exec("ROLLBACK");
            return(-1);
        }
        next if $result->ntuples <= 0;

        if (! $$havedeal)
        {
            my $syncid = GetSYNCID($conn, $outf);
            return($syncid) if $syncid < 0;
            $$havedeal = 1;
        }

        print $outf "-- $section $tabname\n";
        print "-- $section $tabname\n" if $debug;

        while (my @row = $result->fetchrow)
        {
            # NOTE(review): the field separator is a single space as the file
            # currently reads; COPY's default delimiter is a tab -- confirm
            # this is not a whitespace-mangled tab.
            my $line = join(" ", map { OutputValue($_) } @row);
            print $outf "$line\n";
            print "$line\n" if $debug;
        }
        print $outf "\\.\n";
        print "\\.\n" if $debug;
    }

    return(0);
}
295
# OutputValue($val)
#
# Encodes one column value for the snapshot stream.
#
# Returns the two-character string \N for undef; otherwise a copy of $val in
# which backslash is doubled and tab, space, newline, carriage return and
# single quote are replaced by their three-digit octal escapes.
#
# Space used to be written as \011, which is the escape for TAB, so every
# space came back as a tab; real tabs were not escaped at all.  Escaping both
# with their own codes keeps the value intact whichever of the two is used as
# the field separator.
sub OutputValue
{
    my ($val) = @_;

    return("\\N") unless defined $val;

    # Backslash first, so the escapes added below are not doubled again.
    $val =~ s/\\/\\\\/g;
    $val =~ s/\t/\\011/g;
    $val =~ s/ /\\040/g;
    $val =~ s/\n/\\012/g;
    $val =~ s/\r/\\015/g;
    $val =~ s/\'/\\047/g;

    return($val);
}
309
# GetSYNCID($conn, $outf)
#
# Draws the next value from the _rserv_sync_seq_ sequence, announces it on
# $outf as a "-- SYNCID n" line (echoed to STDOUT when debugging) and returns
# it.  On a failed query the transaction is rolled back and -1 is returned.
sub GetSYNCID
{
    my ($conn, $outf) = @_;

    my $res = $conn->exec("select nextval('_rserv_sync_seq_')");
    unless ($res->resultStatus eq PGRES_TUPLES_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my ($newid) = $res->fetchrow;

    printf $outf "-- SYNCID %s\n", $newid;
    printf "-- SYNCID %s\n", $newid if $debug;

    return($newid);
}
329
330
# CleanLog($conn, $howold)
#
# Deletes from _RSERV_LOG_ the entries that are older than $howold seconds,
# have a logid below the smallest maxid among the servers' latest successful
# syncs, and are not listed as active in those sync rows.
#
# Returns:
#   the number of deleted rows (as reported by cmdTuples) on success
#   0   when there are no successful syncs to base the cleanup on
#   -1  on a database error
# The transaction opened here is rolled back on every non-success path.
sub CleanLog
{
    my ($conn, $howold) = @_;

    my $result = $conn->exec("BEGIN");
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
        " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
        " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        # Previously returned with the transaction still open.
        $conn->exec("ROLLBACK");
        return(-1);
    }

    # Rows arrive ordered by maxid; only those sharing the smallest maxid
    # contribute to the set of active log ids that must be kept.
    my $maxid = '';
    my %active = ();
    while (my @row = $result->fetchrow)
    {
        $maxid = $row[0] if $maxid eq '';
        last if $row[0] > $maxid;
        next unless defined $row[1];
        # Accept "1,2", "1, 2" and "1 , 2" alike.
        foreach my $aid (split(/\s*,\s*/, $row[1]))
        {
            next if $aid eq '';
            $active{$aid} = 1;
        }
    }
    if ($maxid eq '')
    {
        print STDERR "No Sync IDs\n" unless ($quiet);
        # Nothing to do, but do not leave the transaction open.
        $conn->exec("ROLLBACK");
        return(0);
    }

    my $alist = join(',', keys %active);
    my $sinfo = "logid < $maxid";
    $sinfo .= " AND logid not in ($alist)" if $alist ne '';

    $sql = "delete from _RSERV_LOG_ where " .
        "logtime < now() - '$howold second'::interval AND $sinfo";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }
    my $deleted = $result->cmdTuples;

    $result = $conn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    return($deleted);
}
400
# ApplySnapshot($conn, $inpf)
#
# Reads a snapshot stream from $inpf and applies it to the slave database
# ($conn) inside one transaction with all constraints deferred.  Every line
# at this level must be a "-- COMMAND [parameter]" line:
#   SYNCID n            must come first and only once; recorded in
#                       _RSERV_SLAVE_SYNC_
#   DELETE/INSERT/UPDATE table
#                       body is consumed by DoDelete / DoInsert / DoUpdate
#   OK                  end of stream, commit
#   ERROR               producer failed, roll back
#
# Returns:
#   1   snapshot applied and committed
#   0   slave is already at this sync id or a later one (rolled back)
#   -1  database error
#   -2  malformed input
#   or the non-zero code returned by a section handler.
sub ApplySnapshot
{
    my ($conn, $inpf) = @_;

    # Common failure paths; every one of them rolls the transaction back.
    my $dberror = sub {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    };
    my $reject = sub {
        my ($msg) = @_;
        # print, not printf: the message may contain text from the input.
        print STDERR "$msg\n" unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-2);
    };

    my $result = $conn->exec("BEGIN");
    return $dberror->() if $result->resultStatus ne PGRES_COMMAND_OK;

    $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");
    return $dberror->() if $result->resultStatus ne PGRES_COMMAND_OK;

    # MAP name --> oid, keyname, keynum
    my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
        " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
        " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
        " AND pga.attnum = rt.key";

    $result = $conn->exec($sql);
    return $dberror->() if $result->resultStatus ne PGRES_TUPLES_OK;

    %Stables = ();
    while (my @row = $result->fetchrow)
    {
        push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
    }

    # The three data sections share the same pre-check and error handling.
    my %section = (
        DELETE => \&DoDelete,
        INSERT => \&DoInsert,
        UPDATE => \&DoUpdate,
    );

    my $ok = 0;
    my $syncid = '';
    while (my $line = <$inpf>)
    {
        $line =~ s/\n//;
        my ($cmt, $cmd, $prm) = split(/[ ]+/, $line, 3);
        return $reject->("Invalid format")
            unless defined $cmt && $cmt eq '--';
        $cmd = '' unless defined $cmd;

        if (exists $section{$cmd})
        {
            return $reject->("Sync ID unspecified") if $syncid eq '';

            my $rc = $section{$cmd}->($conn, $inpf, $prm);
            if ($rc)
            {
                $conn->exec("ROLLBACK");
                return($rc);
            }
        }
        elsif ($cmd eq 'SYNCID')
        {
            return $reject->("Second Sync ID ?!") if $syncid ne '';
            if (! defined $prm || $prm !~ /^\d+$/)
            {
                return $reject->("Invalid Sync ID " . (defined $prm ? $prm : ''));
            }
            $syncid = $prm;

            print STDERR "Sync ID $syncid\n" unless ($quiet);

            $result = $conn->exec("select syncid, synctime from " .
                "_RSERV_SLAVE_SYNC_ where syncid = " .
                "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
            return $dberror->() if $result->resultStatus ne PGRES_TUPLES_OK;

            my @row = $result->fetchrow;
            if (! defined $row[0])
            {
                # first sync ever on this slave
                $result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ".
                    "(syncid, synctime) values ($syncid, now())");
            }
            elsif ($row[0] >= $syncid)
            {
                # this snapshot (or a later one) has already been applied
                print STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
                $conn->exec("ROLLBACK");
                return(0);
            }
            else
            {
                $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .
                    " set syncid = $syncid, synctime = now()");
            }
            return $dberror->() if $result->resultStatus ne PGRES_COMMAND_OK;
        }
        elsif ($cmd eq 'OK')
        {
            $ok = 1;
            last;
        }
        elsif ($cmd eq 'ERROR')
        {
            return $reject->("ERROR signaled");
        }
        else
        {
            return $reject->("Unknown command $cmd");
        }
    }

    return $reject->("No OK flag in input") if ! $ok;

    $result = $conn->exec("COMMIT");
    return $dberror->() if $result->resultStatus ne PGRES_COMMAND_OK;

    return(1);
}
585
# DoDelete($conn, $inpf, $tabname)
#
# Consumes the body of a "-- DELETE table" section from $inpf: one key value
# per line, terminated by a "\." line, issuing one DELETE per key.  If
# $tabname has no entry in %Stables the section is read and discarded.
#
# Returns 0 on success (or when the section was skipped), -1 on a database
# error, -2 on malformed input.  The caller owns the transaction.
sub DoDelete
{
    my ($conn, $inpf, $tabname) = @_;

    # only delete tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to delete rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my $keyname = $Stables{$tabname}->[1];

    my $ok = 0;
    # Lexical line variable: do not clobber the caller's $_.
    while (my $key = <$inpf>)
    {
        # a line without a newline means the input was truncated
        if ($key !~ /\n$/)
        {
            printf STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $key =~ s/\n//;
        if ($key eq '\.')
        {
            $ok = 1;
            last;
        }

        # NOTE(review): the key is placed in the literal as read; it is
        # presumably already escaped by the snapshot producer -- confirm.
        my $sql = "delete from \"$tabname\" where ".
            "\"$keyname\" = '$key'";

        # print, not printf: the statement contains data and must not be
        # interpreted as a format string.
        print "$sql\n" if $debug;

        my $result = $conn->exec($sql);
        if ($result->resultStatus ne PGRES_COMMAND_OK)
        {
            print STDERR $conn->errorMessage unless ($quiet);
            return(-1);
        }
    }

    if (! $ok)
    {
        printf STDERR "No end of input in DELETE section\n" unless ($quiet);
        return(-2);
    }

    return(0);
}
638
639
# DoUpdate($conn, $inpf, $tabname)
#
# Consumes the body of a "-- UPDATE table" section from $inpf: one row per
# line, terminated by a "\." line.  Each row is first tried as an UPDATE
# keyed on the table's replication key; rows that match nothing are buffered
# and loaded with DoCopy instead.  If $tabname has no entry in %Stables the
# section is read and discarded.
#
# Returns 0 on success (or when skipped), -1 on a database error, -2 on
# malformed input or when an UPDATE hits more than one row, or the non-zero
# code returned by DoCopy.  The caller owns the transaction.
sub DoUpdate
{
    my ($conn, $inpf, $tabname) = @_;

    # only update the tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to update rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my ($reloid, $keyname, $keynum) = @{$Stables{$tabname}};

    # A negative key attribute number means the key is the oid, which then
    # travels as an extra leading field on every row.
    my $oidkey = ($keynum < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;	# max size of buf for copy

    my $sql = "select attnum, attname from pg_attribute" .
        " where attrelid = $reloid AND attnum > 0";

    my $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        return(-1);
    }

    # column names indexed by attribute number (slot 0 stays unused)
    my @anames = ();
    while (my @row = $result->fetchrow)
    {
        $anames[$row[0]] = $row[1];
    }

    my $ok = 0;
    # Lexical line variable: do not clobber the caller's $_.
    while (my $istring = <$inpf>)
    {
        if ($istring !~ /\n$/)
        {
            printf STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $istring =~ s/\n//;
        if ($istring eq '\.')
        {
            $ok = 1;
            last;
        }

        # NOTE(review): fields are split on a single space as the file
        # currently reads -- confirm this is not a whitespace-mangled tab.
        my @vals = split(/ /, $istring);
        if ($oidkey)
        {
            if (! defined $vals[0] || $vals[0] !~ /^\d+$/ || $vals[0] <= 0)
            {
                printf STDERR "Invalid OID\n" unless ($quiet);
                return(-2);
            }
            $oidkey = $vals[0];
        }
        else
        {
            # shift the fields so that $vals[attnum] is that column's value
            unshift @vals, '';
        }

        $sql = "update \"$tabname\" set ";
        my $ocnt = 0;
        for my $i (1 .. $#anames)
        {
            # split drops trailing empty fields; treat them as empty strings
            my $val = defined $vals[$i] ? $vals[$i] : '';
            if ($val eq '\N')
            {
                if ($i == $keynum)
                {
                    printf STDERR "NULL key\n" unless ($quiet);
                    return(-2);
                }
                $vals[$i] = 'null';
            }
            else
            {
                $vals[$i] = "'" . $val . "'";
                # the key column goes in the WHERE clause, not the SET list
                next if $i == $keynum;
            }
            $ocnt++;
            $sql .= ', ' if $ocnt > 1;
            $sql .= "\"$anames[$i]\" = $vals[$i]";
        }
        $sql .= " where \"$keyname\" = " .
            ($oidkey ? $oidkey : $vals[$keynum]);

        # print, not printf: the statement contains data and must not be
        # interpreted as a format string.
        print "$sql\n" if $debug;

        $result = $conn->exec($sql);

        if ($result->resultStatus ne PGRES_COMMAND_OK)
        {
            print STDERR $conn->errorMessage unless ($quiet);
            return(-1);
        }
        next if $result->cmdTuples == 1;	# updated

        if ($result->cmdTuples > 1)
        {
            printf STDERR "Duplicate keys\n" unless ($quiet);
            return(-2);
        }

        # no key - copy
        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax)
        {
            $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok)
    {
        printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
        return(-2);
    }

    # Test the buffer itself: $CBufLen is 0 when every buffered line is
    # empty, which used to drop those rows silently.
    if (@CopyBuf)
    {
        print "@CopyBuf\n" if $debug;
        $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
785
# DoInsert($conn, $inpf, $tabname)
#
# Consumes the body of a "-- INSERT table" section from $inpf: one row per
# line, terminated by a "\." line.  Rows are buffered and loaded through
# DoCopy, flushing whenever the buffer reaches 16 MB.  If $tabname has no
# entry in %Stables the section is read and discarded.
#
# Returns 0 on success (or when skipped), -2 on malformed input, or the
# non-zero code returned by DoCopy.  The caller owns the transaction.
sub DoInsert
{
    my ($conn, $inpf, $tabname) = @_;

    # only insert rows into tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to insert rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    # A negative key attribute number means the key is the oid, so the rows
    # are loaded WITH OIDS.
    my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;	# max size of buf for copy

    my $ok = 0;
    # Lexical line variable: do not clobber the caller's $_.
    while (my $istring = <$inpf>)
    {
        # a line without a newline means the input was truncated
        if ($istring !~ /\n$/)
        {
            printf STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $istring =~ s/\n//;
        if ($istring eq '\.')
        {
            $ok = 1;
            last;
        }

        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax)
        {
            my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok)
    {
        printf STDERR "No end of input in INSERT section\n" unless ($quiet);
        return(-2);
    }

    # Test the buffer itself: $CBufLen is 0 when every buffered line is
    # empty, which used to drop those rows silently.
    if (@CopyBuf)
    {
        print "@CopyBuf\n" if $debug;
        my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
852
853
# DoCopy($conn, $tabname, $withoids, $CBuf)
#
# Loads the lines referenced by $CBuf into $tabname with COPY ... FROM STDIN,
# adding WITH OIDS when $withoids is true, then sends the "\." terminator.
#
# Returns 0 on success, -1 if COPY could not be started or endcopy reports a
# failure.
sub DoCopy
{
    my ($conn, $tabname, $withoids, $CBuf) = @_;

    my $oids = $withoids ? "WITH OIDS " : '';
    my $started = $conn->exec("COPY \"$tabname\" " . $oids . "FROM STDIN");
    unless ($started->resultStatus eq PGRES_COPY_IN)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        return(-1);
    }

    for my $idx (0 .. $#{$CBuf})
    {
        $conn->putline($CBuf->[$idx]);
    }
    $conn->putline("\\.\n");

    # endcopy yields a true value when the copy did not finish cleanly
    my $failed = $conn->endcopy;
    return(0) unless $failed;

    print STDERR $conn->errorMessage unless ($quiet);
    return(-1);
}
882
883
#
# GetSyncID($conn)
#
# Returns the last SyncID applied on the Slave, i.e. max(syncid) from
# _RSERV_SLAVE_SYNC_; undef when that is NULL, -1 when the query fails.
#
sub GetSyncID
{
    my ($conn) = @_;

    my $res = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
    unless ($res->resultStatus eq PGRES_TUPLES_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        return(-1);
    }

    # a NULL maximum arrives as undef and is passed through as such
    my ($latest) = $res->fetchrow;
    return($latest);
}
901
#
# SyncSyncID($conn, $server, $syncid)
#
# Updates _RSERV_SYNC_ on the Master with the Slave's SyncID: the row for
# ($server, $syncid) gets status = 1 and a fresh synctime, and that server's
# rows with a smaller syncid are deleted, all in one transaction.
#
# Returns 1 on success, 0 when the row does not exist or already has
# status > 0 (rolled back), -1 on a database error (rolled back).
#
sub SyncSyncID
{
    my ($conn, $server, $syncid) = @_;

    # shared failure path: report, roll back, signal a database error
    my $dbfail = sub {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    };

    my $res = $conn->exec("BEGIN");
    return $dbfail->() unless $res->resultStatus eq PGRES_COMMAND_OK;

    # lock the row so its status cannot change underneath us
    $res = $conn->exec("select synctime, status from _RSERV_SYNC_" .
        " where server = $server AND syncid = $syncid" .
        " for update");
    return $dbfail->() unless $res->resultStatus eq PGRES_TUPLES_OK;

    my ($synctime, $status) = $res->fetchrow;
    unless (defined $synctime)
    {
        printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);
        $conn->exec("ROLLBACK");
        return(0);
    }
    if ($status > 0)
    {
        printf STDERR "SyncID $syncid for server ".
            "$server already updated\n" unless ($quiet);
        $conn->exec("ROLLBACK");
        return(0);
    }

    # mark this sync as done, then drop the server's older sync rows
    my @statements = (
        "update _RSERV_SYNC_" .
            " set synctime = now(), status = 1" .
            " where server = $server AND syncid = $syncid",
        "delete from _RSERV_SYNC_" .
            " where server = $server AND syncid < $syncid",
        "COMMIT",
    );
    foreach my $stmt (@statements)
    {
        $res = $conn->exec($stmt);
        return $dbfail->() unless $res->resultStatus eq PGRES_COMMAND_OK;
    }

    return(1);
}
968
969 1;

  ViewVC Help
Powered by ViewVC 1.1.26