/[rserv]/share/RServ.pm
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.18 - (show annotations)
Mon Nov 3 21:30:39 2003 UTC (20 years, 5 months ago) by dpavlin
Branch: MAIN
CVS Tags: HEAD
Changes since 1.17: +27 -73 lines
added _rserv_xid_ function to return current transaction xid and removed
all perl cludgery about keys (as well as KEYS directive from snapshots)

1 # -*- perl -*-
2 # RServ.pm
3 # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4
5 package RServ;
6
7 require Exporter;
8 @ISA = qw(Exporter);
9 @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10 Rollback RollbackAndQuit Connect Exec Exec2 MkInfo ExecDebug
11 $debug $quiet $verbose
12 );
13 @EXPORT_OK = qw();
14 use strict;
15 use Pg;
16
# Diagnostic switches.  These are package variables (`our`) rather than file
# lexicals because their names are listed in @EXPORT: Exporter can only alias
# package variables, so a `my` declaration would leave importers holding an
# unrelated, always-empty variable.
our $debug = 0;      # when true, trace progress and SQL
our $quiet = 1;      # when true, suppress error messages on STDERR
our $verbose = 0;    # when true, Connect announces each connection

# Overrides currently in force: tracing on, errors shown.
$debug = 1;
$quiet = 0;
$verbose = 1;

# Master tables, filled by PrepareSnapshot:
#   reloid => [ relname, key column name, key type name ]
my %Mtables = ();
# Tables the slave replicates.  PrepareSnapshot stores name => 1;
# ApplySnapshot stores name => [ reloid, key column name, key attnum ].
my %Stables = ();
27
# GetServerId($mconn, $DB, $Host)
#
# Look up the id stored in _RSERV_SERVERS_ for the given host and database.
#
# Returns the value of the "server" column, undef when no row matches,
# -1 when the query fails, or -2 when more than one row matches.
#
# NOTE(review): $Host and $DB are interpolated into the statement unescaped;
# callers are presumably expected to pass trusted values - confirm.
sub GetServerId
{
    my ($mconn, $DB, $Host) = @_;

    print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);

    my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
        " host='$Host' AND dbase='$DB'");

    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $mconn->errorMessage unless ($quiet);
        return(-1);
    }

    # ntuples is the number of rows a SELECT returned.  cmdTuples reports
    # the rows affected by a data-modifying command, so it is not a
    # dependable way to spot duplicate definitions here.
    if ($result->ntuples > 1) {
        print STDERR "Duplicate host definitions.\n" unless ($quiet);
        return(-2);
    }

    my @row = $result->fetchrow;

    print STDERR "GetServerId($DB,$Host) == $row[0]\n" if ($debug);

    return $row[0];
}
55
# PrepareSnapshot($mconn, $sconn, $outf, $mserver, $sserver, $multimaster,
#                 $onlytables)
#
# Write to $outf a snapshot of the rows logged in _RSERV_LOG_ on the master
# ($mconn) since the last successful sync of slave $sserver, limited to the
# tables listed in the slave's _RSERV_SLAVE_TABLES_ ($sconn).
#
# The snapshot consists of "-- SERVER", "-- SYNCID", "-- DELETE <table>",
# "-- UPDATE <table>" and "-- INSERT <table>" sections, each data section
# terminated by a "\." line, and ends with "-- OK" (or "-- ERROR").
#
# $onlytables, when a hash reference, restricts the master tables to its
# keys; keys with a false value are filled in with the table's reloid.
# $multimaster is accepted but not used by this function.
#
# Returns 1 when a snapshot was written and recorded, 0 when there was
# nothing to send, and a negative value on error.  Exits the process (via
# RollbackAndQuit) when no master table is found or a statement run through
# Exec without a return code fails.
sub PrepareSnapshot
{
    my ($mconn, $sconn, $outf, $mserver, $sserver, $multimaster, $onlytables) = @_;

    if ($mserver == $sserver) {
        print STDERR "master and slave numbers are same [$mserver] !\n";
        return(-1);
    }

    print STDERR "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);

    # Both maps are file-level state.  Start empty so that tables seen by an
    # earlier call (possibly for another slave) cannot leak into this one.
    %Mtables = ();
    %Stables = ();

    # Dump the master server ID into the snapshot file (to prevent
    # replication of columns from the master back to the slave).
    print $outf "-- SERVER $mserver\n";

    # First, find out which tables the slave subscribed to.  Exec hands back
    # the plain return code (-1) instead of a result object on failure.
    my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);
    return (-1) unless ref($result);

    my @row;
    while (@row = $result->fetchrow) {
        $Stables{$row[0]} = 1;
    }

    print STDERR "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);

    Exec($mconn,"BEGIN");
    Exec($mconn,"set transaction isolation level serializable");

    # MAP oid --> tabname, keyname, key_type
    my $sql = qq{
        select pgc.oid, pgc.relname, pga.attname, pgt.typname
        from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga,
        pg_type pgt
        where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
        AND pga.attnum = rt.key AND pga.atttypid=pgt.oid
    };
    $result = Exec($mconn,$sql);

    while (@row = $result->fetchrow) {
        # print, not printf: the names are data, not a format string
        print "$row[0], $row[1], $row[2]\n" if ($debug);
        if (ref($onlytables) eq 'HASH') {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
    }

    print STDERR "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
    if (! %Mtables) {
        print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";
        RollbackAndQuit($mconn);
    }

    # Read last succeeded sync
    $sql = qq{
        select syncid, synctime, minid, maxid, active from _RSERV_SYNC_
        where server = $sserver AND syncid =
        (select max(syncid) from _RSERV_SYNC_
        where server = $sserver AND status > 0)
    };

    $result = Exec($mconn,$sql);

    my @lastsync = $result->fetchrow;
    print STDERR "lastsync: ",join(",",@lastsync),"\n" if ($debug);

    # Only log rows whose server column equals the master's id are sent.
    my $sel_server = " and l.server = $mserver ";

    # Log rows at or after the last sync's maxid, plus the ids that were
    # still active at that time.
    my $sinfo = "";
    if (@lastsync && $lastsync[3] ne '') {
        $sinfo = "and (l.logid >= $lastsync[3]";
        $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
        $sinfo .= ")";
    }

    # Set once the "-- SYNCID" header has been written, i.e. once there is
    # something to send.
    my $havedeal = 0;

    # DELETED rows
    $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
        " where l.delete = 1 $sinfo $sel_server order by l.reloid";

    print "DELETED: $sql\n" if $debug;

    $result = $mconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    # Rows arrive ordered by reloid; open a new section whenever it changes.
    my $lastoid = -1;
    while (@row = $result->fetchrow) {
        next unless exists $Mtables{$row[0]};
        next unless exists $Stables{$Mtables{$row[0]}[0]};

        if ($lastoid != $row[0]) {
            if ($lastoid == -1) {
                my $syncid = GetSYNCID($mconn, $outf);
                return($syncid) if $syncid < 0;
                $havedeal = 1;
            } else {
                print $outf "\\.\n";
            }
            print $outf "-- DELETE $Mtables{$row[0]}[0]\n";
            $lastoid = $row[0];
        }
        if (! defined $row[1]) {
            print STDERR "NULL key\n" unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-2);
        }
        printf $outf "%s\n", OutputValue($row[1]);
    }
    print $outf "\\.\n" if ($lastoid != -1);

    # UPDATED rows, then INSERTED rows
    foreach my $section (['UPDATE', 'UPDATED'], ['INSERT', 'INSERTED']) {
        my $status = _DumpChangedRows($mconn, $outf, $section->[0],
            $section->[1], $sinfo, $sel_server, \$havedeal);
        return($status) if $status < 0;
    }

    unless ($havedeal) {
        print STDERR "hon't have deal, rollback...\n" if ($debug);
        $mconn->exec("ROLLBACK");
        return(0);
    }

    # Remember this snapshot info
    $result = $mconn->exec("select _rserv_sync_($sserver)");
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print $outf "-- ERROR\n";
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    $result = $mconn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print $outf "-- ERROR\n";
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    print $outf "-- OK\n";
    print "-- OK\n" if $debug;

    return(1);
}

# _DumpChangedRows($mconn, $outf, $label, $tracetag, $sinfo, $sel_server,
#                  $havedeal_ref)
#
# Private helper of PrepareSnapshot: for every subscribed master table,
# write a "-- $label <table>" section holding the current rows whose log
# entry has the matching flag column (lower-cased $label) set.  Writes the
# "-- SYNCID" header first if $$havedeal_ref is still false, and sets it.
#
# Returns 0 on success or a negative code after rolling back.
sub _DumpChangedRows
{
    my ($mconn, $outf, $label, $tracetag, $sinfo, $sel_server, $havedeal_ref) = @_;

    my $flagcol = lc($label);

    foreach my $taboid (keys %Mtables) {
        my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
        next unless exists $Stables{$tabname};

        # The alias is a quoted identifier, so the oid reference must quote
        # it the same way or mixed-case table names stop matching.
        my $oidkey = ($tabkey eq 'oid') ? "\"_$tabname\".oid," : '';

        my $sql = "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.$flagcol = 1".
            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
            $sel_server;

        print "$tracetag: $sql\n" if $debug;

        my $result = $mconn->exec($sql);
        if ($result->resultStatus ne PGRES_TUPLES_OK) {
            print $outf "-- ERROR\n" if $$havedeal_ref;
            print STDERR $mconn->errorMessage unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-1);
        }
        next if $result->ntuples <= 0;

        if (! $$havedeal_ref) {
            my $syncid = GetSYNCID($mconn, $outf);
            return($syncid) if $syncid < 0;
            $$havedeal_ref = 1;
        }

        print $outf "-- $label $tabname\n";
        print "-- $label $tabname\n" if $debug;
        while (my @row = $result->fetchrow) {
            # NOTE(review): the column separator appears as a single space
            # in this copy of the file; OutputValue uses \011 (TAB), so a
            # tab may have been lost in transit - confirm.
            my $text = join(" ", map { OutputValue($_) } @row) . "\n";
            print {$outf} $text;
            print $text if $debug;
        }
        print $outf "\\.\n";
        print "\\.\n" if $debug;
    }

    return(0);
}
307
# OutputValue($val)
#
# Encode one column value for a snapshot line.  undef becomes \N; backslash
# is doubled; tab, space, newline, carriage return and single quote are
# written as three-digit octal escapes.
#
# Both tab and space are escaped so that an encoded value can never contain
# the column separator, and a space in the data is no longer turned into
# \011 (which decodes as a tab).
#
# Returns the encoded string.
sub OutputValue
{
    my ($val) = @_;

    return("\\N") unless defined $val;

    my %escape_for = (
        "\\" => "\\\\",
        "\t" => "\\011",
        " "  => "\\040",
        "\n" => "\\012",
        "\r" => "\\015",
        "'"  => "\\047",
    );

    # One pass, so the backslashes introduced by an escape are never
    # escaped a second time.
    $val =~ s/([\\\t \n\r'])/$escape_for{$1}/g;

    return($val);
}
321
# GetSYNCID($conn, $outf)
#
# Fetch the next value of the _rserv_sync_seq_ sequence and write it to
# $outf as a "-- SYNCID <n>" header line.
#
# Returns the new sync id, or -1 (after issuing ROLLBACK) if the query fails.
sub GetSYNCID
{
    my ($conn, $outf) = @_;

    my $res = $conn->exec("select nextval('_rserv_sync_seq_')");
    unless ($res->resultStatus eq PGRES_TUPLES_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my ($syncid) = $res->fetchrow;

    printf $outf "-- SYNCID $syncid\n";
    if ($debug) {
        printf "-- SYNCID $syncid\n";
    }
    return($syncid);
}
341
342
# CleanLog($conn, $howold, $onlytables)
#
# Delete from _RSERV_LOG_ the entries that are older than $howold seconds,
# have a logid below the smallest "maxid" among each server's latest
# successful sync, and are not listed as active by those syncs.
#
# $onlytables is accepted but not used.
#
# Returns the number of deleted rows as reported by cmdTuples, 0 when no
# sync has been recorded yet, or -1 on a database error.  Every exit path
# ends the transaction opened here.
sub CleanLog
{
    my ($conn, $howold, $onlytables) = @_;

    my $result = $conn->exec("BEGIN");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
        " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
        " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        # do not leave the transaction hanging on the connection
        $conn->exec("ROLLBACK");
        return(-1);
    }

    # Rows are ordered by maxid; only those sharing the smallest maxid are
    # taken into account.
    my $maxid = '';
    my %active = ();
    while (my @row = $result->fetchrow) {
        $maxid = $row[0] if $maxid eq '';
        last if $row[0] > $maxid;
        next unless defined $row[1];
        # Accept "1,2" as well as "1, 2" so that ids really are de-duplicated.
        foreach my $aid (grep { $_ ne '' } split(/\s*,\s*/, $row[1])) {
            $active{$aid} = 1;
        }
    }
    if ($maxid eq '') {
        print STDERR "No Sync IDs\n" unless ($quiet);
        $conn->exec("ROLLBACK");
        return(0);
    }

    my $alist = join(',', keys %active);
    my $sinfo = "logid < $maxid";
    $sinfo .= " AND logid not in ($alist)" if $alist ne '';

    $sql = "delete from _RSERV_LOG_ where " .
        "logtime < now() - '$howold second'::interval AND $sinfo";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }
    my $deleted = $result->cmdTuples;

    $result = $conn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    return($deleted);
}
416
# ApplySnapshot($sconn, $inpf, $multimaster, $onlytables)
#
# Read a snapshot from $inpf and apply it to the slave database ($sconn)
# inside one transaction with all constraints deferred.
#
# Every header line has the form "-- COMMAND [parameter]".  Recognised
# commands: SERVER <id>, SYNCID <id>, DELETE/INSERT/UPDATE <table> (each
# followed by data lines handled by DoDelete/DoInsert/DoUpdate), OK and
# ERROR.  SYNCID must precede any data section.
#
# $onlytables, when a hash reference, restricts the slave tables to its
# keys; keys with a false value are filled in with the table's reloid.
# When $multimaster is true, the server column of this transaction's
# _rserv_log_ rows is set to the SERVER id from the snapshot.
#
# Returns 1 when the snapshot was applied and committed, 0 when the slave
# is already at or past the snapshot's sync id, -1 on a database error and
# -2 on a malformed snapshot.  Dies on an empty or unrecognisable line.
sub ApplySnapshot
{
    my ($sconn, $inpf, $multimaster, $onlytables) = @_;

    # Report (unless quiet), roll back, and hand back the given status.
    my $abort = sub {
        my ($code, @msg) = @_;
        print STDERR @msg if (@msg && !$quiet);
        $sconn->exec("ROLLBACK");
        return($code);
    };

    my $serverId;

    my $result = $sconn->exec("BEGIN");
    return $abort->(-1, $sconn->errorMessage)
        if ($result->resultStatus ne PGRES_COMMAND_OK);

    $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
    return $abort->(-1, $sconn->errorMessage)
        if ($result->resultStatus ne PGRES_COMMAND_OK);

    # MAP name --> oid, keyname, keynum
    my $sql = qq{
        select pgc.oid, pgc.relname, pga.attname, rt.key
        from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga
        where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
        AND pga.attnum = rt.key
    };

    $result = $sconn->exec($sql);
    return $abort->(-1, $sconn->errorMessage)
        if ($result->resultStatus ne PGRES_TUPLES_OK);

    %Stables = ();
    while (my @row = $result->fetchrow) {
        if (ref($onlytables) eq 'HASH') {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
    }

    print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);

    # Data sections and the routine that consumes each of them.
    my %section_handler = (
        DELETE => \&DoDelete,
        INSERT => \&DoInsert,
        UPDATE => \&DoUpdate,
    );

    my $ok = 0;
    my $syncid = -1;
    # A lexical line variable: the section handlers read from the same
    # handle and must not disturb this loop's current line.
    while (my $line = <$inpf>) {
        $line =~ s/\n//;
        my ($cmt, $cmd, $prm) = split(/[ ]+/, $line, 3);
        if (! $cmt) {
            $sconn->exec("ROLLBACK");
            die "FATAL: snapshot format unknown or snapshot corrupted!";
        }
        return $abort->(-2, "Invalid format\n") if ($cmt ne '--');
        $cmd = '' unless defined $cmd;

        if (exists $section_handler{$cmd}) {
            return $abort->(-2, "Sync ID unspecified\n") if ($syncid == -1);
            my $status = $section_handler{$cmd}->($sconn, $inpf, $prm);
            return $abort->($status) if $status;
        } elsif ($cmd eq 'SYNCID') {
            return $abort->(-2, "Second Sync ID ?!\n") if ($syncid != -1);
            # print, not printf: $prm comes from the snapshot and must not
            # be interpreted as a format string
            return $abort->(-2, "Invalid Sync ID ", (defined $prm ? $prm : ''), "\n")
                if (!defined($prm) || $prm !~ /^\d+$/);
            $syncid = $prm;

            print STDERR "Sync ID $syncid\n" unless ($quiet);

            $result = $sconn->exec(qq{
                select syncid, synctime
                from _RSERV_SLAVE_SYNC_
                where syncid =
                (select max(syncid) from _RSERV_SLAVE_SYNC_)
            });
            return $abort->(-1, "can't get current syncid from _rserv_slave_sync_: ",
                            $sconn->errorMessage)
                if ($result->resultStatus ne PGRES_TUPLES_OK);

            my @row = $result->fetchrow;
            print STDERR "Slave Sync ID ",($row[0] || "null"),"\n" if ($debug);
            if (! defined $row[0]) {
                $result = Exec($sconn,qq{
                    insert into _RSERV_SLAVE_SYNC_ (syncid, synctime)
                    values ($syncid, now())
                });
            } elsif ($row[0] >= $prm) {
                # nothing newer to apply
                return $abort->(0, "Sync-ed to ID $row[0] ($row[1])\n");
            } else {
                $result = Exec($sconn,qq{
                    update _RSERV_SLAVE_SYNC_ set syncid = $syncid, synctime = now()
                });
            }
            return $abort->(-1, $sconn->errorMessage)
                if ($result->resultStatus ne PGRES_COMMAND_OK);
        } elsif ($cmd eq 'OK') {
            $ok = 1;
            if ($multimaster) {
                # Without a SERVER line the update below would be built
                # from an undefined value; treat that as a bad snapshot.
                return $abort->(-2, "No Server ID in snapshot\n")
                    unless defined $serverId;

                # now, update server in _rserv_log_ based on transaction xid
                ExecFatch($sconn,"select count(*) from _rserv_log_");
                ExecDebug($sconn,"select * from _rserv_log_");
                my $keys_sql = qq{
                    update _rserv_log_ set server=$serverId
                    where logid = (select _rserv_xid_())
                };

                Exec($sconn,$keys_sql);
            }
            last;
        } elsif ($cmd eq 'ERROR') {
            return $abort->(-2, "ERROR signaled\n");
        } elsif ($cmd eq 'SERVER') {
            return $abort->(-2, "Invalid Server ID ", (defined $prm ? $prm : ''), "\n")
                if (!defined($prm) || $prm !~ /^\d+$/);
            $serverId = $prm;
            print STDERR "Server ID $serverId\n" unless ($quiet);
        } else {
            return $abort->(-2, "Unknown command $cmd\n");
        }
    }

    return $abort->(-2, "No OK flag in input\n") if (! $ok);

    $result = $sconn->exec("COMMIT");
    return $abort->(-1, $sconn->errorMessage)
        if ($result->resultStatus ne PGRES_COMMAND_OK);

    return(1);
}
603
# DoDelete($sconn, $inpf, $tabname)
#
# Consume one DELETE section from $inpf: one key value per line, ended by a
# "\." line.  Each key is deleted from $tabname using the key column
# recorded in %Stables.  If the table is not in %Stables the section is
# read and discarded.
#
# Returns 0 on success, -1 on a database error, -2 on malformed input.
# The caller is responsible for rolling back.
sub DoDelete
{
    my ($sconn, $inpf, $tabname) = @_;

    # only delete tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print STDERR "Not configured to delete rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my $keyname = $Stables{$tabname}->[1];

    my $ok = 0;
    while (my $line = <$inpf>) {
        # a line without its newline means the input was cut short
        if ($line !~ /\n$/) {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        (my $key = $line) =~ s/\n//;
        if ($key eq '\.') {
            $ok = 1;
            last;
        }

        my $sql = "delete from \"$tabname\" where ".
            "\"$keyname\" = '$key'";

        # print, not printf: the key is data and may contain '%'
        print "$sql\n" if $debug;

        my $result = $sconn->exec($sql);
        if ($result->resultStatus ne PGRES_COMMAND_OK) {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
    }

    if (! $ok) {
        print STDERR "No end of input in DELETE section\n" unless ($quiet);
        return(-2);
    }

    return(0);
}
656
657
# DoUpdate($sconn, $inpf, $tabname)
#
# Consume one UPDATE section from $inpf: one row per line, ended by a "\."
# line.  Each row is applied as an UPDATE keyed on the table's key column;
# rows that match no existing row are collected and loaded with DoCopy.
# If the table is not in %Stables the section is read and discarded.
#
# A negative key attnum in %Stables is taken to mean the key is the OID,
# in which case the first field of every line is that OID.
#
# Returns 0 on success, -1 on a database error, -2 on malformed input or
# when an UPDATE touches more than one row.  The caller rolls back.
sub DoUpdate
{
    my ($sconn, $inpf, $tabname) = @_;

    # only update the tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print STDERR "Not configured to update rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my ($reloid, $keyname, $keynum) = @{$Stables{$tabname}};

    # Starts as a 0/1 flag; once a row has been read it holds that row's
    # OID, which is positive and therefore still true.
    my $oidkey = ($keynum < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $sql = "select attnum, attname from pg_attribute" .
        " where attrelid = $reloid AND attnum > 0";

    my $result = $sconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # column names indexed by attnum (slot 0 stays unused)
    my @anames = ();
    while (my @row = $result->fetchrow) {
        $anames[$row[0]] = $row[1];
    }

    my $ok = 0;
    while (my $line = <$inpf>) {
        if ($line !~ /\n$/) {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        (my $istring = $line) =~ s/\n//;
        if ($istring eq '\.') {
            $ok = 1;
            last;
        }

        # NOTE(review): fields are split on a single space as this copy of
        # the file shows it; the separator may originally have been a tab -
        # confirm against the writer of the snapshot.
        my @vals = split(/ /, $istring);
        if ($oidkey) {
            if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0) {
                print STDERR "Invalid OID\n" unless ($quiet);
                return(-2);
            }
            $oidkey = $vals[0];
        } else {
            # shift the values so that $vals[$i] lines up with attnum $i
            unshift @vals, '';
        }

        my @assignments = ();
        for (my $i = 1; $i <= $#anames; $i++) {
            if ($vals[$i] eq '\N') {
                if ($i == $keynum) {
                    print STDERR "NULL key\n" unless ($quiet);
                    return(-2);
                }
                $vals[$i] = 'null';
            } else {
                $vals[$i] = "'" . $vals[$i] . "'";
                # the key itself is not assigned, only used in the WHERE
                next if $i == $keynum;
            }
            push @assignments, "\"$anames[$i]\" = $vals[$i]";
        }

        $sql = "update \"$tabname\" set " . join(', ', @assignments);
        if ($oidkey) {
            $sql .= " where \"$keyname\" = $oidkey";
        } else {
            $sql .= " where \"$keyname\" = " . $vals[$keynum];
        }

        # print, not printf: the statement contains row data
        print "$sql\n" if $debug;

        $result = $sconn->exec($sql);

        if ($result->resultStatus ne PGRES_COMMAND_OK) {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
        next if $result->cmdTuples == 1;    # updated

        if ($result->cmdTuples > 1) {
            print STDERR "Duplicate keys\n" unless ($quiet);
            return(-2);
        }

        # no such key yet - queue the row for COPY
        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax) {
            $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok) {
        print STDERR "No end of input in UPDATE section\n" unless ($quiet);
        return(-2);
    }

    # flush whatever is still queued, whatever its byte length
    if (@CopyBuf) {
        print STDERR "@CopyBuf\n" if $debug;
        $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
781
# DoInsert($sconn, $inpf, $tabname)
#
# Consume one INSERT section from $inpf: one row per line, ended by a "\."
# line.  Rows are buffered and loaded into $tabname through DoCopy, in
# batches of roughly $CBufMax bytes.  If the table is not in %Stables the
# section is read and discarded.
#
# Returns 0 on success, -2 on malformed input, or DoCopy's error code.
# The caller is responsible for rolling back.
sub DoInsert
{
    my ($sconn, $inpf, $tabname) = @_;

    # only insert rows into tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print STDERR "Not configured to insert rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    # a negative key attnum marks an OID-keyed table: copy WITH OIDS
    my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $ok = 0;
    while (my $line = <$inpf>) {
        if ($line !~ /\n$/) {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        (my $istring = $line) =~ s/\n//;
        if ($istring eq '\.') {
            $ok = 1;
            last;
        }

        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax) {
            my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok) {
        print STDERR "No end of input in INSERT section\n" unless ($quiet);
        return(-2);
    }

    # Flush on "rows queued", not on byte count: a row that is an empty
    # line (one empty-string column) adds nothing to $CBufLen and would
    # otherwise be dropped without notice.
    if (@CopyBuf) {
        print STDERR "@CopyBuf\n" if $debug;
        my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
842
843
# DoCopy($sconn, $tabname, $withoids, $CBuf)
#
# Start "COPY <table> [WITH OIDS] FROM STDIN" on $sconn, send every string
# in the array referenced by $CBuf followed by the "\." terminator, and
# finish the copy.
#
# Returns 0 on success, -1 if the COPY cannot be started or endcopy
# reports a failure.
sub DoCopy {
    my ($sconn, $tabname, $withoids, $CBuf) = @_;

    my $oids_clause = $withoids ? "WITH OIDS " : '';
    my $started = $sconn->exec("COPY \"$tabname\" " . $oids_clause . "FROM STDIN");
    unless ($started->resultStatus eq PGRES_COPY_IN) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # data lines first, then the end-of-data marker
    foreach my $chunk (@{$CBuf}, "\\.\n") {
        $sconn->putline($chunk);
    }

    # endcopy yields a true value on failure
    return(0) unless $sconn->endcopy;

    print STDERR $sconn->errorMessage unless ($quiet);
    return(-1);
}
868
869
#
# GetSyncID($sconn)
#
# Returns the highest syncid stored in _RSERV_SLAVE_SYNC_ on the slave,
# undef when the table holds none, or -1 when the query fails.
#
sub GetSyncID {
    my ($sconn) = @_;

    my $res = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
    unless ($res->resultStatus eq PGRES_TUPLES_OK) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    my ($latest) = $res->fetchrow;
    if ($debug) {
        print STDERR "GetSyncID: ",($latest || 'null'),"\n";
    }

    # undef here stands for SQL NULL (no sync recorded)
    return($latest);
}
886
#
# SyncSyncID($mconn, $sserver, $syncid)
#
# On the master, mark sync $syncid of slave $sserver as done (status = 1,
# synctime = now()) and delete that slave's older _RSERV_SYNC_ rows, all in
# one transaction.
#
# Returns 1 on success, 0 when the sync row does not exist or is already
# marked, -1 on a database error.
#
sub SyncSyncID {
    my ($mconn, $sserver, $syncid) = @_;

    # database error: report, roll back, -1
    my $fail = sub {
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    };
    # nothing to do: report the reason, roll back, 0
    my $skip = sub {
        my ($note) = @_;
        printf STDERR $note unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(0);
    };

    my $this_server = " where server = $sserver AND syncid";

    my $res = $mconn->exec("BEGIN");
    return $fail->() if ($res->resultStatus ne PGRES_COMMAND_OK);

    # lock the row for the rest of the transaction
    $res = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
        "$this_server = $syncid" .
        " for update");
    return $fail->() if ($res->resultStatus ne PGRES_TUPLES_OK);

    my ($synctime, $status) = $res->fetchrow;
    return $skip->("No SyncID $syncid found for server $sserver\n")
        if (! defined $synctime);
    return $skip->("SyncID $syncid for server $sserver already updated\n")
        if ($status > 0);

    my @statements = (
        "update _RSERV_SYNC_" .
            " set synctime = now(), status = 1" .
            "$this_server = $syncid",
        "delete from _RSERV_SYNC_" .
            "$this_server < $syncid",
        "COMMIT",
    );
    foreach my $statement (@statements) {
        $res = $mconn->exec($statement);
        return $fail->() if ($res->resultStatus ne PGRES_COMMAND_OK);
    }

    return(1);
}
945
946 # stuff moved from perl scripts for better re-use
947
# Rollback($conn)
#
# Print the connection's current error message to STDERR (unless quiet)
# and issue ROLLBACK on it.
sub Rollback {
    my ($conn) = @_;

    unless ($quiet) {
        print STDERR $conn->errorMessage;
    }
    $conn->exec("ROLLBACK");
}
954
# RollbackAndQuit($conn)
#
# Report the connection's error message (unless quiet), roll back, and
# terminate the process with exit(-1).  Never returns.
sub RollbackAndQuit {
    my ($conn) = @_;

    # same steps as Rollback, written out in place
    print STDERR $conn->errorMessage unless ($quiet);
    $conn->exec("ROLLBACK");

    exit (-1);
}
961
# Connect($info)
#
# Open a database connection with Pg::connectdb using the conninfo string
# $info (as built by MkInfo).
#
# Returns the connection object; dies if the connection is not OK.  Any
# password in $info is masked in the progress and failure messages.
sub Connect {
    my $info = shift @_;

    # Never echo credentials: mask a bare or single-quoted password value.
    (my $shown = $info) =~ s/\b(password\s*=\s*)(?:'(?:[^'\\]|\\.)*'|\S*)/$1********/g;

    print("Connecting to $shown\n") if ($debug || $verbose);
    my $conn = Pg::connectdb($info);
    if ($conn->status != PGRES_CONNECTION_OK) {
        # include the reason reported by the connection
        die "Failed opening $shown: " . $conn->errorMessage;
    }
    return $conn;
}
972
# Exec($conn, $sql [, $return_code])
#
# Run $sql on $conn.
#
# Returns the result object when the status is PGRES_COMMAND_OK or
# PGRES_TUPLES_OK.  On any other status: if $return_code was given, the
# error is reported (unless quiet) and $return_code is returned; otherwise
# the transaction is rolled back and the process exits.
sub Exec {
    my $conn = shift || die "Exec needs connection!";
    my $sql = shift || die "Exec needs SQL statement!";
    # used to return error code if no tuples are returned
    my $return_code = shift;

    if ($debug) {
        # Show the statement on one line, but squeeze whitespace in a copy
        # only: the statement that is executed must not depend on whether
        # tracing is on (string literals may contain significant spacing).
        (my $shown = $sql) =~ s/\s+/ /g;
        print STDERR "Exec: $shown\n";
    }

    my $result = $conn->exec($sql);
    my $status = $result->resultStatus;

    return $result if ($status eq PGRES_COMMAND_OK);

    if ($status eq PGRES_TUPLES_OK) {
        print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
        return $result;
    }

    if (defined($return_code)) {
        print STDERR "ERROR: ",$conn->errorMessage,"\n" unless ($quiet);
        return($return_code);
    }

    RollbackAndQuit($conn);
}
999
# Exec2($mconn, $sconn, $sql)
#
# Run the same statement first on the master and then on the slave
# connection.  If either does not finish with PGRES_COMMAND_OK, that
# connection is rolled back and the process exits.
sub Exec2 {
    my ($mconn, $sconn, $sql) = @_;

    my $on_master = $mconn->exec($sql);
    if ($on_master->resultStatus ne PGRES_COMMAND_OK) {
        RollbackAndQuit($mconn);
    }

    my $on_slave = $sconn->exec($sql);
    RollbackAndQuit($sconn) if ($on_slave->resultStatus ne PGRES_COMMAND_OK);
    # XXX TODO: return results?!
}
1011
# ExecFatch($conn, $sql)
#
# Run a query and return its first row as a list.  If the query does not
# finish with PGRES_TUPLES_OK the transaction is rolled back and the
# process exits.
sub ExecFatch {
    my $conn = shift || die "ExecFatch need conn!";
    my $sql = shift || die "ExecFatch need SQL!";

    if ($debug) {
        # One-line rendering for the trace only; the statement that is
        # executed stays exactly as passed in.
        (my $shown = $sql) =~ s/\s+/ /g;
        print STDERR "Exec: $shown\n";
    }

    my $result = $conn->exec($sql);
    RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);

    print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);

    my @row = $result->fetchrow;
    print STDERR "DATA: ",join(",",@row),"\n" if ($debug);
    return @row;
}
1032
# ExecDebug($conn, $sql)
#
# Debugging aid: when $debug is set, run a query and dump every returned
# row to STDERR.  Does nothing (and returns nothing) when $debug is off.
# If the query does not finish with PGRES_TUPLES_OK the transaction is
# rolled back and the process exits.
#
# Returns the number of rows.
sub ExecDebug {
    return if (! $debug);

    my $conn = shift || die "ExecDebug need conn!";
    my $sql = shift || die "ExecDebug need SQL!";

    # One-line rendering for the trace only; the statement that is executed
    # stays exactly as passed in.
    (my $shown = $sql) =~ s/\s+/ /g;
    print STDERR "Exec: $shown\n";

    my $result = $conn->exec($sql);
    RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);

    print STDERR "Returned ",$result->ntuples," tuples\n";

    while (my @row = $result->fetchrow) {
        print STDERR "DATA: ",join(",",@row),"\n";
    }
    return $result->ntuples;
}
# MkInfo($db [, $host [, $port [, $user [, $password]]]])
#
# Build a "keyword=value ..." connection string.  Parameters that are
# undef are left out.  A value that is empty or contains whitespace, a
# single quote or a backslash is wrapped in single quotes with quote and
# backslash escaped by a backslash; plain values are emitted unchanged.
#
# Returns the connection string; dies when $db is missing.
sub MkInfo {
    my $db = shift || die "need database name!";
    my ($host, $port, $user, $password) = @_;

    my $quote = sub {
        my ($value) = @_;
        return $value if $value =~ /\A[^\s'\\]+\z/;
        $value =~ s/([\\'])/\\$1/g;
        return "'$value'";
    };

    my @settings = ("dbname=" . $quote->($db));
    push @settings, "host=" . $quote->($host) if (defined($host));
    push @settings, "port=" . $quote->($port) if (defined($port));
    push @settings, "user=" . $quote->($user) if (defined($user));
    push @settings, "password=" . $quote->($password) if (defined($password));

    return join(' ', @settings);
}
1071
1072 1;

  ViewVC Help
Powered by ViewVC 1.1.26