/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.14 - (show annotations)
Sun Nov 2 11:29:09 2003 UTC (20 years, 6 months ago) by dpavlin
Branch: MAIN
Changes since 1.13: +4 -1 lines
more debugging

1 # -*- perl -*-
2 # RServ.pm
3 # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4
5 package RServ;
6
7 require Exporter;
8 @ISA = qw(Exporter);
9 @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10 Rollback RollbackAndQuit Connect Exec Exec2 MkInfo
11 $debug $quiet $verbose
12 );
13 @EXPORT_OK = qw();
14 use strict;
15 use Pg;
16
17 my $debug = 0;
18 my $quiet = 1;
19 my $verbose = 0;
20
21 $debug = 1;
22 $quiet = 0;
23 $verbose = 1;
24
25 my %Mtables = ();
26 my %Stables = ();
27
28 sub GetServerId
29 {
30 my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);
31
32 print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);
33
34 my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
35 " host='$Host' AND dbase='$DB'");
36
37 if ($result->resultStatus ne PGRES_TUPLES_OK)
38 {
39 print STDERR $mconn->errorMessage unless ($quiet);
40 return(-1);
41 }
42
43 if ($result->cmdTuples && $result->cmdTuples > 1)
44 {
45 printf STDERR "Duplicate host definitions.\n" unless ($quiet);
46 return(-2);
47 }
48
49 my @row = $result->fetchrow;
50
51 print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
52
53 return $row[0];
54 }
55
56 sub PrepareSnapshot
57 {
58 my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;
59
60 print STDERR "## d: $debug v: $verbose q: $quiet\n";
61
62 if ($mserver == $sserver) {
63 print STDERR "master and slave numbers are same [$mserver] !\n";
64 return(-1);
65 }
66
67 print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
68
69 # first, we must know for wich tables the slave subscribed
70 my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
71 if ($result->resultStatus ne PGRES_TUPLES_OK)
72 {
73 print STDERR $mconn->errorMessage unless ($quiet);
74 return(-1);
75 }
76
77 my @row;
78 while (@row = $result->fetchrow) {
79 $Stables{$row[0]} = 1;
80 }
81
82 print "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);
83
84 $result = $mconn->exec("BEGIN");
85 if ($result->resultStatus ne PGRES_COMMAND_OK)
86 {
87 print STDERR $mconn->errorMessage unless ($quiet);
88 $mconn->exec("ROLLBACK");
89 return(-1);
90 }
91 $result = $mconn->exec("set transaction isolation level serializable");
92 if ($result->resultStatus ne PGRES_COMMAND_OK)
93 {
94 print STDERR $mconn->errorMessage unless ($quiet);
95 $mconn->exec("ROLLBACK");
96 return(-1);
97 }
98
99 # MAP oid --> tabname, keyname, key_type
100 $result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
101 " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
102 ", pg_type pgt".
103 " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
104 " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
105 if ($result->resultStatus ne PGRES_TUPLES_OK)
106 {
107 print STDERR $mconn->errorMessage unless ($quiet);
108 $mconn->exec("ROLLBACK");
109 return(-1);
110 }
111
112 while (@row = $result->fetchrow)
113 {
114 # printf "$row[0], $row[1], $row[2]\n";
115 if (ref($onlytables) eq 'HASH') {
116 next unless (exists $onlytables->{$row[1]});
117 $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
118 }
119 push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
120 }
121
122 print "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
123
124 # Read last succeeded sync
125 my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
126 " where server = $sserver AND syncid = (select max(syncid) from" .
127 " _RSERV_SYNC_ where server = $sserver AND status > 0)";
128
129 printf "$sql\n" if $debug;
130
131 $result = $mconn->exec($sql);
132 if ($result->resultStatus ne PGRES_TUPLES_OK)
133 {
134 print STDERR $mconn->errorMessage unless ($quiet);
135 $mconn->exec("ROLLBACK");
136 return(-1);
137 }
138
139 my @lastsync = $result->fetchrow;
140 print "lastsync: ",join(",",@lastsync),"\n" if ($debug);
141
142 # exclude data which originated from master server
143 my $sel_server = " and l.server = $mserver ";
144
145 my $sinfo = "";
146 if (@lastsync && $lastsync[3] ne '') # sync info
147 {
148 $sinfo = "and (l.logid >= $lastsync[3]";
149 $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
150 $sinfo .= ")";
151 }
152
153 my $havedeal = 0;
154
155 # DELETED rows
156 $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
157 " where l.delete = 1 $sinfo $sel_server order by l.reloid";
158
159 printf "DELETED: $sql\n" if $debug;
160
161 $result = $mconn->exec($sql);
162 if ($result->resultStatus ne PGRES_TUPLES_OK)
163 {
164 print STDERR $mconn->errorMessage unless ($quiet);
165 $mconn->exec("ROLLBACK");
166 return(-1);
167 }
168
169 my $lastoid = -1;
170 while (@row = $result->fetchrow)
171 {
172 next unless exists $Mtables{$row[0]};
173 next unless exists $Stables{$Mtables{$row[0]}[0]};
174
175 if ($lastoid != $row[0])
176 {
177 if ($lastoid == -1)
178 {
179 my $syncid = GetSYNCID($mconn, $outf);
180 return($syncid) if $syncid < 0;
181 $havedeal = 1;
182 }
183 else
184 {
185 printf $outf "\\.\n";
186 }
187 printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
188 $lastoid = $row[0];
189 }
190 if (! defined $row[1])
191 {
192 print STDERR "NULL key\n" unless ($quiet);
193 $mconn->exec("ROLLBACK");
194 return(-2);
195 }
196 printf $outf "%s\n", OutputValue($row[1]);
197 }
198 printf $outf "\\.\n" if ($lastoid != -1);
199
200 # UPDATED rows
201
202 my ($taboid, $tabname, $tabkey);
203 foreach $taboid (keys %Mtables)
204 {
205 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
206 next unless exists $Stables{$tabname};
207
208 my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
209
210 $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
211 "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
212 " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
213 $sel_server;
214
215 printf "UPDATED: $sql\n" if $debug;
216
217 $result = $mconn->exec($sql);
218 if ($result->resultStatus ne PGRES_TUPLES_OK)
219 {
220 printf $outf "-- ERROR\n" if $havedeal;
221 print STDERR $mconn->errorMessage unless ($quiet);
222 $mconn->exec("ROLLBACK");
223 return(-1);
224 }
225 next if $result->ntuples <= 0;
226 if (! $havedeal)
227 {
228 my $syncid = GetSYNCID($mconn, $outf);
229 return($syncid) if $syncid < 0;
230 $havedeal = 1;
231 }
232 printf $outf "-- UPDATE $tabname\n";
233 printf "-- UPDATE $tabname\n" if $debug;
234 while (@row = $result->fetchrow)
235 {
236 for (my $i = 0; $i <= $#row; $i++)
237 {
238 printf $outf " " if $i;
239 printf " " if $i && $debug;
240 printf $outf "%s", OutputValue($row[$i]);
241 printf "%s", OutputValue($row[$i]) if $debug;;
242 }
243 printf $outf "\n";
244 printf "\n" if $debug;
245 }
246 printf $outf "\\.\n";
247 printf "\\.\n" if $debug;;
248 }
249
250 # INSERTED rows
251
252 foreach $taboid (keys %Mtables)
253 {
254 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
255 next unless exists $Stables{$tabname};
256
257 my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
258
259 $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
260 "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
261 " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
262 $sel_server;
263
264 printf "INSERTED: $sql\n" if $debug;
265
266 $result = $mconn->exec($sql);
267 if ($result->resultStatus ne PGRES_TUPLES_OK)
268 {
269 printf $outf "-- ERROR\n" if $havedeal;
270 print STDERR $mconn->errorMessage unless ($quiet);
271 $mconn->exec("ROLLBACK");
272 return(-1);
273 }
274 next if $result->ntuples <= 0;
275 if (! $havedeal)
276 {
277 my $syncid = GetSYNCID($mconn, $outf);
278 return($syncid) if $syncid < 0;
279 $havedeal = 1;
280 }
281 printf $outf "-- INSERT $tabname\n";
282 printf "-- INSERT $tabname\n" if $debug;
283 while (@row = $result->fetchrow)
284 {
285 for (my $i = 0; $i <= $#row; $i++)
286 {
287 printf $outf " " if $i;
288 printf " " if $i && $debug;
289 printf $outf "%s", OutputValue($row[$i]);
290 printf "%s", OutputValue($row[$i]) if $debug;;
291 }
292 printf $outf "\n";
293 printf "\n" if $debug;
294 }
295 printf $outf "\\.\n";
296 printf "\\.\n" if $debug;;
297 }
298
299
300 unless ($havedeal)
301 {
302 $mconn->exec("ROLLBACK");
303 return(0);
304 }
305
306 # Remember this snapshot info
307 $result = $mconn->exec("select _rserv_sync_($sserver)");
308 if ($result->resultStatus ne PGRES_TUPLES_OK)
309 {
310 printf $outf "-- ERROR\n";
311 print STDERR $mconn->errorMessage unless ($quiet);
312 $mconn->exec("ROLLBACK");
313 return(-1);
314 }
315
316 $result = $mconn->exec("COMMIT");
317 if ($result->resultStatus ne PGRES_COMMAND_OK)
318 {
319 printf $outf "-- ERROR\n";
320 print STDERR $mconn->errorMessage unless ($quiet);
321 $mconn->exec("ROLLBACK");
322 return(-1);
323 }
324 printf $outf "-- OK\n";
325 printf "-- OK\n" if $debug;
326
327 return(1);
328
329 }
330
331 sub OutputValue
332 {
333 my ($val) = @_; # @_[0];
334
335 return("\\N") unless defined $val;
336
337 $val =~ s/\\/\\\\/g;
338 $val =~ s/ /\\011/g;
339 $val =~ s/\n/\\012/g;
340 $val =~ s/\'/\\047/g;
341
342 return($val);
343 }
344
345 # Get syncid for new snapshot
346 sub GetSYNCID
347 {
348 my ($conn, $outf) = @_; # (@_[0], @_[1]);
349
350 my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
351 if ($result->resultStatus ne PGRES_TUPLES_OK)
352 {
353 print STDERR $conn->errorMessage unless ($quiet);
354 $conn->exec("ROLLBACK");
355 return(-1);
356 }
357
358 my @row = $result->fetchrow;
359
360 printf $outf "-- SYNCID $row[0]\n";
361 printf "-- SYNCID $row[0]\n" if $debug;
362 return($row[0]);
363 }
364
365
366 sub CleanLog
367 {
368 my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
369
370 my $result = $conn->exec("BEGIN");
371 if ($result->resultStatus ne PGRES_COMMAND_OK)
372 {
373 print STDERR $conn->errorMessage unless ($quiet);
374 $conn->exec("ROLLBACK");
375 return(-1);
376 }
377
378 my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
379 " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
380 " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
381
382 printf "$sql\n" if $debug;
383
384 $result = $conn->exec($sql);
385 if ($result->resultStatus ne PGRES_TUPLES_OK)
386 {
387 print STDERR $conn->errorMessage unless ($quiet);
388 return(-1);
389 }
390 my $maxid = '';
391 my %active = ();
392 while (my @row = $result->fetchrow)
393 {
394 $maxid = $row[0] if $maxid eq '';
395 last if $row[0] > $maxid;
396 my @ids = split(/[ ]+,[ ]+/, $row[1]);
397 foreach my $aid (@ids)
398 {
399 $active{$aid} = 1 unless exists $active{$aid};
400 }
401 }
402 if ($maxid eq '')
403 {
404 print STDERR "No Sync IDs\n" unless ($quiet);
405 return(0);
406 }
407 my $alist = join(',', keys %active);
408 my $sinfo = "logid < $maxid";
409 $sinfo .= " AND logid not in ($alist)" if $alist ne '';
410 #if (ref($onlytables) eq 'HASH') {
411 # foreach my $onlytable (keys %{$onlytables}) {
412 # $sinfo
413 # }
414 #}
415 $sql = "delete from _RSERV_LOG_ where " .
416 "logtime < now() - '$howold second'::interval AND $sinfo";
417
418 printf "$sql\n" if $debug;
419
420 $result = $conn->exec($sql);
421 if ($result->resultStatus ne PGRES_COMMAND_OK)
422 {
423 print STDERR $conn->errorMessage unless ($quiet);
424 $conn->exec("ROLLBACK");
425 return(-1);
426 }
427 $maxid = $result->cmdTuples;
428
429 $result = $conn->exec("COMMIT");
430 if ($result->resultStatus ne PGRES_COMMAND_OK)
431 {
432 print STDERR $conn->errorMessage unless ($quiet);
433 $conn->exec("ROLLBACK");
434 return(-1);
435 }
436
437 return($maxid);
438 }
439
440 sub ApplySnapshot
441 {
442 my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);
443
444 my $result = $sconn->exec("BEGIN");
445 if ($result->resultStatus ne PGRES_COMMAND_OK)
446 {
447 print STDERR $sconn->errorMessage unless ($quiet);
448 $sconn->exec("ROLLBACK");
449 return(-1);
450 }
451
452 $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
453 if ($result->resultStatus ne PGRES_COMMAND_OK)
454 {
455 print STDERR $sconn->errorMessage unless ($quiet);
456 $sconn->exec("ROLLBACK");
457 return(-1);
458 }
459
460 # MAP name --> oid, keyname, keynum
461 my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
462 " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
463 " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
464 " AND pga.attnum = rt.key";
465
466 $result = $sconn->exec($sql);
467 if ($result->resultStatus ne PGRES_TUPLES_OK)
468 {
469 print STDERR $sconn->errorMessage unless ($quiet);
470 $sconn->exec("ROLLBACK");
471 return(-1);
472 }
473 %Stables = ();
474 while (my @row = $result->fetchrow)
475 {
476 # printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
477 if (ref($onlytables) eq 'HASH') {
478 next unless (exists $onlytables->{$row[1]});
479 $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
480 }
481 push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
482 }
483
484 print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);
485
486 my $ok = 0;
487 my $syncid = -1;
488 while(<$inpf>)
489 {
490 $_ =~ s/\n//;
491 my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3);
492 die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
493 if ($cmt ne '--')
494 {
495 printf STDERR "Invalid format\n" unless ($quiet);
496 $sconn->exec("ROLLBACK");
497 return(-2);
498 }
499 if ($cmd eq 'DELETE')
500 {
501 if ($syncid == -1)
502 {
503 printf STDERR "Sync ID unspecified\n" unless ($quiet);
504 $sconn->exec("ROLLBACK");
505 return(-2);
506 }
507 $result = DoDelete($sconn, $inpf, $prm);
508 if ($result)
509 {
510 $sconn->exec("ROLLBACK");
511 return($result);
512 }
513 }
514 elsif ($cmd eq 'INSERT')
515 {
516 if ($syncid == -1)
517 {
518 printf STDERR "Sync ID unspecified\n" unless ($quiet);
519 $sconn->exec("ROLLBACK");
520 return(-2);
521 }
522 $result = DoInsert($sconn, $inpf, $prm);
523 if ($result)
524 {
525 $sconn->exec("ROLLBACK");
526 return($result);
527 }
528 }
529 elsif ($cmd eq 'UPDATE')
530 {
531 if ($syncid == -1)
532 {
533 printf STDERR "Sync ID unspecified\n" unless ($quiet);
534 $sconn->exec("ROLLBACK");
535 return(-2);
536 }
537 $result = DoUpdate($sconn, $inpf, $prm);
538 if ($result)
539 {
540 $sconn->exec("ROLLBACK");
541 return($result);
542 }
543 }
544 elsif ($cmd eq 'SYNCID')
545 {
546 if ($syncid != -1)
547 {
548 printf STDERR "Second Sync ID ?!\n" unless ($quiet);
549 $sconn->exec("ROLLBACK");
550 return(-2);
551 }
552 if ($prm !~ /^\d+$/)
553 {
554 printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
555 $sconn->exec("ROLLBACK");
556 return(-2);
557 }
558 $syncid = $prm;
559
560 printf STDERR "Sync ID $syncid\n" unless ($quiet);
561
562 $result = $sconn->exec("select syncid, synctime from " .
563 "_RSERV_SLAVE_SYNC_ where syncid = " .
564 "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
565 if ($result->resultStatus ne PGRES_TUPLES_OK)
566 {
567 print STDERR $sconn->errorMessage unless ($quiet);
568 $sconn->exec("ROLLBACK");
569 return(-1);
570 }
571 my @row = $result->fetchrow;
572 if (! defined $row[0])
573 {
574 $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
575 "(syncid, synctime) values ($syncid, now())");
576 }
577 elsif ($row[0] >= $prm)
578 {
579 printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
580 $sconn->exec("ROLLBACK");
581 return(0);
582 }
583 else
584 {
585 $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
586 " set syncid = $syncid, synctime = now()");
587 }
588 if ($result->resultStatus ne PGRES_COMMAND_OK)
589 {
590 print STDERR $sconn->errorMessage unless ($quiet);
591 $sconn->exec("ROLLBACK");
592 return(-1);
593 }
594 }
595 elsif ($cmd eq 'OK')
596 {
597 $ok = 1;
598 last;
599 }
600 elsif ($cmd eq 'ERROR')
601 {
602 printf STDERR "ERROR signaled\n" unless ($quiet);
603 $sconn->exec("ROLLBACK");
604 return(-2);
605 }
606 else
607 {
608 printf STDERR "Unknown command $cmd\n" unless ($quiet);
609 $sconn->exec("ROLLBACK");
610 return(-2);
611 }
612 }
613
614 if (! $ok)
615 {
616 printf STDERR "No OK flag in input\n" unless ($quiet);
617 $sconn->exec("ROLLBACK");
618 return(-2);
619 }
620
621 $result = $sconn->exec("COMMIT");
622 if ($result->resultStatus ne PGRES_COMMAND_OK)
623 {
624 print STDERR $sconn->errorMessage unless ($quiet);
625 $sconn->exec("ROLLBACK");
626 return(-1);
627 }
628
629 return(1);
630 }
631
632 sub DoDelete
633 {
634 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
635
636 # only delete tables that the slave wants
637 if (! defined($Stables{$tabname})) {
638 print "Not configured to delete rows from table $tabname\n" unless $quiet;
639 while (<$inpf>) {
640 my $istring = $_;
641 $istring =~ s/\n//;
642 last if ($istring eq '\.');
643 }
644 return(0);
645 }
646
647 my $ok = 0;
648 while(<$inpf>)
649 {
650 if ($_ !~ /\n$/)
651 {
652 printf STDERR "Invalid format\n" unless ($quiet);
653 return(-2);
654 }
655 my $key = $_;
656 $key =~ s/\n//;
657 if ($key eq '\.')
658 {
659 $ok = 1;
660 last;
661 }
662
663 my $sql = "delete from \"$tabname\" where ".
664 "\"$Stables{$tabname}->[1]\" = '$key'";
665
666 printf "$sql\n" if $debug;
667
668 my $result = $sconn->exec($sql);
669 if ($result->resultStatus ne PGRES_COMMAND_OK)
670 {
671 print STDERR $sconn->errorMessage unless ($quiet);
672 return(-1);
673 }
674 }
675
676 if (! $ok)
677 {
678 printf STDERR "No end of input in DELETE section\n" unless ($quiet);
679 return(-2);
680 }
681
682 return(0);
683 }
684
685
686 sub DoUpdate
687 {
688 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
689
690 # only update the tables that the slave wants
691 if (! defined($Stables{$tabname})) {
692 print "Not configured to update rows from table $tabname\n" unless $quiet;
693 while (<$inpf>) {
694 my $istring = $_;
695 $istring =~ s/\n//;
696 last if ($istring eq '\.');
697 }
698 return(0);
699 }
700
701 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
702
703 my @CopyBuf = ();
704 my $CBufLen = 0;
705 my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
706
707 my $sql = "select attnum, attname from pg_attribute" .
708 " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
709
710 my $result = $sconn->exec($sql);
711 if ($result->resultStatus ne PGRES_TUPLES_OK)
712 {
713 print STDERR $sconn->errorMessage unless ($quiet);
714 return(-1);
715 }
716
717 my @anames = ();
718 while (my @row = $result->fetchrow)
719 {
720 $anames[$row[0]] = $row[1];
721 }
722
723 my $istring;
724 my $ok = 0;
725 while(<$inpf>)
726 {
727 if ($_ !~ /\n$/)
728 {
729 printf STDERR "Invalid format\n" unless ($quiet);
730 return(-2);
731 }
732 $istring = $_;
733 $istring =~ s/\n//;
734 if ($istring eq '\.')
735 {
736 $ok = 1;
737 last;
738 }
739 my @vals = split(/ /, $istring);
740 if ($oidkey)
741 {
742 if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
743 {
744 printf STDERR "Invalid OID\n" unless ($quiet);
745 return(-2);
746 }
747 $oidkey = $vals[0];
748 }
749 else
750 {
751 unshift @vals, '';
752 }
753
754 $sql = "update \"$tabname\" set ";
755 my $ocnt = 0;
756 for (my $i = 1; $i <= $#anames; $i++)
757 {
758 if ($vals[$i] eq '\N')
759 {
760 if ($i == $Stables{$tabname}->[2])
761 {
762 printf STDERR "NULL key\n" unless ($quiet);
763 return(-2);
764 }
765 $vals[$i] = 'null';
766 }
767 else
768 {
769 $vals[$i] = "'" . $vals[$i] . "'";
770 next if $i == $Stables{$tabname}->[2];
771 }
772 $ocnt++;
773 $sql .= ', ' if $ocnt > 1;
774 $sql .= "\"$anames[$i]\" = $vals[$i]";
775 }
776 if ($oidkey)
777 {
778 $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
779 }
780 else
781 {
782 $sql .= " where \"$Stables{$tabname}->[1]\" = ".
783 $vals[$Stables{$tabname}->[2]];
784 }
785
786 printf "$sql\n" if $debug;
787
788 $result = $sconn->exec($sql);
789
790 if ($result->resultStatus ne PGRES_COMMAND_OK)
791 {
792 print STDERR $sconn->errorMessage unless ($quiet);
793 return(-1);
794 }
795 next if $result->cmdTuples == 1; # updated
796
797 if ($result->cmdTuples > 1)
798 {
799 printf STDERR "Duplicate keys\n" unless ($quiet);
800 return(-2);
801 }
802
803 # no key - copy
804 push @CopyBuf, "$istring\n";
805 $CBufLen += length($istring);
806
807 if ($CBufLen >= $CBufMax)
808 {
809 $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
810 return($result) if $result;
811 @CopyBuf = ();
812 $CBufLen = 0;
813 }
814 }
815
816 if (! $ok)
817 {
818 printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
819 return(-2);
820 }
821
822 if ($CBufLen)
823 {
824 print "@CopyBuf\n" if $debug;
825 $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
826 return($result) if $result;
827 }
828
829 return(0);
830 }
831
832 sub DoInsert
833 {
834 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
835
836 # only insert rows into tables that the slave wants
837 if (! defined($Stables{$tabname})) {
838 print "Not configured to insert rows from table $tabname\n" unless $quiet;
839 while (<$inpf>) {
840 my $istring = $_;
841 $istring =~ s/\n//;
842 last if ($istring eq '\.');
843 }
844 return(0);
845 }
846
847 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
848
849 my @CopyBuf = ();
850 my $CBufLen = 0;
851 my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
852
853 my $istring;
854 my $ok = 0;
855 while(<$inpf>)
856 {
857 if ($_ !~ /\n$/)
858 {
859 printf STDERR "Invalid format\n" unless ($quiet);
860 return(-2);
861 }
862 $istring = $_;
863 $istring =~ s/\n//;
864 if ($istring eq '\.')
865 {
866 $ok = 1;
867 last;
868 }
869
870 # no key - copy
871 push @CopyBuf, "$istring\n";
872 $CBufLen += length($istring);
873
874 if ($CBufLen >= $CBufMax)
875 {
876 my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
877 return($result) if $result;
878 @CopyBuf = ();
879 $CBufLen = 0;
880 }
881 }
882
883 if (! $ok)
884 {
885 printf STDERR "No end of input in INSERT section\n" unless ($quiet);
886 return(-2);
887 }
888
889 if ($CBufLen)
890 {
891 print "@CopyBuf\n" if $debug;
892 my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
893 return($result) if $result;
894 }
895
896 return(0);
897 }
898
899
900 sub DoCopy
901 {
902 my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
903
904 my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
905 "FROM STDIN";
906 my $result = $sconn->exec($sql);
907 if ($result->resultStatus ne PGRES_COPY_IN)
908 {
909 print STDERR $sconn->errorMessage unless ($quiet);
910 return(-1);
911 }
912
913 foreach my $str (@{$CBuf})
914 {
915 $sconn->putline($str);
916 }
917
918 $sconn->putline("\\.\n");
919
920 if ($sconn->endcopy)
921 {
922 print STDERR $sconn->errorMessage unless ($quiet);
923 return(-1);
924 }
925
926 return(0);
927 }
928
929
930 #
931 # Returns last SyncID applied on Slave
932 #
933 sub GetSyncID
934 {
935 my ($sconn) = @_; # (@_[0]);
936
937 my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
938 if ($result->resultStatus ne PGRES_TUPLES_OK)
939 {
940 print STDERR $sconn->errorMessage unless ($quiet);
941 return(-1);
942 }
943 my @row = $result->fetchrow;
944 return(undef) unless defined $row[0]; # null
945 return($row[0]);
946 }
947
948 #
949 # Updates _RSERV_SYNC_ on Master with Slave SyncID
950 #
951 sub SyncSyncID
952 {
953 my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
954
955 my $result = $mconn->exec("BEGIN");
956 if ($result->resultStatus ne PGRES_COMMAND_OK)
957 {
958 print STDERR $mconn->errorMessage unless ($quiet);
959 $mconn->exec("ROLLBACK");
960 return(-1);
961 }
962
963 $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
964 " where server = $sserver AND syncid = $syncid" .
965 " for update");
966 if ($result->resultStatus ne PGRES_TUPLES_OK)
967 {
968 print STDERR $mconn->errorMessage unless ($quiet);
969 $mconn->exec("ROLLBACK");
970 return(-1);
971 }
972 my @row = $result->fetchrow;
973 if (! defined $row[0])
974 {
975 printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
976 $mconn->exec("ROLLBACK");
977 return(0);
978 }
979 if ($row[1] > 0)
980 {
981 printf STDERR "SyncID $syncid for server ".
982 "$sserver already updated\n" unless ($quiet);
983 $mconn->exec("ROLLBACK");
984 return(0);
985 }
986 $result = $mconn->exec("update _RSERV_SYNC_" .
987 " set synctime = now(), status = 1" .
988 " where server = $sserver AND syncid = $syncid");
989 if ($result->resultStatus ne PGRES_COMMAND_OK)
990 {
991 print STDERR $mconn->errorMessage unless ($quiet);
992 $mconn->exec("ROLLBACK");
993 return(-1);
994 }
995 $result = $mconn->exec("delete from _RSERV_SYNC_" .
996 " where server = $sserver AND syncid < $syncid");
997 if ($result->resultStatus ne PGRES_COMMAND_OK)
998 {
999 print STDERR $mconn->errorMessage unless ($quiet);
1000 $mconn->exec("ROLLBACK");
1001 return(-1);
1002 }
1003
1004 $result = $mconn->exec("COMMIT");
1005 if ($result->resultStatus ne PGRES_COMMAND_OK)
1006 {
1007 print STDERR $mconn->errorMessage unless ($quiet);
1008 $mconn->exec("ROLLBACK");
1009 return(-1);
1010 }
1011
1012 return(1);
1013 }
1014
1015 # stuff moved from perl scripts for better re-use
1016
1017 sub Rollback {
1018 my $conn = shift @_;
1019
1020 print STDERR $conn->errorMessage;
1021 $conn->exec("ROLLBACK");
1022 }
1023
1024 sub RollbackAndQuit {
1025 my $conn = shift @_;
1026
1027 Rollback($conn);
1028 exit (-1);
1029 }
1030
1031 sub Connect {
1032 my $info = shift @_;
1033
1034 print("Connecting to $info\n") if ($debug || $verbose);
1035 my $conn = Pg::connectdb($info);
1036 if ($conn->status != PGRES_CONNECTION_OK) {
1037 print STDERR "Failed opening $info\n";
1038 exit 1;
1039 }
1040 return $conn;
1041 }
1042
1043 sub Exec {
1044 my $conn = shift @_;
1045 my $sql = shift @_;
1046
1047 my $result = $conn->exec($sql);
1048 print STDERR "$sql\n" if ($debug);
1049 RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1050 }
1051
1052 sub Exec2 {
1053 my $mconn = shift @_;
1054 my $sconn = shift @_;
1055 my $sql = shift @_;
1056
1057 my $result = $mconn->exec($sql);
1058 RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1059 $result = $sconn->exec($sql);
1060 RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1061 }
1062
1063 sub MkInfo {
1064 my $db = shift || die "need database name!";
1065 my $host = shift;
1066 my $port = shift;
1067 my $user = shift;
1068 my $password = shift;
1069
1070 my $info = "dbname=$db";
1071 $info = "$info host=$host" if (defined($host));
1072 $info = "$info port=$port" if (defined($port));
1073 $info = "$info user=$user" if (defined($user));
1074 $info = "$info password=$password" if (defined($password));
1075
1076 return $info;
1077 }
1078
1079 1;

  ViewVC Help
Powered by ViewVC 1.1.26