/[rserv]/share/RServ.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /share/RServ.pm

Parent Directory | Revision Log


Revision 1.12 - (show annotations)
Sun Nov 2 10:21:47 2003 UTC (20 years, 6 months ago) by dpavlin
Branch: MAIN
Changes since 1.11: +27 -1 lines
moved all info preparation into MkInfo in RServ.pm

1 # -*- perl -*-
2 # RServ.pm
3 # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4
package RServ;

use strict;
use Pg;

require Exporter;
our @ISA    = qw(Exporter);
our @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
                 Rollback RollbackAndQuit Connect Exec Exec2
                 $debug $quiet $verbose
                );
# MkInfo is available on request; it is not exported by default so that
# scripts defining their own MkInfo are not affected.
our @EXPORT_OK = qw(MkInfo);

# Verbosity flags. They are package variables ("our") because Exporter can
# only export package variables: declared with "my", the names imported by a
# script would refer to different variables than the ones read by this module.
our $debug   = 0;
our $quiet   = 1;
our $verbose = 0;

# Overrides of the defaults above (everything switched on).
$debug   = 1;
$quiet   = 0;
$verbose = 1;

# Table maps shared between the subs of this module:
#   %Mtables: master table oid => [ table name, key column name, key type name ]
#   %Stables: slave table name => 1 (filled by PrepareSnapshot) or
#             slave table name => [ oid, key column name, key attnum ]
#             (filled by ApplySnapshot)
my %Mtables = ();
my %Stables = ();
27
# Look up the id of a server in _RSERV_SERVERS_ by host and database name.
#
# Arguments:
#   $mconn - Pg connection used to run the query
#   $DB    - database name to match against the "dbase" column
#   $Host  - host name to match against the "host" column
#
# Returns the "server" column of the matching row (undef when no row
# matches), -1 if the query fails, or -2 if more than one row matches.
#
# NOTE(review): $Host and $DB are interpolated into the SQL text unescaped;
# callers must not pass untrusted values.
sub GetServerId {
    my ($mconn, $DB, $Host) = @_;

    print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);

    my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE" .
                              " host='$Host' AND dbase='$DB'");

    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $mconn->errorMessage unless ($quiet);
        return(-1);
    }

    # ntuples is the row count of a query result; cmdTuples reports the rows
    # affected by a data-modifying command and is not reliable for a SELECT.
    if ($result->ntuples > 1) {
        print STDERR "Duplicate host definitions.\n" unless ($quiet);
        return(-2);
    }

    my @row = $result->fetchrow;

    print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);

    return $row[0];
}
55
# Write a replication snapshot for one slave to $outf.
#
# Arguments:
#   $mconn      - Pg connection to the master database
#   $sconn      - Pg connection to the slave database
#   $outf       - filehandle the snapshot is written to
#   $mserver    - server id of the master
#   $sserver    - server id of the slave
#   $onlytables - optional hash ref; when it is a hash ref, only tables whose
#                 names are keys of the hash are considered, and keys with a
#                 false value get the table oid stored as their value
#
# Snapshot layout: "-- SYNCID n", then per table a "-- DELETE name" section
# with one key per line, "-- UPDATE name" and "-- INSERT name" sections with
# one row per line, each section closed by "\.", and finally "-- OK"
# (or "-- ERROR" if a failure happens after output has started).
#
# Returns 1 when a snapshot was written and committed, 0 when there was
# nothing to send, -1 on a database error or when $mserver == $sserver, and
# -2 when a logged DELETE has a NULL key.
#
# NOTE(review): row fields are separated by a single space here, as in the
# reader (DoUpdate); COPY's default delimiter is a tab, so confirm whether the
# separator was originally a literal tab.
sub PrepareSnapshot {
    my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;

    print STDERR "## d: $debug v: $verbose q: $quiet\n";

    if ($mserver == $sserver) {
        print STDERR "master and slave numbers are same [$mserver] !\n";
        return(-1);
    }

    print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);

    # Both maps are module-level state; start clean so that entries left over
    # from an earlier call in the same process cannot leak into this snapshot.
    %Mtables = ();
    %Stables = ();

    # first, we must know for which tables the slave subscribed
    my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        # this query ran on the slave, so report the slave's error
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }
    while (my @row = $result->fetchrow) {
        $Stables{$row[0]} = 1;
    }

    # Common failure path for master-side statements: report, roll back, -1.
    my $master_failed = sub {
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    };

    foreach my $stmt ("BEGIN", "set transaction isolation level serializable") {
        $result = $mconn->exec($stmt);
        return $master_failed->() if $result->resultStatus ne PGRES_COMMAND_OK;
    }

    # MAP oid --> tabname, keyname, key_type
    $result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
                           " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
                           ", pg_type pgt" .
                           " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
                           " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
    return $master_failed->() if $result->resultStatus ne PGRES_TUPLES_OK;

    while (my @row = $result->fetchrow) {
        if (ref($onlytables) eq 'HASH') {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        $Mtables{$row[0]} = [ $row[1], $row[2], $row[3] ];
    }

    print "Prepare snapshot for tables with oid: ", join(",", keys %Mtables), "\n" if ($debug);

    # Read last succeeded sync
    my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
              " where server = $sserver AND syncid = (select max(syncid) from" .
              " _RSERV_SYNC_ where server = $sserver AND status > 0)";

    # print, not printf: the SQL text must never be used as a format string
    print "$sql\n" if $debug;

    $result = $mconn->exec($sql);
    return $master_failed->() if $result->resultStatus ne PGRES_TUPLES_OK;

    my @lastsync = $result->fetchrow;

    # restrict the log rows to those recorded for server $mserver
    my $sel_server = " and l.server = $mserver ";

    # restrict the log rows to those newer than (or still active at) the last
    # successful sync, when there was one
    my $sinfo = "";
    if (@lastsync && defined $lastsync[3] && $lastsync[3] ne '') {
        $sinfo = "and (l.logid >= $lastsync[3]";
        $sinfo .= " or l.logid in ($lastsync[4])"
            if defined $lastsync[4] && $lastsync[4] ne '';
        $sinfo .= ")";
    }

    # becomes true once the "-- SYNCID" header has been written
    my $havedeal = 0;

    # DELETED rows
    $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
           " where l.delete = 1 $sinfo $sel_server order by l.reloid";

    print "DELETED: $sql\n" if $debug;

    $result = $mconn->exec($sql);
    return $master_failed->() if $result->resultStatus ne PGRES_TUPLES_OK;

    my $lastoid = -1;
    while (my @row = $result->fetchrow) {
        next unless exists $Mtables{$row[0]};
        next unless exists $Stables{$Mtables{$row[0]}[0]};

        if ($lastoid != $row[0]) {
            if ($lastoid == -1) {
                # first section of the snapshot: emit the header
                my $syncid = GetSYNCID($mconn, $outf);
                return($syncid) if $syncid < 0;
                $havedeal = 1;
            }
            else {
                # close the previous table's section
                print {$outf} "\\.\n";
            }
            print {$outf} "-- DELETE $Mtables{$row[0]}[0]\n";
            $lastoid = $row[0];
        }
        if (! defined $row[1]) {
            print STDERR "NULL key\n" unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-2);
        }
        print {$outf} OutputValue($row[1]), "\n";
    }
    print {$outf} "\\.\n" if ($lastoid != -1);

    # Dump the current rows of every subscribed table that the log marks with
    # the given flag column. Returns undef on success, otherwise the code
    # PrepareSnapshot has to return.
    my $dump_rows = sub {
        my ($logflag, $section, $dbglabel) = @_;

        foreach my $taboid (keys %Mtables) {
            my ($tabname, $tabkey, $keytype) = @{ $Mtables{$taboid} };
            next unless exists $Stables{$tabname};

            # quoted like the alias itself so mixed-case table names resolve
            my $oidkey = ($tabkey eq 'oid') ? "\"_$tabname\".oid," : '';

            my $query = "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" " .
                "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.$logflag = 1" .
                " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}" .
                $sel_server;

            print "$dbglabel: $query\n" if $debug;

            my $res = $mconn->exec($query);
            if ($res->resultStatus ne PGRES_TUPLES_OK) {
                print {$outf} "-- ERROR\n" if $havedeal;
                return $master_failed->();
            }
            next if $res->ntuples <= 0;

            if (! $havedeal) {
                my $syncid = GetSYNCID($mconn, $outf);
                return($syncid) if $syncid < 0;
                $havedeal = 1;
            }

            print {$outf} "-- $section $tabname\n";
            print "-- $section $tabname\n" if $debug;
            while (my @row = $res->fetchrow) {
                my $line = join(" ", map { OutputValue($_) } @row) . "\n";
                print {$outf} $line;
                print $line if $debug;
            }
            print {$outf} "\\.\n";
            print "\\.\n" if $debug;
        }
        return;
    };

    # UPDATED rows
    my $rc = $dump_rows->('update', 'UPDATE', 'UPDATED');
    return $rc if defined $rc;

    # INSERTED rows
    $rc = $dump_rows->('insert', 'INSERT', 'INSERTED');
    return $rc if defined $rc;

    unless ($havedeal) {
        # nothing to replicate
        $mconn->exec("ROLLBACK");
        return(0);
    }

    # Remember this snapshot info
    $result = $mconn->exec("select _rserv_sync_($sserver)");
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print {$outf} "-- ERROR\n";
        return $master_failed->();
    }

    $result = $mconn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print {$outf} "-- ERROR\n";
        return $master_failed->();
    }
    print {$outf} "-- OK\n";
    print "-- OK\n" if $debug;

    return(1);
}
327
# Encode one column value for the snapshot file.
#
# Argument: $val - the value, or undef for SQL NULL.
# Returns "\N" for undef; otherwise a copy of the value in which backslash is
# doubled and tab, space, newline and single quote are replaced by their
# backslash-octal escapes (\011, \040, \012, \047).
#
# Each character is written with its own octal code so that the value reads
# back unchanged: a space used to be written as \011, which is the code for a
# tab, and real tabs were not escaped at all. Escaping both also keeps the
# encoded value free of either candidate field separator.
sub OutputValue {
    my ($val) = @_;

    return("\\N") unless defined $val;

    $val =~ s/\\/\\\\/g;      # must run first: later steps add backslashes
    $val =~ s/\t/\\011/g;
    $val =~ s/ /\\040/g;
    $val =~ s/\n/\\012/g;
    $val =~ s/\'/\\047/g;

    return($val);
}
341
# Fetch the sync id for a new snapshot from the _rserv_sync_seq_ sequence and
# write the "-- SYNCID n" header line to $outf (and to STDOUT when debugging).
#
# Arguments: $conn - Pg connection; $outf - snapshot output filehandle.
# Returns the new sync id; on a query failure it rolls back the transaction
# on $conn and returns -1.
sub GetSYNCID {
    my ($conn, $outf) = @_;

    my $seq = $conn->exec("select nextval('_rserv_sync_seq_')");
    unless ($seq->resultStatus eq PGRES_TUPLES_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my ($new_id) = $seq->fetchrow;
    my $header = "-- SYNCID $new_id\n";

    printf {$outf} $header;
    printf $header if $debug;

    return($new_id);
}
361
362
# Delete entries from _RSERV_LOG_ that are no longer needed.
#
# Arguments:
#   $conn       - Pg connection to the database holding the log
#   $howold     - minimum age in seconds a log row must have to be deleted
#   $onlytables - accepted for interface compatibility; currently unused
#
# A row is deleted when it is older than $howold seconds, its logid is below
# the smallest maxid among the latest successful syncs (per server), and it
# is not listed in the "active" column of those sync rows.
#
# Returns the number of deleted rows as reported by the DELETE, 0 when there
# are no successful syncs yet, or -1 on a database error. Every exit path
# ends the transaction opened here.
sub CleanLog {
    my ($conn, $howold, $onlytables) = @_;

    # Common failure path: report, roll back, -1.
    my $db_failed = sub {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    };

    my $result = $conn->exec("BEGIN");
    return $db_failed->() if $result->resultStatus ne PGRES_COMMAND_OK;

    my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
              " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
              " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";

    # print, not printf: the SQL text must never be used as a format string
    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    # roll back here too, so a failed SELECT does not leave the transaction open
    return $db_failed->() if $result->resultStatus ne PGRES_TUPLES_OK;

    my $maxid  = '';
    my %active = ();
    while (my @row = $result->fetchrow) {
        # rows arrive ordered by maxid: only the rows sharing the smallest
        # maxid are looked at
        $maxid = $row[0] if $maxid eq '';
        last if $row[0] > $maxid;
        my @ids = split(/[ ]+,[ ]+/, $row[1]);
        $active{$_} = 1 foreach @ids;
    }
    if ($maxid eq '') {
        print STDERR "No Sync IDs\n" unless ($quiet);
        # nothing to do, but the transaction opened above must still be ended
        $conn->exec("ROLLBACK");
        return(0);
    }

    my $alist = join(',', keys %active);
    my $sinfo = "logid < $maxid";
    $sinfo .= " AND logid not in ($alist)" if $alist ne '';

    $sql = "delete from _RSERV_LOG_ where " .
           "logtime < now() - '$howold second'::interval AND $sinfo";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    return $db_failed->() if $result->resultStatus ne PGRES_COMMAND_OK;

    my $deleted = $result->cmdTuples;

    $result = $conn->exec("COMMIT");
    return $db_failed->() if $result->resultStatus ne PGRES_COMMAND_OK;

    return($deleted);
}
436
# Apply a snapshot (as written by PrepareSnapshot) to the slave database
# inside a single transaction.
#
# Arguments:
#   $sconn      - Pg connection to the slave database
#   $inpf       - filehandle the snapshot is read from
#   $onlytables - optional hash ref; when it is a hash ref, only tables whose
#                 names are keys of the hash are applied, and keys with a
#                 false value get the table oid stored as their value
#
# Returns 1 when the snapshot was applied and committed, 0 when the slave is
# already at (or past) the snapshot's sync id, -1 on a database error and -2
# on a malformed snapshot; a non-zero code from DoDelete/DoInsert/DoUpdate is
# passed through. Dies on a line whose first field is empty.
#
# Text taken from the snapshot is only ever printed with print, never used as
# a printf format string. Input lines are read into a lexical, so the
# caller's $_ is left alone.
sub ApplySnapshot {
    my ($sconn, $inpf, $onlytables) = @_;

    # Roll back and hand back $code; $message (if any) is printed unless quiet.
    my $abort = sub {
        my ($code, $message) = @_;
        print STDERR $message if defined $message && !$quiet;
        $sconn->exec("ROLLBACK");
        return $code;
    };

    foreach my $stmt ("BEGIN", "SET CONSTRAINTS ALL DEFERRED") {
        my $res = $sconn->exec($stmt);
        return $abort->(-1, $sconn->errorMessage)
            if $res->resultStatus ne PGRES_COMMAND_OK;
    }

    # MAP name --> oid, keyname, keynum
    my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
              " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
              " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
              " AND pga.attnum = rt.key";

    my $result = $sconn->exec($sql);
    return $abort->(-1, $sconn->errorMessage)
        if $result->resultStatus ne PGRES_TUPLES_OK;

    %Stables = ();
    while (my @row = $result->fetchrow) {
        if (ref($onlytables) eq 'HASH') {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
    }

    print STDERR "Snapshot tables oids: ", join(",", keys %Stables), "\n" if ($debug);

    # Section commands and the sub that consumes the section's data lines.
    my %section_handler = (
        DELETE => \&DoDelete,
        INSERT => \&DoInsert,
        UPDATE => \&DoUpdate,
    );

    my $ok     = 0;     # set once the closing "-- OK" line has been seen
    my $syncid = -1;    # -1 until the "-- SYNCID" line has been processed

    while (defined(my $line = <$inpf>)) {
        $line =~ s/\n//;
        my ($cmt, $cmd, $prm) = split(/[ ]+/, $line, 3);
        die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);

        return $abort->(-2, "Invalid format\n") if $cmt ne '--';

        $cmd = '' unless defined $cmd;

        if (exists $section_handler{$cmd}) {
            # data sections are only valid after the sync id is known
            return $abort->(-2, "Sync ID unspecified\n") if $syncid == -1;

            my $rc = $section_handler{$cmd}->($sconn, $inpf, $prm);
            return $abort->($rc) if $rc;
        }
        elsif ($cmd eq 'SYNCID') {
            return $abort->(-2, "Second Sync ID ?!\n") if $syncid != -1;

            $prm = '' unless defined $prm;
            return $abort->(-2, "Invalid Sync ID $prm\n") if $prm !~ /^\d+$/;

            $syncid = $prm;

            print STDERR "Sync ID $syncid\n" unless ($quiet);

            $result = $sconn->exec("select syncid, synctime from " .
                                   "_RSERV_SLAVE_SYNC_ where syncid = " .
                                   "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
            return $abort->(-1, $sconn->errorMessage)
                if $result->resultStatus ne PGRES_TUPLES_OK;

            my @row = $result->fetchrow;
            if (! defined $row[0]) {
                # first snapshot ever applied on this slave
                $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ " .
                                       "(syncid, synctime) values ($syncid, now())");
            }
            elsif ($row[0] >= $syncid) {
                # this snapshot (or a newer one) was applied already
                return $abort->(0, "Sync-ed to ID $row[0] ($row[1])\n");
            }
            else {
                $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
                                       " set syncid = $syncid, synctime = now()");
            }
            return $abort->(-1, $sconn->errorMessage)
                if $result->resultStatus ne PGRES_COMMAND_OK;
        }
        elsif ($cmd eq 'OK') {
            $ok = 1;
            last;
        }
        elsif ($cmd eq 'ERROR') {
            return $abort->(-2, "ERROR signaled\n");
        }
        else {
            return $abort->(-2, "Unknown command $cmd\n");
        }
    }

    return $abort->(-2, "No OK flag in input\n") if ! $ok;

    $result = $sconn->exec("COMMIT");
    return $abort->(-1, $sconn->errorMessage)
        if $result->resultStatus ne PGRES_COMMAND_OK;

    return(1);
}
628
# Apply one "-- DELETE" section of a snapshot: one key per input line, ended
# by a line holding only "\.".
#
# Arguments:
#   $sconn   - Pg connection to the slave database
#   $inpf    - filehandle positioned just after the section header
#   $tabname - table name from the section header
#
# Returns 0 on success (also when the table is not in %Stables, in which case
# the section is skipped), -1 on a database error, -2 on malformed input.
# The caller is responsible for rolling back on a non-zero return.
#
# Lines are read into a lexical so the caller's $_ is not clobbered, and the
# debug output uses print because the SQL text contains key data that must
# not be interpreted as a printf format.
sub DoDelete {
    my ($sconn, $inpf, $tabname) = @_;

    # only delete tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to delete rows from table $tabname\n" unless $quiet;
        # consume the section so the caller continues at the next header
        while (defined(my $skipped = <$inpf>)) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my $keyname = $Stables{$tabname}->[1];

    my $ok = 0;
    while (defined(my $line = <$inpf>)) {
        # a line without a newline means the input was cut short
        if ($line !~ /\n$/) {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        (my $key = $line) =~ s/\n//;
        if ($key eq '\.') {
            $ok = 1;
            last;
        }

        # $key is used as written in the snapshot (already escaped by the writer)
        my $sql = "delete from \"$tabname\" where " .
                  "\"$keyname\" = '$key'";

        print "$sql\n" if $debug;

        my $result = $sconn->exec($sql);
        if ($result->resultStatus ne PGRES_COMMAND_OK) {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
    }

    if (! $ok) {
        print STDERR "No end of input in DELETE section\n" unless ($quiet);
        return(-2);
    }

    return(0);
}
681
682
# Apply one "-- UPDATE" section of a snapshot: one row per input line, ended
# by a line holding only "\.". Each row is applied as an UPDATE by key; rows
# whose key does not exist on the slave are collected and loaded with COPY.
#
# Arguments:
#   $sconn   - Pg connection to the slave database
#   $inpf    - filehandle positioned just after the section header
#   $tabname - table name from the section header
#
# Returns 0 on success (also when the table is not in %Stables, in which case
# the section is skipped), -1 on a database error, -2 on malformed input, a
# NULL key, an invalid OID or when an UPDATE hits more than one row.
# The caller is responsible for rolling back on a non-zero return.
#
# NOTE(review): fields are split on a single space, matching what the writer
# (PrepareSnapshot) emits in this source, while the unmatched lines are passed
# to COPY unchanged and COPY's default delimiter is a tab; confirm whether the
# separator was originally a literal tab.
sub DoUpdate {
    my ($sconn, $inpf, $tabname) = @_;

    # only update the tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to update rows from table $tabname\n" unless $quiet;
        # consume the section so the caller continues at the next header
        while (defined(my $skipped = <$inpf>)) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my ($reloid, $keyname, $keynum) = @{ $Stables{$tabname} };

    # a negative key attnum means the key is the oid system column; once a row
    # has been read, $oidkey holds that row's oid instead of the flag 1
    my $oidkey = ($keynum < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $sql = "select attnum, attname from pg_attribute" .
              " where attrelid = $reloid AND attnum > 0";

    my $result = $sconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # column names indexed by attnum (index 0 stays unused)
    my @anames = ();
    while (my @row = $result->fetchrow) {
        $anames[$row[0]] = $row[1];
    }

    my $ok = 0;
    while (defined(my $line = <$inpf>)) {
        # a line without a newline means the input was cut short
        if ($line !~ /\n$/) {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        (my $istring = $line) =~ s/\n//;
        if ($istring eq '\.') {
            $ok = 1;
            last;
        }

        # limit -1 keeps trailing empty fields instead of silently dropping them
        my @vals = split(/ /, $istring, -1);
        if ($oidkey) {
            # the first field is the row's oid; the columns follow at 1..n
            if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0) {
                print STDERR "Invalid OID\n" unless ($quiet);
                return(-2);
            }
            $oidkey = $vals[0];
        }
        else {
            # shift the columns to indexes 1..n so they line up with attnum
            unshift @vals, '';
        }

        $sql = "update \"$tabname\" set ";
        my $ocnt = 0;
        for my $i (1 .. $#anames) {
            if (defined $vals[$i] && $vals[$i] eq '\N') {
                if ($i == $keynum) {
                    print STDERR "NULL key\n" unless ($quiet);
                    return(-2);
                }
                $vals[$i] = 'null';
            }
            else {
                # a missing field is treated as the empty string
                $vals[$i] = "'" . (defined $vals[$i] ? $vals[$i] : '') . "'";
                # the key itself goes into the WHERE clause, not the SET list
                next if $i == $keynum;
            }
            $ocnt++;
            $sql .= ', ' if $ocnt > 1;
            $sql .= "\"$anames[$i]\" = $vals[$i]";
        }
        if ($oidkey) {
            $sql .= " where \"$keyname\" = $oidkey";
        }
        else {
            $sql .= " where \"$keyname\" = " . $vals[$keynum];
        }

        # print, not printf: the SQL contains row data that must not be
        # interpreted as a format string
        print "$sql\n" if $debug;

        $result = $sconn->exec($sql);

        if ($result->resultStatus ne PGRES_COMMAND_OK) {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
        next if $result->cmdTuples == 1;    # updated

        if ($result->cmdTuples > 1) {
            print STDERR "Duplicate keys\n" unless ($quiet);
            return(-2);
        }

        # no key - copy
        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax) {
            $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok) {
        print STDERR "No end of input in UPDATE section\n" unless ($quiet);
        return(-2);
    }

    # test the buffer itself: the byte count stays 0 for buffered empty lines
    if (@CopyBuf) {
        print "@CopyBuf\n" if $debug;
        $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
828
# Apply one "-- INSERT" section of a snapshot: one row per input line, ended
# by a line holding only "\.". The rows are buffered and loaded with COPY.
#
# Arguments:
#   $sconn   - Pg connection to the slave database
#   $inpf    - filehandle positioned just after the section header
#   $tabname - table name from the section header
#
# Returns 0 on success (also when the table is not in %Stables, in which case
# the section is skipped), -2 on malformed input, or the non-zero code of a
# failed DoCopy. The caller is responsible for rolling back on failure.
#
# Lines are read into a lexical so the caller's $_ is not clobbered.
sub DoInsert {
    my ($sconn, $inpf, $tabname) = @_;

    # only insert rows into tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to insert rows from table $tabname\n" unless $quiet;
        # consume the section so the caller continues at the next header
        while (defined(my $skipped = <$inpf>)) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    # a negative key attnum means the key is the oid system column, so the
    # rows carry an oid and must be loaded WITH OIDS
    my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $ok = 0;
    while (defined(my $line = <$inpf>)) {
        # a line without a newline means the input was cut short
        if ($line !~ /\n$/) {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        (my $istring = $line) =~ s/\n//;
        if ($istring eq '\.') {
            $ok = 1;
            last;
        }

        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        # flush in chunks so the buffer never grows far beyond $CBufMax
        if ($CBufLen >= $CBufMax) {
            my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok) {
        print STDERR "No end of input in INSERT section\n" unless ($quiet);
        return(-2);
    }

    # test the buffer itself: the byte count stays 0 for buffered empty lines
    if (@CopyBuf) {
        print "@CopyBuf\n" if $debug;
        my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
895
896
# Load buffered rows into a table with COPY ... FROM STDIN.
#
# Arguments:
#   $sconn    - Pg connection to the slave database
#   $tabname  - target table
#   $withoids - true to issue COPY ... WITH OIDS
#   $CBuf     - array ref of newline-terminated data lines
#
# Returns 0 on success, -1 if the COPY cannot be started or endcopy reports
# a failure.
sub DoCopy {
    my ($sconn, $tabname, $withoids, $CBuf) = @_;

    my $oids_clause = $withoids ? "WITH OIDS " : '';
    my $copy_cmd    = "COPY \"$tabname\" " . $oids_clause . "FROM STDIN";

    my $started = $sconn->exec($copy_cmd);
    unless ($started->resultStatus eq PGRES_COPY_IN) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # send the data lines, then the end-of-data marker
    for my $data_line (@{$CBuf}, "\\.\n") {
        $sconn->putline($data_line);
    }

    # endcopy returns a true value on failure
    return(0) unless $sconn->endcopy;

    print STDERR $sconn->errorMessage unless ($quiet);
    return(-1);
}
925
926
#
# Returns the highest SyncID recorded in _RSERV_SLAVE_SYNC_ on the slave,
# undef when the table holds no sync id yet, or -1 if the query fails.
#
sub GetSyncID {
    my ($sconn) = @_;

    my $res = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
    unless ($res->resultStatus eq PGRES_TUPLES_OK) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # max() over an empty table yields NULL, which arrives here as undef
    my ($last_applied) = $res->fetchrow;
    return($last_applied);
}
944
#
# Marks SyncID $syncid of slave $sserver as applied in _RSERV_SYNC_ on the
# master and deletes that slave's older sync rows.
#
# Returns 1 when the row was updated and committed, 0 when no such row exists
# or it is already marked (status > 0), -1 on a database error.
#
sub SyncSyncID {
    my ($mconn, $sserver, $syncid) = @_;

    # shared failure path: report, roll back, -1
    my $db_error = sub {
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    };

    my $this_sync = " where server = $sserver AND syncid = $syncid";

    my $res = $mconn->exec("BEGIN");
    return $db_error->() unless $res->resultStatus eq PGRES_COMMAND_OK;

    # lock the sync row while its status is checked and changed
    $res = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
                        $this_sync . " for update");
    return $db_error->() unless $res->resultStatus eq PGRES_TUPLES_OK;

    my ($synctime, $status) = $res->fetchrow;
    unless (defined $synctime) {
        printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(0);
    }
    if ($status > 0) {
        printf STDERR "SyncID $syncid for server ".
            "$sserver already updated\n" unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(0);
    }

    my @statements = (
        "update _RSERV_SYNC_ set synctime = now(), status = 1" . $this_sync,
        "delete from _RSERV_SYNC_ where server = $sserver AND syncid < $syncid",
        "COMMIT",
    );
    foreach my $stmt (@statements) {
        $res = $mconn->exec($stmt);
        return $db_error->() unless $res->resultStatus eq PGRES_COMMAND_OK;
    }

    return(1);
}
1011
1012 # stuff moved from perl scripts for better re-use
1013
# Print the connection's current error message to STDERR (unconditionally)
# and roll back its transaction. Returns whatever exec("ROLLBACK") returns.
sub Rollback {
    my ($conn) = @_;

    my $message = $conn->errorMessage;
    print STDERR $message;
    $conn->exec("ROLLBACK");
}
1020
# Report the connection's error and roll back (via Rollback), then terminate
# the process with exit(-1). Never returns.
sub RollbackAndQuit {
    my ($conn) = @_;

    Rollback($conn);
    exit(-1);
}
1027
# Open a database connection from a libpq conninfo string.
#
# Argument: $info - conninfo string, e.g. as built by MkInfo.
# Returns the Pg connection object. If the connection cannot be opened, a
# message and the connection's error text are printed to STDERR and the
# process exits with status 1.
#
# The conninfo string may carry a password, so the copy that is printed has
# the password value masked.
sub Connect {
    my ($info) = @_;

    # mask both the plain (password=x) and the quoted (password='x y') form
    (my $shown = $info) =~ s/\bpassword\s*=\s*(?:'(?:[^'\\]|\\.)*'|\S*)/password=***/g;

    print("Connecting to $shown\n") if ($debug || $verbose);

    my $conn = Pg::connectdb($info);
    if ($conn->status != PGRES_CONNECTION_OK) {
        print STDERR "Failed opening $shown\n";
        print STDERR $conn->errorMessage;
        exit 1;
    }
    return $conn;
}
1039
# Run one SQL command on $conn; the statement is echoed to STDERR when
# debugging. If the result status is not PGRES_COMMAND_OK, the transaction is
# rolled back and the process exits (see RollbackAndQuit).
sub Exec {
    my ($conn, $statement) = @_;

    my $outcome = $conn->exec($statement);
    print STDERR "$statement\n" if ($debug);
    RollbackAndQuit($conn) if ($outcome->resultStatus ne PGRES_COMMAND_OK);
}
1048
# Run the same SQL command first on the master, then on the slave connection.
# As soon as one of them does not report PGRES_COMMAND_OK, that connection is
# rolled back and the process exits (see RollbackAndQuit).
sub Exec2 {
    my ($mconn, $sconn, $statement) = @_;

    my $outcome = $mconn->exec($statement);
    RollbackAndQuit($mconn) if ($outcome->resultStatus ne PGRES_COMMAND_OK);

    $outcome = $sconn->exec($statement);
    RollbackAndQuit($sconn) if ($outcome->resultStatus ne PGRES_COMMAND_OK);
}
1059
# Build a libpq conninfo string from its parts.
#
# Arguments (positional): database name (required), host, port, user,
# password. Parts that are undef are left out.
# Returns a string such as "dbname=db host=h port=5432 user=u password=p".
# Dies with "need database name!" when the database name is missing or false.
#
# Values that are empty or contain whitespace, a single quote or a backslash
# are wrapped in single quotes with quote and backslash escaped by a
# backslash, as the conninfo syntax requires; simple values come out exactly
# as before.
sub MkInfo {
    my $db = shift || die "need database name!";
    my ($host, $port, $user, $password) = @_;

    my $quote = sub {
        my ($value) = @_;
        return $value if $value =~ /\A[^\s'\\]+\z/;    # safe to use as is
        $value =~ s/([\\'])/\\$1/g;
        return "'$value'";
    };

    my @parts = (
        [ dbname   => $db ],
        [ host     => $host ],
        [ port     => $port ],
        [ user     => $user ],
        [ password => $password ],
    );

    return join(' ',
                map  { $_->[0] . '=' . $quote->($_->[1]) }
                grep { defined $_->[1] } @parts);
}
1075
1076 1;

  ViewVC Help
Powered by ViewVC 1.1.26