/[rserv]/share/RServ.pm
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /share/RServ.pm

Parent Directory | Revision Log


Revision 1.9 - (show annotations)
Wed Oct 29 18:13:22 2003 UTC (20 years, 6 months ago) by dpavlin
Branch: MAIN
Changes since 1.8: +30 -15 lines
changes in preparation for multi-master

1 # -*- perl -*-
2 # RServ.pm
3 # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4
package RServ;

use strict;
use Pg;

# Public interface: these subs are exported into the caller by default.
require Exporter;
our @ISA       = qw(Exporter);
our @EXPORT    = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);
our @EXPORT_OK = qw();

# Diagnostics switches shared by every sub in this file:
#   $debug - when true, SQL statements and snapshot rows are echoed to STDOUT
#   $quiet - when true, error messages on STDERR are suppressed
# (the silent setting would be $debug = 0, $quiet = 1)
my $debug = 1;
my $quiet = 0;

# File-level table maps, filled in by PrepareSnapshot / ApplySnapshot.
my %Mtables = ();
my %Stables = ();
22
# Look up the server id of a slave in _RSERV_SERVERS_ on the master.
#
#   $mconn     - Pg connection to the master database
#   $slaveDB   - slave database name (matched against column "dbase")
#   $slaveHost - slave host name (matched against column "host")
#
# Returns the "server" value of the matching row, undef when no row
# matches, -1 when the query fails and -2 when several rows match.
sub GetSlaveId
{
    my ($mconn, $slaveDB, $slaveHost) = @_;

    # Both values end up inside single-quoted SQL literals: double embedded
    # backslashes and quotes so they cannot terminate the literal early.
    my ($host, $dbase) = ($slaveHost, $slaveDB);
    foreach my $lit ($host, $dbase)
    {
        $lit = '' unless defined $lit;
        $lit =~ s/\\/\\\\/g;
        $lit =~ s/'/''/g;
    }

    my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE" .
                              " host='$host' AND dbase='$dbase'");

    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $mconn->errorMessage unless ($quiet);
        return(-1);
    }

    # ntuples is the number of rows a SELECT returned.  The previous check
    # used cmdTuples, which reports rows affected by INSERT/UPDATE/DELETE,
    # so duplicate definitions were never detected.
    if ($result->ntuples > 1)
    {
        print STDERR "Duplicate slave definitions.\n" unless ($quiet);
        return(-2);
    }

    my @row = $result->fetchrow;

    return $row[0];
}
46
# Write a snapshot of the changes logged on the master for one slave.
#
#   $mconn      - Pg connection to the master
#   $sconn      - Pg connection to the slave (read for its table list)
#   $outf       - file handle the snapshot is written to
#   $mserver    - server number of the master
#   $sserver    - server number of the slave
#   $onlytables - optional hash ref; when given, only tables whose name is
#                 a key are considered, and empty values are filled in with
#                 the table's oid
#
# Snapshot layout as written below: "-- SYNCID <id>", then sections
# "-- DELETE <table>" (one key per line), "-- UPDATE <table>" and
# "-- INSERT <table>" (one row per line), each section closed by "\.",
# and finally "-- OK".  "-- ERROR" is written instead when a failure
# happens after output has started.
#
# Returns 1 when a snapshot was written, 0 when there was nothing to send,
# -1 on a database error (or equal server numbers) and -2 when a logged
# delete has a NULL key.
sub PrepareSnapshot
{
    my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;

    if ($mserver == $sserver)
    {
        print STDERR "master and slave numbers are same!\n";
        return(-1);
    }

    print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);

    # The table maps live at file level; start from a clean slate so that
    # entries left by an earlier call (possibly for another slave, or by
    # ApplySnapshot) cannot leak into this snapshot.
    %Mtables = ();
    %Stables = ();

    # first, we must know for which tables the slave subscribed
    my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        # this query ran on the slave, so report the slave's error
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    my @row;
    while (@row = $result->fetchrow)
    {
        $Stables{$row[0]} = 1;
    }

    # Everything read from the master happens in one serializable
    # transaction.
    foreach my $stmt ("BEGIN", "set transaction isolation level serializable")
    {
        $result = $mconn->exec($stmt);
        if ($result->resultStatus ne PGRES_COMMAND_OK)
        {
            print STDERR $mconn->errorMessage unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-1);
        }
    }

    # MAP oid --> tabname, keyname, key_type
    $result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
                           " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
                           ", pg_type pgt".
                           " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
                           " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    while (@row = $result->fetchrow)
    {
        if (ref($onlytables) eq 'HASH')
        {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
    }

    # Read last succeeded sync
    my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
              " where server = $sserver AND syncid = (select max(syncid) from" .
              " _RSERV_SYNC_ where server = $sserver AND status > 0)";

    # print, not printf: the SQL text must not be treated as a format
    print "$sql\n" if $debug;

    $result = $mconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    my @lastsync = $result->fetchrow;

    # restrict the log to entries recorded for server $mserver
    my $sel_server = " and l.server = $mserver ";

    # Only log entries at or after the last sync's maxid, plus the ids
    # listed in its "active" column, are considered.
    my $sinfo = "";
    if (@lastsync && $lastsync[3] ne '')    # sync info
    {
        $sinfo = "and (l.logid >= $lastsync[3]";
        $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
        $sinfo .= ")";
    }

    # becomes 1 once "-- SYNCID" has been written to $outf
    my $havedeal = 0;

    # DELETED rows
    $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
           " where l.delete = 1 $sinfo $sel_server order by l.reloid";

    print "DELETED: $sql\n" if $debug;

    $result = $mconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    # Rows arrive ordered by table oid; a new "-- DELETE" section is opened
    # whenever the oid changes.
    my $lastoid = -1;
    while (@row = $result->fetchrow)
    {
        next unless exists $Mtables{$row[0]};
        next unless exists $Stables{$Mtables{$row[0]}[0]};

        if ($lastoid != $row[0])
        {
            if ($lastoid == -1)
            {
                # GetSYNCID rolls back by itself on failure
                my $syncid = GetSYNCID($mconn, $outf);
                return($syncid) if $syncid < 0;
                $havedeal = 1;
            }
            else
            {
                print {$outf} "\\.\n";
            }
            print {$outf} "-- DELETE $Mtables{$row[0]}[0]\n";
            $lastoid = $row[0];
        }
        if (! defined $row[1])
        {
            print STDERR "NULL key\n" unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-2);
        }
        printf {$outf} "%s\n", OutputValue($row[1]);
    }
    print {$outf} "\\.\n" if ($lastoid != -1);

    # UPDATED rows first, then INSERTED rows
    foreach my $cmd ('UPDATE', 'INSERT')
    {
        $havedeal = _DumpLoggedRows($mconn, $outf, $cmd, $sinfo,
                                    $sel_server, $havedeal);
        return($havedeal) if $havedeal < 0;
    }

    unless ($havedeal)
    {
        $mconn->exec("ROLLBACK");
        return(0);
    }

    # Remember this snapshot info
    $result = $mconn->exec("select _rserv_sync_($sserver)");
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print {$outf} "-- ERROR\n";
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    $result = $mconn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
        print {$outf} "-- ERROR\n";
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }
    print {$outf} "-- OK\n";
    print "-- OK\n" if $debug;

    return(1);
}

# Private helper of PrepareSnapshot: for every table in %Mtables that the
# slave subscribed to, write the current rows whose log entry is flagged
# as $cmd ('UPDATE' or 'INSERT') as a "-- $cmd <table>" section.
#
# Returns the updated $havedeal flag (0 or 1), or a negative error code
# after the master transaction has been rolled back.
sub _DumpLoggedRows
{
    my ($mconn, $outf, $cmd, $sinfo, $sel_server, $havedeal) = @_;

    my $logcol = lc($cmd);      # _RSERV_LOG_ flag column: update / insert
    my $dbgtag = $cmd . 'D';    # debug prefix: UPDATED / INSERTED

    foreach my $taboid (keys %Mtables)
    {
        my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
        next unless exists $Stables{$tabname};

        # The table alias is quoted in the FROM list, so it has to be
        # quoted here as well or mixed-case table names stop matching.
        my $oidkey = ($tabkey eq 'oid') ? "\"_$tabname\".oid," : '';

        # Plain concatenation: the statement contains table names and must
        # not pass through sprintf as a format string.
        my $sql = "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" " .
                  "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.$logcol = 1" .
                  " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}" .
                  $sel_server;

        print "${dbgtag}: $sql\n" if $debug;

        my $result = $mconn->exec($sql);
        if ($result->resultStatus ne PGRES_TUPLES_OK)
        {
            print {$outf} "-- ERROR\n" if $havedeal;
            print STDERR $mconn->errorMessage unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-1);
        }
        next if $result->ntuples <= 0;

        if (! $havedeal)
        {
            # GetSYNCID rolls back by itself on failure
            my $syncid = GetSYNCID($mconn, $outf);
            return($syncid) if $syncid < 0;
            $havedeal = 1;
        }

        print {$outf} "-- $cmd $tabname\n";
        print "-- $cmd $tabname\n" if $debug;
        while (my @row = $result->fetchrow)
        {
            # NOTE(review): the field separator appears as a single space
            # in this copy of the file.  DoCopy passes these lines to
            # COPY ... FROM STDIN unchanged, so confirm that the separator
            # is the delimiter COPY expects.
            my $line = join(" ", map { OutputValue($_) } @row) . "\n";
            print {$outf} $line;
            print $line if $debug;
        }
        print {$outf} "\\.\n";
        print "\\.\n" if $debug;
    }

    return($havedeal);
}
314
# Encode one column value for a snapshot line.
#
# undef is written as \N.  Backslash is doubled; TAB, space, newline and
# single quote are written as the octal escapes \011, \040, \012 and \047,
# so an encoded value never contains raw field-separator whitespace, a
# line break or a quote.
#
# Previously a space was rewritten to \011, which is the escape for TAB,
# so spaces would be decoded as tabs; each character now gets its own code.
sub OutputValue
{
    my ($val) = @_;

    return("\\N") unless defined $val;

    $val =~ s/\\/\\\\/g;        # must come first: later steps add backslashes
    $val =~ s/\t/\\011/g;
    $val =~ s/ /\\040/g;
    $val =~ s/\n/\\012/g;
    $val =~ s/'/\\047/g;

    return($val);
}
328
# Fetch the id for a new snapshot from sequence _rserv_sync_seq_ and write
# the "-- SYNCID <id>" header line to $outf.
#
# Returns the new id, or -1 (after issuing ROLLBACK on $conn) when the
# sequence cannot be read.
sub GetSYNCID
{
    my ($conn, $outf) = @_;

    my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
    unless ($result->resultStatus eq PGRES_TUPLES_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my ($newid) = $result->fetchrow;
    my $header = "-- SYNCID $newid\n";

    print {$outf} $header;
    print $header if $debug;

    return($newid);
}
348
349
# Delete entries from _RSERV_LOG_ that are older than $howold seconds and
# no longer needed according to the last successful sync of each server.
#
#   $conn       - Pg connection to the master
#   $howold     - minimum age, in seconds, of entries to delete
#   $onlytables - accepted for interface compatibility; not used yet
#
# Returns the number of deleted rows as reported by cmdTuples, 0 when
# there are no sync ids to go by, and -1 on a database error.  The
# transaction is rolled back on every non-success path.
sub CleanLog
{
    my ($conn, $howold, $onlytables) = @_;

    my $result = $conn->exec("BEGIN");
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
              " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
              " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");    # do not leave the transaction open
        return(-1);
    }

    # Rows are ordered by maxid: take the smallest maxid and collect the
    # "active" ids of every row sharing that value.
    my $maxid = '';
    my %active = ();
    while (my @row = $result->fetchrow)
    {
        $maxid = $row[0] if $maxid eq '';
        last if $row[0] > $maxid;
        next unless defined $row[1];
        # accept "1,2", "1, 2" and "1 , 2" alike
        foreach my $aid (split(/[ ]*,[ ]*/, $row[1]))
        {
            $active{$aid} = 1 if $aid ne '';
        }
    }
    if ($maxid eq '')
    {
        print STDERR "No Sync IDs\n" unless ($quiet);
        $conn->exec("ROLLBACK");    # close the transaction opened above
        return(0);
    }

    my $alist = join(',', keys %active);
    my $sinfo = "logid < $maxid";
    $sinfo .= " AND logid not in ($alist)" if $alist ne '';

    $sql = "delete from _RSERV_LOG_ where " .
           "logtime < now() - '$howold second'::interval AND $sinfo";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }
    my $deleted = $result->cmdTuples;

    $result = $conn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    return($deleted);
}
423
# Apply a snapshot read from $inpf to the slave, in one transaction.
#
#   $sconn      - Pg connection to the slave
#   $inpf       - file handle the snapshot is read from
#   $onlytables - optional hash ref; when given, only tables whose name is
#                 a key are applied, and empty values are filled in with
#                 the table's oid
#
# Every line read here must start with "--" followed by a command:
# SYNCID <id>, DELETE/INSERT/UPDATE <table> (the section data is consumed
# by DoDelete/DoInsert/DoUpdate), OK or ERROR.
#
# Returns 1 when the snapshot was applied and committed, 0 when the slave
# is already at or past the snapshot's sync id, -1 on a database error and
# -2 on malformed input.  The transaction is rolled back in all cases but
# the first.
sub ApplySnapshot
{
    my ($sconn, $inpf, $onlytables) = @_;

    # Shared failure path: report, roll back, hand back the return code.
    my $abort = sub
    {
        my ($code, $msg) = @_;
        print STDERR $msg unless ($quiet);
        $sconn->exec("ROLLBACK");
        return($code);
    };

    my $result = $sconn->exec("BEGIN");
    return $abort->(-1, $sconn->errorMessage)
        if $result->resultStatus ne PGRES_COMMAND_OK;

    $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
    return $abort->(-1, $sconn->errorMessage)
        if $result->resultStatus ne PGRES_COMMAND_OK;

    # MAP name --> oid, keyname, keynum
    my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
              " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
              " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
              " AND pga.attnum = rt.key";

    $result = $sconn->exec($sql);
    return $abort->(-1, $sconn->errorMessage)
        if $result->resultStatus ne PGRES_TUPLES_OK;

    %Stables = ();
    while (my @row = $result->fetchrow)
    {
        if (ref($onlytables) eq 'HASH')
        {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
    }

    # Section commands and the sub that consumes each section's data.
    my %section = (
        DELETE => \&DoDelete,
        INSERT => \&DoInsert,
        UPDATE => \&DoUpdate,
    );

    my $ok = 0;
    my $syncid = -1;
    # A lexical line variable keeps the caller's $_ untouched.
    while (my $line = <$inpf>)
    {
        $line =~ s/\n//;
        my ($cmt, $cmd, $prm) = split(/[ ]+/, $line, 3);
        return $abort->(-2, "Invalid format\n") if $cmt ne '--';
        $cmd = '' unless defined $cmd;

        if (exists $section{$cmd})
        {
            return $abort->(-2, "Sync ID unspecified\n") if $syncid == -1;

            my $rc = $section{$cmd}->($sconn, $inpf, $prm);
            if ($rc)
            {
                $sconn->exec("ROLLBACK");
                return($rc);
            }
        }
        elsif ($cmd eq 'SYNCID')
        {
            return $abort->(-2, "Second Sync ID ?!\n") if $syncid != -1;

            # $prm comes from the snapshot: it is printed with print, never
            # used as a printf format.
            return $abort->(-2, "Invalid Sync ID $prm\n") if $prm !~ /^\d+$/;

            $syncid = $prm;

            print STDERR "Sync ID $syncid\n" unless ($quiet);

            $result = $sconn->exec("select syncid, synctime from " .
                                   "_RSERV_SLAVE_SYNC_ where syncid = " .
                                   "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
            return $abort->(-1, $sconn->errorMessage)
                if $result->resultStatus ne PGRES_TUPLES_OK;

            my @row = $result->fetchrow;
            if (! defined $row[0])
            {
                # first snapshot ever applied on this slave
                $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
                                       "(syncid, synctime) values ($syncid, now())");
            }
            elsif ($row[0] >= $prm)
            {
                # this snapshot (or a newer one) was applied already
                return $abort->(0, "Sync-ed to ID $row[0] ($row[1])\n");
            }
            else
            {
                $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
                                       " set syncid = $syncid, synctime = now()");
            }
            return $abort->(-1, $sconn->errorMessage)
                if $result->resultStatus ne PGRES_COMMAND_OK;
        }
        elsif ($cmd eq 'OK')
        {
            $ok = 1;
            last;
        }
        elsif ($cmd eq 'ERROR')
        {
            return $abort->(-2, "ERROR signaled\n");
        }
        else
        {
            return $abort->(-2, "Unknown command $cmd\n");
        }
    }

    return $abort->(-2, "No OK flag in input\n") if (! $ok);

    $result = $sconn->exec("COMMIT");
    return $abort->(-1, $sconn->errorMessage)
        if $result->resultStatus ne PGRES_COMMAND_OK;

    return(1);
}
612
# Apply one DELETE section of a snapshot: one encoded key per line, closed
# by a line containing only "\.".
#
#   $sconn   - Pg connection to the slave
#   $inpf    - snapshot file handle, positioned after the section header
#   $tabname - table named in the section header
#
# Sections for tables missing from %Stables are read and skipped.
# Returns 0 on success (or skip), -1 on a database error and -2 on
# malformed input.  The caller is responsible for ROLLBACK.
sub DoDelete
{
    my ($sconn, $inpf, $tabname) = @_;

    # only delete tables that the slave wants
    if (! defined($Stables{$tabname}))
    {
        print "Not configured to delete rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>)
        {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my $keycol = $Stables{$tabname}->[1];

    my $ok = 0;
    # A lexical line variable keeps the caller's $_ untouched.
    while (my $key = <$inpf>)
    {
        if ($key !~ /\n$/)
        {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $key =~ s/\n//;
        if ($key eq '\.')
        {
            $ok = 1;
            last;
        }

        # The key is spliced into a single-quoted SQL literal.  An encoded
        # key never holds a raw quote or ends in an unpaired backslash, so
        # refuse anything that could close the literal early.
        if ($key =~ /'/ || ($key =~ /(\\+)\z/ && length($1) % 2))
        {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }

        my $sql = "delete from \"$tabname\" where \"$keycol\" = '$key'";

        # print, not printf: the statement contains row data
        print "$sql\n" if $debug;

        my $result = $sconn->exec($sql);
        if ($result->resultStatus ne PGRES_COMMAND_OK)
        {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
    }

    if (! $ok)
    {
        print STDERR "No end of input in DELETE section\n" unless ($quiet);
        return(-2);
    }

    return(0);
}
665
666
# Apply one UPDATE section of a snapshot: one encoded row per line, closed
# by a line containing only "\.".
#
#   $sconn   - Pg connection to the slave
#   $inpf    - snapshot file handle, positioned after the section header
#   $tabname - table named in the section header
#
# Each row is tried as an UPDATE by key; rows that match nothing are
# buffered and loaded through DoCopy.  Sections for tables missing from
# %Stables are read and skipped.
# Returns 0 on success (or skip), -1 on a database error and -2 on
# malformed input or duplicate keys.  The caller is responsible for
# ROLLBACK.
sub DoUpdate
{
    my ($sconn, $inpf, $tabname) = @_;

    # only update the tables that the slave wants
    if (! defined($Stables{$tabname}))
    {
        print "Not configured to update rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>)
        {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my ($reloid, $keyname, $keynum) = @{$Stables{$tabname}};

    # a negative key attribute number means the key is the row's oid,
    # which then travels as the first field of every line
    my $oidkey = ($keynum < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $sql = "select attnum, attname from pg_attribute" .
              " where attrelid = $reloid AND attnum > 0";

    my $result = $sconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # column names indexed by attribute number
    my @anames = ();
    while (my @row = $result->fetchrow)
    {
        $anames[$row[0]] = $row[1];
    }

    my $ok = 0;
    # A lexical line variable keeps the caller's $_ untouched.
    while (my $istring = <$inpf>)
    {
        if ($istring !~ /\n$/)
        {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $istring =~ s/\n//;
        if ($istring eq '\.')
        {
            $ok = 1;
            last;
        }

        # NOTE(review): the field separator appears as a single space in
        # this copy of the file; confirm it matches what the snapshot
        # writer emits between fields.
        my @vals = split(/ /, $istring, -1);

        # Values are spliced into single-quoted SQL literals below.  An
        # encoded value never holds a raw quote or ends in an unpaired
        # backslash, so refuse anything that could close a literal early.
        foreach my $v (@vals)
        {
            if ($v =~ /'/ || ($v =~ /(\\+)\z/ && length($1) % 2))
            {
                print STDERR "Invalid format\n" unless ($quiet);
                return(-2);
            }
        }

        my $oid;
        if ($oidkey)
        {
            if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
            {
                print STDERR "Invalid OID\n" unless ($quiet);
                return(-2);
            }
            $oid = $vals[0];
        }
        else
        {
            # no oid field: shift so that $vals[$i] lines up with attnum $i
            unshift @vals, '';
        }

        my @sets = ();
        for (my $i = 1; $i <= $#anames; $i++)
        {
            if ($vals[$i] eq '\N')
            {
                if ($i == $keynum)
                {
                    print STDERR "NULL key\n" unless ($quiet);
                    return(-2);
                }
                $vals[$i] = 'null';
            }
            else
            {
                $vals[$i] = "'" . $vals[$i] . "'";
                # the key column is only used in the WHERE clause
                next if $i == $keynum;
            }
            push @sets, "\"$anames[$i]\" = $vals[$i]";
        }

        $sql = "update \"$tabname\" set " . join(', ', @sets) .
               " where \"$keyname\" = " . ($oidkey ? $oid : $vals[$keynum]);

        # print, not printf: the statement contains row data
        print "$sql\n" if $debug;

        $result = $sconn->exec($sql);

        if ($result->resultStatus ne PGRES_COMMAND_OK)
        {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
        next if $result->cmdTuples == 1;    # updated

        if ($result->cmdTuples > 1)
        {
            print STDERR "Duplicate keys\n" unless ($quiet);
            return(-2);
        }

        # no key - copy
        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax)
        {
            $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok)
    {
        print STDERR "No end of input in UPDATE section\n" unless ($quiet);
        return(-2);
    }

    # Flush on buffered rows, not on byte count: a buffer holding only
    # empty lines has length 0 but still needs to be copied.
    if (@CopyBuf)
    {
        print "@CopyBuf\n" if $debug;
        $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
812
# Apply one INSERT section of a snapshot: one encoded row per line, closed
# by a line containing only "\.".  Rows are buffered and loaded through
# DoCopy.
#
#   $sconn   - Pg connection to the slave
#   $inpf    - snapshot file handle, positioned after the section header
#   $tabname - table named in the section header
#
# Sections for tables missing from %Stables are read and skipped.
# Returns 0 on success (or skip), -2 on malformed input, or DoCopy's
# error code.  The caller is responsible for ROLLBACK.
sub DoInsert
{
    my ($sconn, $inpf, $tabname) = @_;

    # only insert rows into tables that the slave wants
    if (! defined($Stables{$tabname}))
    {
        print "Not configured to insert rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>)
        {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    # a negative key attribute number means the key is the row's oid,
    # in which case the rows are copied WITH OIDS
    my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $ok = 0;
    # A lexical line variable keeps the caller's $_ untouched.
    while (my $istring = <$inpf>)
    {
        if ($istring !~ /\n$/)
        {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $istring =~ s/\n//;
        if ($istring eq '\.')
        {
            $ok = 1;
            last;
        }

        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax)
        {
            my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok)
    {
        print STDERR "No end of input in INSERT section\n" unless ($quiet);
        return(-2);
    }

    # Flush on buffered rows, not on byte count: a buffer holding only
    # empty lines has length 0 but still needs to be copied.
    if (@CopyBuf)
    {
        print "@CopyBuf\n" if $debug;
        my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
879
880
# Load buffered snapshot lines into a table with COPY ... FROM STDIN.
#
#   $sconn    - Pg connection to the slave
#   $tabname  - target table
#   $withoids - true to issue COPY ... WITH OIDS
#   $CBuf     - array ref of newline-terminated data lines
#
# Returns 0 on success and -1 when COPY cannot be started or ended.
sub DoCopy
{
    my ($sconn, $tabname, $withoids, $CBuf) = @_;

    my $oids = $withoids ? "WITH OIDS " : '';
    my $started = $sconn->exec("COPY \"$tabname\" " . $oids . "FROM STDIN");
    unless ($started->resultStatus eq PGRES_COPY_IN)
    {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # the data lines, followed by the end-of-data marker
    foreach my $chunk (@{$CBuf}, "\\.\n")
    {
        $sconn->putline($chunk);
    }

    # endcopy yields a true value on failure
    return(0) unless $sconn->endcopy;

    print STDERR $sconn->errorMessage unless ($quiet);
    return(-1);
}
909
910
#
# Returns the highest syncid recorded in _RSERV_SLAVE_SYNC_ on the slave,
# undef when the table holds none (NULL), or -1 when the query fails.
#
sub GetSyncID
{
    my ($sconn) = @_;

    my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
    unless ($result->resultStatus eq PGRES_TUPLES_OK)
    {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # a NULL max() arrives as undef and is passed through as such
    my ($last) = $result->fetchrow;
    return($last);
}
928
#
# Updates _RSERV_SYNC_ on Master with Slave SyncID: marks the given sync
# as done (status = 1, synctime = now()) and deletes the server's older
# sync rows, all in one transaction.
#
#   $mconn   - Pg connection to the master
#   $sserver - server number of the slave
#   $syncid  - sync id the slave has applied
#
# Returns 1 when the row was updated, 0 when the sync id is unknown for
# that server or was already marked, and -1 on invalid arguments or a
# database error.
#
sub SyncSyncID
{
    my ($mconn, $sserver, $syncid) = @_;

    # Both values are spliced into SQL as bare numbers; anything else
    # (including undef) would only produce a broken statement.
    foreach my $num ($sserver, $syncid)
    {
        if (! defined $num || $num !~ /\A-?\d+\z/)
        {
            print STDERR "Invalid server or SyncID\n" unless ($quiet);
            return(-1);
        }
    }

    # Shared failure path: report, roll back, hand back the return code.
    my $abort = sub
    {
        my ($code, $msg) = @_;
        print STDERR $msg unless ($quiet);
        $mconn->exec("ROLLBACK");
        return($code);
    };

    my $result = $mconn->exec("BEGIN");
    return $abort->(-1, $mconn->errorMessage)
        if $result->resultStatus ne PGRES_COMMAND_OK;

    # lock the row for the duration of the transaction
    $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
                           " where server = $sserver AND syncid = $syncid" .
                           " for update");
    return $abort->(-1, $mconn->errorMessage)
        if $result->resultStatus ne PGRES_TUPLES_OK;

    my @row = $result->fetchrow;
    return $abort->(0, "No SyncID $syncid found for server $sserver\n")
        if (! defined $row[0]);
    return $abort->(0, "SyncID $syncid for server $sserver already updated\n")
        if ($row[1] > 0);

    $result = $mconn->exec("update _RSERV_SYNC_" .
                           " set synctime = now(), status = 1" .
                           " where server = $sserver AND syncid = $syncid");
    return $abort->(-1, $mconn->errorMessage)
        if $result->resultStatus ne PGRES_COMMAND_OK;

    $result = $mconn->exec("delete from _RSERV_SYNC_" .
                           " where server = $sserver AND syncid < $syncid");
    return $abort->(-1, $mconn->errorMessage)
        if $result->resultStatus ne PGRES_COMMAND_OK;

    $result = $mconn->exec("COMMIT");
    return $abort->(-1, $mconn->errorMessage)
        if $result->resultStatus ne PGRES_COMMAND_OK;

    return(1);
}
995
996 1;

  ViewVC Help
Powered by ViewVC 1.1.26