19 |
|
|
20 |
sub GetSlaveId |
sub GetSlaveId |
21 |
{ |
{ |
22 |
my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]); |
my ($mconn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]); |
23 |
|
|
24 |
my $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". |
my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE". |
25 |
" host='$slaveHost' AND dbase='$slaveDB'"); |
" host='$slaveHost' AND dbase='$slaveDB'"); |
26 |
|
|
27 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
28 |
{ |
{ |
29 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
30 |
return(-1); |
return(-1); |
31 |
} |
} |
32 |
|
|
43 |
|
|
44 |
sub PrepareSnapshot |
sub PrepareSnapshot |
45 |
{ |
{ |
46 |
my ($conn, $sconn, $outf, $server, $onlytables) = @_; |
my ($mconn, $sconn, $outf, $server, $onlytables) = @_; |
47 |
|
|
48 |
# first, we must know for wich tables the slave subscribed |
# first, we must know for wich tables the slave subscribed |
49 |
my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_"); |
my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_"); |
50 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
51 |
{ |
{ |
52 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
53 |
return(-1); |
return(-1); |
54 |
} |
} |
55 |
|
|
58 |
$Stables{$row[0]} = 1; |
$Stables{$row[0]} = 1; |
59 |
} |
} |
60 |
|
|
61 |
$result = $conn->exec("BEGIN"); |
$result = $mconn->exec("BEGIN"); |
62 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
63 |
{ |
{ |
64 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
65 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
66 |
return(-1); |
return(-1); |
67 |
} |
} |
68 |
$result = $conn->exec("set transaction isolation level serializable"); |
$result = $mconn->exec("set transaction isolation level serializable"); |
69 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
70 |
{ |
{ |
71 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
72 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
73 |
return(-1); |
return(-1); |
74 |
} |
} |
75 |
|
|
76 |
# MAP oid --> tabname, keyname, key_type |
# MAP oid --> tabname, keyname, key_type |
77 |
$result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" . |
$result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" . |
78 |
" from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" . |
" from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" . |
79 |
", pg_type pgt". |
", pg_type pgt". |
80 |
" where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" . |
" where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" . |
81 |
" AND pga.attnum = rt.key AND pga.atttypid=pgt.oid"); |
" AND pga.attnum = rt.key AND pga.atttypid=pgt.oid"); |
82 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
83 |
{ |
{ |
84 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
85 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
86 |
return(-1); |
return(-1); |
87 |
} |
} |
88 |
|
|
103 |
|
|
104 |
printf "$sql\n" if $debug; |
printf "$sql\n" if $debug; |
105 |
|
|
106 |
$result = $conn->exec($sql); |
$result = $mconn->exec($sql); |
107 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
108 |
{ |
{ |
109 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
110 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
111 |
return(-1); |
return(-1); |
112 |
} |
} |
113 |
|
|
129 |
|
|
130 |
printf "$sql\n" if $debug; |
printf "$sql\n" if $debug; |
131 |
|
|
132 |
$result = $conn->exec($sql); |
$result = $mconn->exec($sql); |
133 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
134 |
{ |
{ |
135 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
136 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
137 |
return(-1); |
return(-1); |
138 |
} |
} |
139 |
|
|
147 |
{ |
{ |
148 |
if ($lastoid eq '') |
if ($lastoid eq '') |
149 |
{ |
{ |
150 |
my $syncid = GetSYNCID($conn, $outf); |
my $syncid = GetSYNCID($mconn, $outf); |
151 |
return($syncid) if $syncid < 0; |
return($syncid) if $syncid < 0; |
152 |
$havedeal = 1; |
$havedeal = 1; |
153 |
} |
} |
161 |
if (! defined $row[1]) |
if (! defined $row[1]) |
162 |
{ |
{ |
163 |
print STDERR "NULL key\n" unless ($quiet); |
print STDERR "NULL key\n" unless ($quiet); |
164 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
165 |
return(-2); |
return(-2); |
166 |
} |
} |
167 |
printf $outf "%s\n", OutputValue($row[1]); |
printf $outf "%s\n", OutputValue($row[1]); |
184 |
|
|
185 |
printf "$sql\n" if $debug; |
printf "$sql\n" if $debug; |
186 |
|
|
187 |
$result = $conn->exec($sql); |
$result = $mconn->exec($sql); |
188 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
189 |
{ |
{ |
190 |
printf $outf "-- ERROR\n" if $havedeal; |
printf $outf "-- ERROR\n" if $havedeal; |
191 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
192 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
193 |
return(-1); |
return(-1); |
194 |
} |
} |
195 |
next if $result->ntuples <= 0; |
next if $result->ntuples <= 0; |
196 |
if (! $havedeal) |
if (! $havedeal) |
197 |
{ |
{ |
198 |
my $syncid = GetSYNCID($conn, $outf); |
my $syncid = GetSYNCID($mconn, $outf); |
199 |
return($syncid) if $syncid < 0; |
return($syncid) if $syncid < 0; |
200 |
$havedeal = 1; |
$havedeal = 1; |
201 |
} |
} |
232 |
|
|
233 |
printf "$sql\n" if $debug; |
printf "$sql\n" if $debug; |
234 |
|
|
235 |
$result = $conn->exec($sql); |
$result = $mconn->exec($sql); |
236 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
237 |
{ |
{ |
238 |
printf $outf "-- ERROR\n" if $havedeal; |
printf $outf "-- ERROR\n" if $havedeal; |
239 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
240 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
241 |
return(-1); |
return(-1); |
242 |
} |
} |
243 |
next if $result->ntuples <= 0; |
next if $result->ntuples <= 0; |
244 |
if (! $havedeal) |
if (! $havedeal) |
245 |
{ |
{ |
246 |
my $syncid = GetSYNCID($conn, $outf); |
my $syncid = GetSYNCID($mconn, $outf); |
247 |
return($syncid) if $syncid < 0; |
return($syncid) if $syncid < 0; |
248 |
$havedeal = 1; |
$havedeal = 1; |
249 |
} |
} |
268 |
|
|
269 |
unless ($havedeal) |
unless ($havedeal) |
270 |
{ |
{ |
271 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
272 |
return(0); |
return(0); |
273 |
} |
} |
274 |
|
|
275 |
# Remember this snapshot info |
# Remember this snapshot info |
276 |
$result = $conn->exec("select _rserv_sync_($server)"); |
$result = $mconn->exec("select _rserv_sync_($server)"); |
277 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
278 |
{ |
{ |
279 |
printf $outf "-- ERROR\n"; |
printf $outf "-- ERROR\n"; |
280 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
281 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
282 |
return(-1); |
return(-1); |
283 |
} |
} |
284 |
|
|
285 |
$result = $conn->exec("COMMIT"); |
$result = $mconn->exec("COMMIT"); |
286 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
287 |
{ |
{ |
288 |
printf $outf "-- ERROR\n"; |
printf $outf "-- ERROR\n"; |
289 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
290 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
291 |
return(-1); |
return(-1); |
292 |
} |
} |
293 |
printf $outf "-- OK\n"; |
printf $outf "-- OK\n"; |
408 |
|
|
409 |
sub ApplySnapshot |
sub ApplySnapshot |
410 |
{ |
{ |
411 |
my ($conn, $inpf, $onlytables) = @_; # (@_[0], @_[1]); |
my ($sconn, $inpf, $onlytables) = @_; # (@_[0], @_[1]); |
412 |
|
|
413 |
my $result = $conn->exec("BEGIN"); |
my $result = $sconn->exec("BEGIN"); |
414 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
415 |
{ |
{ |
416 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
417 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
418 |
return(-1); |
return(-1); |
419 |
} |
} |
420 |
|
|
421 |
$result = $conn->exec("SET CONSTRAINTS ALL DEFERRED"); |
$result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED"); |
422 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
423 |
{ |
{ |
424 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
425 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
426 |
return(-1); |
return(-1); |
427 |
} |
} |
428 |
|
|
432 |
" where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" . |
" where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" . |
433 |
" AND pga.attnum = rt.key"; |
" AND pga.attnum = rt.key"; |
434 |
|
|
435 |
$result = $conn->exec($sql); |
$result = $sconn->exec($sql); |
436 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
437 |
{ |
{ |
438 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
439 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
440 |
return(-1); |
return(-1); |
441 |
} |
} |
442 |
%Stables = (); |
%Stables = (); |
459 |
if ($cmt ne '--') |
if ($cmt ne '--') |
460 |
{ |
{ |
461 |
printf STDERR "Invalid format\n" unless ($quiet); |
printf STDERR "Invalid format\n" unless ($quiet); |
462 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
463 |
return(-2); |
return(-2); |
464 |
} |
} |
465 |
if ($cmd eq 'DELETE') |
if ($cmd eq 'DELETE') |
467 |
if ($syncid eq '') |
if ($syncid eq '') |
468 |
{ |
{ |
469 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
470 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
471 |
return(-2); |
return(-2); |
472 |
} |
} |
473 |
$result = DoDelete($conn, $inpf, $prm); |
$result = DoDelete($sconn, $inpf, $prm); |
474 |
if ($result) |
if ($result) |
475 |
{ |
{ |
476 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
477 |
return($result); |
return($result); |
478 |
} |
} |
479 |
} |
} |
482 |
if ($syncid eq '') |
if ($syncid eq '') |
483 |
{ |
{ |
484 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
485 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
486 |
return(-2); |
return(-2); |
487 |
} |
} |
488 |
$result = DoInsert($conn, $inpf, $prm); |
$result = DoInsert($sconn, $inpf, $prm); |
489 |
if ($result) |
if ($result) |
490 |
{ |
{ |
491 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
492 |
return($result); |
return($result); |
493 |
} |
} |
494 |
} |
} |
497 |
if ($syncid eq '') |
if ($syncid eq '') |
498 |
{ |
{ |
499 |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
printf STDERR "Sync ID unspecified\n" unless ($quiet); |
500 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
501 |
return(-2); |
return(-2); |
502 |
} |
} |
503 |
$result = DoUpdate($conn, $inpf, $prm); |
$result = DoUpdate($sconn, $inpf, $prm); |
504 |
if ($result) |
if ($result) |
505 |
{ |
{ |
506 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
507 |
return($result); |
return($result); |
508 |
} |
} |
509 |
} |
} |
512 |
if ($syncid ne '') |
if ($syncid ne '') |
513 |
{ |
{ |
514 |
printf STDERR "Second Sync ID ?!\n" unless ($quiet); |
printf STDERR "Second Sync ID ?!\n" unless ($quiet); |
515 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
516 |
return(-2); |
return(-2); |
517 |
} |
} |
518 |
if ($prm !~ /^\d+$/) |
if ($prm !~ /^\d+$/) |
519 |
{ |
{ |
520 |
printf STDERR "Invalid Sync ID $prm\n" unless ($quiet); |
printf STDERR "Invalid Sync ID $prm\n" unless ($quiet); |
521 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
522 |
return(-2); |
return(-2); |
523 |
} |
} |
524 |
$syncid = $prm; |
$syncid = $prm; |
525 |
|
|
526 |
printf STDERR "Sync ID $syncid\n" unless ($quiet); |
printf STDERR "Sync ID $syncid\n" unless ($quiet); |
527 |
|
|
528 |
$result = $conn->exec("select syncid, synctime from " . |
$result = $sconn->exec("select syncid, synctime from " . |
529 |
"_RSERV_SLAVE_SYNC_ where syncid = " . |
"_RSERV_SLAVE_SYNC_ where syncid = " . |
530 |
"(select max(syncid) from _RSERV_SLAVE_SYNC_)"); |
"(select max(syncid) from _RSERV_SLAVE_SYNC_)"); |
531 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
532 |
{ |
{ |
533 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
534 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
535 |
return(-1); |
return(-1); |
536 |
} |
} |
537 |
my @row = $result->fetchrow; |
my @row = $result->fetchrow; |
538 |
if (! defined $row[0]) |
if (! defined $row[0]) |
539 |
{ |
{ |
540 |
$result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ". |
$result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ". |
541 |
"(syncid, synctime) values ($syncid, now())"); |
"(syncid, synctime) values ($syncid, now())"); |
542 |
} |
} |
543 |
elsif ($row[0] >= $prm) |
elsif ($row[0] >= $prm) |
544 |
{ |
{ |
545 |
printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet); |
printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet); |
546 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
547 |
return(0); |
return(0); |
548 |
} |
} |
549 |
else |
else |
550 |
{ |
{ |
551 |
$result = $conn->exec("update _RSERV_SLAVE_SYNC_" . |
$result = $sconn->exec("update _RSERV_SLAVE_SYNC_" . |
552 |
" set syncid = $syncid, synctime = now()"); |
" set syncid = $syncid, synctime = now()"); |
553 |
} |
} |
554 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
555 |
{ |
{ |
556 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
557 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
558 |
return(-1); |
return(-1); |
559 |
} |
} |
560 |
} |
} |
566 |
elsif ($cmd eq 'ERROR') |
elsif ($cmd eq 'ERROR') |
567 |
{ |
{ |
568 |
printf STDERR "ERROR signaled\n" unless ($quiet); |
printf STDERR "ERROR signaled\n" unless ($quiet); |
569 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
570 |
return(-2); |
return(-2); |
571 |
} |
} |
572 |
else |
else |
573 |
{ |
{ |
574 |
printf STDERR "Unknown command $cmd\n" unless ($quiet); |
printf STDERR "Unknown command $cmd\n" unless ($quiet); |
575 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
576 |
return(-2); |
return(-2); |
577 |
} |
} |
578 |
} |
} |
580 |
if (! $ok) |
if (! $ok) |
581 |
{ |
{ |
582 |
printf STDERR "No OK flag in input\n" unless ($quiet); |
printf STDERR "No OK flag in input\n" unless ($quiet); |
583 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
584 |
return(-2); |
return(-2); |
585 |
} |
} |
586 |
|
|
587 |
$result = $conn->exec("COMMIT"); |
$result = $sconn->exec("COMMIT"); |
588 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
589 |
{ |
{ |
590 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
591 |
$conn->exec("ROLLBACK"); |
$sconn->exec("ROLLBACK"); |
592 |
return(-1); |
return(-1); |
593 |
} |
} |
594 |
|
|
597 |
|
|
598 |
sub DoDelete |
sub DoDelete |
599 |
{ |
{ |
600 |
my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
601 |
|
|
602 |
# only delete tables that the slave wants |
# only delete tables that the slave wants |
603 |
if (! defined($Stables{$tabname})) { |
if (! defined($Stables{$tabname})) { |
631 |
|
|
632 |
printf "$sql\n" if $debug; |
printf "$sql\n" if $debug; |
633 |
|
|
634 |
my $result = $conn->exec($sql); |
my $result = $sconn->exec($sql); |
635 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
636 |
{ |
{ |
637 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
638 |
return(-1); |
return(-1); |
639 |
} |
} |
640 |
} |
} |
651 |
|
|
652 |
sub DoUpdate |
sub DoUpdate |
653 |
{ |
{ |
654 |
my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
655 |
|
|
656 |
# only update the tables that the slave wants |
# only update the tables that the slave wants |
657 |
if (! defined($Stables{$tabname})) { |
if (! defined($Stables{$tabname})) { |
673 |
my $sql = "select attnum, attname from pg_attribute" . |
my $sql = "select attnum, attname from pg_attribute" . |
674 |
" where attrelid = $Stables{$tabname}->[0] AND attnum > 0"; |
" where attrelid = $Stables{$tabname}->[0] AND attnum > 0"; |
675 |
|
|
676 |
my $result = $conn->exec($sql); |
my $result = $sconn->exec($sql); |
677 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
678 |
{ |
{ |
679 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
680 |
return(-1); |
return(-1); |
681 |
} |
} |
682 |
|
|
751 |
|
|
752 |
printf "$sql\n" if $debug; |
printf "$sql\n" if $debug; |
753 |
|
|
754 |
$result = $conn->exec($sql); |
$result = $sconn->exec($sql); |
755 |
|
|
756 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
757 |
{ |
{ |
758 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
759 |
return(-1); |
return(-1); |
760 |
} |
} |
761 |
next if $result->cmdTuples == 1; # updated |
next if $result->cmdTuples == 1; # updated |
772 |
|
|
773 |
if ($CBufLen >= $CBufMax) |
if ($CBufLen >= $CBufMax) |
774 |
{ |
{ |
775 |
$result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
$result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); |
776 |
return($result) if $result; |
return($result) if $result; |
777 |
@CopyBuf = (); |
@CopyBuf = (); |
778 |
$CBufLen = 0; |
$CBufLen = 0; |
788 |
if ($CBufLen) |
if ($CBufLen) |
789 |
{ |
{ |
790 |
print "@CopyBuf\n" if $debug; |
print "@CopyBuf\n" if $debug; |
791 |
$result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
$result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); |
792 |
return($result) if $result; |
return($result) if $result; |
793 |
} |
} |
794 |
|
|
797 |
|
|
798 |
sub DoInsert |
sub DoInsert |
799 |
{ |
{ |
800 |
my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); |
801 |
|
|
802 |
# only insert rows into tables that the slave wants |
# only insert rows into tables that the slave wants |
803 |
if (! defined($Stables{$tabname})) { |
if (! defined($Stables{$tabname})) { |
839 |
|
|
840 |
if ($CBufLen >= $CBufMax) |
if ($CBufLen >= $CBufMax) |
841 |
{ |
{ |
842 |
my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); |
843 |
return($result) if $result; |
return($result) if $result; |
844 |
@CopyBuf = (); |
@CopyBuf = (); |
845 |
$CBufLen = 0; |
$CBufLen = 0; |
855 |
if ($CBufLen) |
if ($CBufLen) |
856 |
{ |
{ |
857 |
print "@CopyBuf\n" if $debug; |
print "@CopyBuf\n" if $debug; |
858 |
my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); |
my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf); |
859 |
return($result) if $result; |
return($result) if $result; |
860 |
} |
} |
861 |
|
|
865 |
|
|
866 |
sub DoCopy |
sub DoCopy |
867 |
{ |
{ |
868 |
my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]); |
my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]); |
869 |
|
|
870 |
my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') . |
my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') . |
871 |
"FROM STDIN"; |
"FROM STDIN"; |
872 |
my $result = $conn->exec($sql); |
my $result = $sconn->exec($sql); |
873 |
if ($result->resultStatus ne PGRES_COPY_IN) |
if ($result->resultStatus ne PGRES_COPY_IN) |
874 |
{ |
{ |
875 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
876 |
return(-1); |
return(-1); |
877 |
} |
} |
878 |
|
|
879 |
foreach my $str (@{$CBuf}) |
foreach my $str (@{$CBuf}) |
880 |
{ |
{ |
881 |
$conn->putline($str); |
$sconn->putline($str); |
882 |
} |
} |
883 |
|
|
884 |
$conn->putline("\\.\n"); |
$sconn->putline("\\.\n"); |
885 |
|
|
886 |
if ($conn->endcopy) |
if ($sconn->endcopy) |
887 |
{ |
{ |
888 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
889 |
return(-1); |
return(-1); |
890 |
} |
} |
891 |
|
|
898 |
# |
# |
899 |
sub GetSyncID |
sub GetSyncID |
900 |
{ |
{ |
901 |
my ($conn) = @_; # (@_[0]); |
my ($sconn) = @_; # (@_[0]); |
902 |
|
|
903 |
my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_"); |
my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_"); |
904 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
905 |
{ |
{ |
906 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $sconn->errorMessage unless ($quiet); |
907 |
return(-1); |
return(-1); |
908 |
} |
} |
909 |
my @row = $result->fetchrow; |
my @row = $result->fetchrow; |
916 |
# |
# |
917 |
sub SyncSyncID |
sub SyncSyncID |
918 |
{ |
{ |
919 |
my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]); |
my ($mconn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]); |
920 |
|
|
921 |
my $result = $conn->exec("BEGIN"); |
my $result = $mconn->exec("BEGIN"); |
922 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
923 |
{ |
{ |
924 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
925 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
926 |
return(-1); |
return(-1); |
927 |
} |
} |
928 |
|
|
929 |
$result = $conn->exec("select synctime, status from _RSERV_SYNC_" . |
$result = $mconn->exec("select synctime, status from _RSERV_SYNC_" . |
930 |
" where server = $server AND syncid = $syncid" . |
" where server = $server AND syncid = $syncid" . |
931 |
" for update"); |
" for update"); |
932 |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
if ($result->resultStatus ne PGRES_TUPLES_OK) |
933 |
{ |
{ |
934 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
935 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
936 |
return(-1); |
return(-1); |
937 |
} |
} |
938 |
my @row = $result->fetchrow; |
my @row = $result->fetchrow; |
939 |
if (! defined $row[0]) |
if (! defined $row[0]) |
940 |
{ |
{ |
941 |
printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet); |
printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet); |
942 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
943 |
return(0); |
return(0); |
944 |
} |
} |
945 |
if ($row[1] > 0) |
if ($row[1] > 0) |
946 |
{ |
{ |
947 |
printf STDERR "SyncID $syncid for server ". |
printf STDERR "SyncID $syncid for server ". |
948 |
"$server already updated\n" unless ($quiet); |
"$server already updated\n" unless ($quiet); |
949 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
950 |
return(0); |
return(0); |
951 |
} |
} |
952 |
$result = $conn->exec("update _RSERV_SYNC_" . |
$result = $mconn->exec("update _RSERV_SYNC_" . |
953 |
" set synctime = now(), status = 1" . |
" set synctime = now(), status = 1" . |
954 |
" where server = $server AND syncid = $syncid"); |
" where server = $server AND syncid = $syncid"); |
955 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
956 |
{ |
{ |
957 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
958 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
959 |
return(-1); |
return(-1); |
960 |
} |
} |
961 |
$result = $conn->exec("delete from _RSERV_SYNC_" . |
$result = $mconn->exec("delete from _RSERV_SYNC_" . |
962 |
" where server = $server AND syncid < $syncid"); |
" where server = $server AND syncid < $syncid"); |
963 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
964 |
{ |
{ |
965 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
966 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
967 |
return(-1); |
return(-1); |
968 |
} |
} |
969 |
|
|
970 |
$result = $conn->exec("COMMIT"); |
$result = $mconn->exec("COMMIT"); |
971 |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
if ($result->resultStatus ne PGRES_COMMAND_OK) |
972 |
{ |
{ |
973 |
print STDERR $conn->errorMessage unless ($quiet); |
print STDERR $mconn->errorMessage unless ($quiet); |
974 |
$conn->exec("ROLLBACK"); |
$mconn->exec("ROLLBACK"); |
975 |
return(-1); |
return(-1); |
976 |
} |
} |
977 |
|
|