/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.1.1.1 - (show annotations) (vendor branch)
Wed Dec 20 17:22:35 2000 UTC (23 years, 5 months ago) by dpavlin
Branch: DbP
CVS Tags: rserv_0_1, r0, debian
Changes since 1.1: +0 -0 lines
import of rserv 0.1 distributed in directories

1 # -*- perl -*-
2 # RServ.pm
3 # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4
5 package RServ;
6
7 require Exporter;
8 @ISA = qw(Exporter);
9 @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog);
10 @EXPORT_OK = qw();
11
12 use Pg;
13
14 $debug = 0;
15 $quiet = 1;
16
17 my %Mtables = ();
18 my %Stables = ();
19
20 sub PrepareSnapshot
21 {
22 my ($conn, $outf, $server) = @_; # (@_[0], @_[1], @_[2]);
23
24 my $result = $conn->exec("BEGIN");
25 if ($result->resultStatus ne PGRES_COMMAND_OK)
26 {
27 print STDERR $conn->errorMessage unless ($quiet);
28 $conn->exec("ROLLBACK");
29 return(-1);
30 }
31 $result = $conn->exec("set transaction isolation level serializable");
32 if ($result->resultStatus ne PGRES_COMMAND_OK)
33 {
34 print STDERR $conn->errorMessage unless ($quiet);
35 $conn->exec("ROLLBACK");
36 return(-1);
37 }
38
39 # MAP oid --> tabname, keyname
40 $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname" .
41 " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
42 " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .
43 " and pga.attnum = rt.key");
44 if ($result->resultStatus ne PGRES_TUPLES_OK)
45 {
46 print STDERR $conn->errorMessage unless ($quiet);
47 $conn->exec("ROLLBACK");
48 return(-1);
49 }
50
51 my @row;
52 while (@row = $result->fetchrow)
53 {
54 # printf "$row[0], $row[1], $row[2]\n";
55 push @{$Mtables{$row[0]}}, $row[1], $row[2];
56 }
57
58 # Read last succeeded sync
59 $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
60 " where server = $server and syncid = (select max(syncid) from" .
61 " _RSERV_SYNC_ where server = $server and status > 0)";
62
63 printf "$sql\n" if $debug;
64
65 $result = $conn->exec($sql);
66 if ($result->resultStatus ne PGRES_TUPLES_OK)
67 {
68 print STDERR $conn->errorMessage unless ($quiet);
69 $conn->exec("ROLLBACK");
70 return(-1);
71 }
72
73 my @lastsync = $result->fetchrow;
74
75 my $sinfo = "";
76 if ($lastsync[3] ne '') # sync info
77 {
78 $sinfo = "and (l.logid >= $lastsync[3]";
79 $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
80 $sinfo .= ")";
81 }
82
83 my $havedeal = 0;
84
85 # DELETED rows
86 $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
87 " where l.deleted = 1 $sinfo order by l.reloid";
88
89 printf "$sql\n" if $debug;
90
91 $result = $conn->exec($sql);
92 if ($result->resultStatus ne PGRES_TUPLES_OK)
93 {
94 print STDERR $conn->errorMessage unless ($quiet);
95 $conn->exec("ROLLBACK");
96 return(-1);
97 }
98
99 $lastoid = '';
100 while (@row = $result->fetchrow)
101 {
102 next unless exists $Mtables{$row[0]};
103 if ($lastoid != $row[0])
104 {
105 if ($lastoid eq '')
106 {
107 my $syncid = GetSYNCID($conn, $outf);
108 return($syncid) if $syncid < 0;
109 $havedeal = 1;
110 }
111 else
112 {
113 printf $outf "\\.\n";
114 }
115 printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
116 $lastoid = $row[0];
117 }
118 if (! defined $row[1])
119 {
120 print STDERR "NULL key\n" unless ($quiet);
121 $conn->exec("ROLLBACK");
122 return(-2);
123 }
124 printf $outf "%s\n", OutputValue($row[1]);
125 }
126 printf $outf "\\.\n" if $lastoid ne '';
127
128 # UPDATED rows
129
130 my ($taboid, $tabname, $tabkey);
131 foreach $taboid (keys %Mtables)
132 {
133 ($tabname, $tabkey) = @{$Mtables{$taboid}};
134 my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
135 $sql = sprintf "select $oidkey _$tabname.* from _RSERV_LOG_ l," .
136 " $tabname _$tabname where l.reloid = $taboid and l.deleted = 0 $sinfo" .
137 " and l.key = _$tabname.${tabkey}::text";
138
139 printf "$sql\n" if $debug;
140
141 $result = $conn->exec($sql);
142 if ($result->resultStatus ne PGRES_TUPLES_OK)
143 {
144 printf $outf "-- ERROR\n" if $havedeal;
145 print STDERR $conn->errorMessage unless ($quiet);
146 $conn->exec("ROLLBACK");
147 return(-1);
148 }
149 next if $result->ntuples <= 0;
150 if (! $havedeal)
151 {
152 my $syncid = GetSYNCID($conn, $outf);
153 return($syncid) if $syncid < 0;
154 $havedeal = 1;
155 }
156 printf $outf "-- UPDATE $tabname\n";
157 while (@row = $result->fetchrow)
158 {
159 for ($i = 0; $i <= $#row; $i++)
160 {
161 printf $outf " " if $i;
162 printf $outf "%s", OutputValue($row[$i]);
163 }
164 printf $outf "\n";
165 }
166 printf $outf "\\.\n";
167 }
168
169 unless ($havedeal)
170 {
171 $conn->exec("ROLLBACK");
172 return(0);
173 }
174
175 # Remember this snapshot info
176 $result = $conn->exec("select _rserv_sync_($server)");
177 if ($result->resultStatus ne PGRES_TUPLES_OK)
178 {
179 printf $outf "-- ERROR\n";
180 print STDERR $conn->errorMessage unless ($quiet);
181 $conn->exec("ROLLBACK");
182 return(-1);
183 }
184
185 $result = $conn->exec("COMMIT");
186 if ($result->resultStatus ne PGRES_COMMAND_OK)
187 {
188 printf $outf "-- ERROR\n";
189 print STDERR $conn->errorMessage unless ($quiet);
190 $conn->exec("ROLLBACK");
191 return(-1);
192 }
193 printf $outf "-- OK\n";
194
195 return(1);
196
197 }
198
199 sub OutputValue
200 {
201 my ($val) = @_; # @_[0];
202
203 return("\\N") unless defined $val;
204
205 $val =~ s/\\/\\\\/g;
206 $val =~ s/ /\\011/g;
207 $val =~ s/\n/\\012/g;
208 $val =~ s/\'/\\047/g;
209
210 return($val);
211 }
212
213 # Get syncid for new snapshot
214 sub GetSYNCID
215 {
216 my ($conn, $outf) = @_; # (@_[0], @_[1]);
217
218 my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
219 if ($result->resultStatus ne PGRES_TUPLES_OK)
220 {
221 print STDERR $conn->errorMessage unless ($quiet);
222 $conn->exec("ROLLBACK");
223 return(-1);
224 }
225
226 my @row = $result->fetchrow;
227
228 printf $outf "-- SYNCID $row[0]\n";
229 return($row[0]);
230 }
231
232
233 sub CleanLog
234 {
235 my ($conn, $howold) = @_; # (@_[0], @_[1]);
236
237 my $result = $conn->exec("BEGIN");
238 if ($result->resultStatus ne PGRES_COMMAND_OK)
239 {
240 print STDERR $conn->errorMessage unless ($quiet);
241 $conn->exec("ROLLBACK");
242 return(-1);
243 }
244
245 my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
246 " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
247 " where rs2.server = rs.server and rs2.status > 0) order by rs.maxid";
248
249 printf "$sql\n" if $debug;
250
251 $result = $conn->exec($sql);
252 if ($result->resultStatus ne PGRES_TUPLES_OK)
253 {
254 print STDERR $conn->errorMessage unless ($quiet);
255 return(-1);
256 }
257 my $maxid = '';
258 my %active = ();
259 while (my @row = $result->fetchrow)
260 {
261 $maxid = $row[0] if $maxid eq '';
262 last if $row[0] > $maxid;
263 my @ids = split(/[ ]+,[ ]+/, $row[1]);
264 foreach $aid (@ids)
265 {
266 $active{$aid} = 1 unless exists $active{$aid};
267 }
268 }
269 if ($maxid eq '')
270 {
271 print STDERR "No Sync IDs\n" unless ($quiet);
272 return(0);
273 }
274 my $alist = join(',', keys %active);
275 my $sinfo = "logid < $maxid";
276 $sinfo .= " and logid not in ($alist)" if $alist ne '';
277
278 $sql = "delete from _RSERV_LOG_ where " .
279 "logtime < now() - '$howold second'::interval and $sinfo";
280
281 printf "$sql\n" if $debug;
282
283 $result = $conn->exec($sql);
284 if ($result->resultStatus ne PGRES_COMMAND_OK)
285 {
286 print STDERR $conn->errorMessage unless ($quiet);
287 $conn->exec("ROLLBACK");
288 return(-1);
289 }
290 $maxid = $result->cmdTuples;
291
292 $result = $conn->exec("COMMIT");
293 if ($result->resultStatus ne PGRES_COMMAND_OK)
294 {
295 print STDERR $conn->errorMessage unless ($quiet);
296 $conn->exec("ROLLBACK");
297 return(-1);
298 }
299
300 return($maxid);
301 }
302
303 sub ApplySnapshot
304 {
305 my ($conn, $inpf) = @_; # (@_[0], @_[1]);
306
307 my $result = $conn->exec("BEGIN");
308 if ($result->resultStatus ne PGRES_COMMAND_OK)
309 {
310 print STDERR $conn->errorMessage unless ($quiet);
311 $conn->exec("ROLLBACK");
312 return(-1);
313 }
314
315 $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");
316 if ($result->resultStatus ne PGRES_COMMAND_OK)
317 {
318 print STDERR $conn->errorMessage unless ($quiet);
319 $conn->exec("ROLLBACK");
320 return(-1);
321 }
322
323 # MAP name --> oid, keyname, keynum
324 my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
325 " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
326 " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .
327 " and pga.attnum = rt.key";
328 $result = $conn->exec($sql);
329 if ($result->resultStatus ne PGRES_TUPLES_OK)
330 {
331 print STDERR $conn->errorMessage unless ($quiet);
332 $conn->exec("ROLLBACK");
333 return(-1);
334 }
335
336 while (@row = $result->fetchrow)
337 {
338 # printf " %s %s\n", $row[1], $row[0];
339 push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
340 }
341
342 my $ok = 0;
343 my $syncid = '';
344 while(<$inpf>)
345 {
346 $_ =~ s/\n//;
347 my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3);
348 if ($cmt ne '--')
349 {
350 printf STDERR "Invalid format\n" unless ($quiet);
351 $conn->exec("ROLLBACK");
352 return(-2);
353 }
354 if ($cmd eq 'DELETE')
355 {
356 if ($syncid eq '')
357 {
358 printf STDERR "Sync ID unspecified\n" unless ($quiet);
359 $conn->exec("ROLLBACK");
360 return(-2);
361 }
362 $result = DoDelete($conn, $inpf, $prm);
363 if ($result)
364 {
365 $conn->exec("ROLLBACK");
366 return($result);
367 }
368 }
369 elsif ($cmd eq 'UPDATE')
370 {
371 if ($syncid eq '')
372 {
373 printf STDERR "Sync ID unspecified\n" unless ($quiet);
374 $conn->exec("ROLLBACK");
375 return(-2);
376 }
377 $result = DoUpdate($conn, $inpf, $prm);
378 if ($result)
379 {
380 $conn->exec("ROLLBACK");
381 return($result);
382 }
383 }
384 elsif ($cmd eq 'SYNCID')
385 {
386 if ($syncid ne '')
387 {
388 printf STDERR "Second Sync ID ?!\n" unless ($quiet);
389 $conn->exec("ROLLBACK");
390 return(-2);
391 }
392 if ($prm !~ /^\d+$/)
393 {
394 printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
395 $conn->exec("ROLLBACK");
396 return(-2);
397 }
398 $syncid = $prm;
399
400 printf STDERR "Sync ID $syncid\n" unless ($quiet);
401
402 $result = $conn->exec("select syncid, synctime from " .
403 "_RSERV_SLAVE_SYNC_ where syncid = " .
404 "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
405 if ($result->resultStatus ne PGRES_TUPLES_OK)
406 {
407 print STDERR $conn->errorMessage unless ($quiet);
408 $conn->exec("ROLLBACK");
409 return(-1);
410 }
411 my @row = $result->fetchrow;
412 if (! defined $row[0])
413 {
414 $result = $conn->exec("insert into" .
415 " _RSERV_SLAVE_SYNC_(syncid, synctime) values ($syncid, now())");
416 }
417 elsif ($row[0] >= $prm)
418 {
419 printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
420 $conn->exec("ROLLBACK");
421 return(0);
422 }
423 else
424 {
425 $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .
426 " set syncid = $syncid, synctime = now()");
427 }
428 if ($result->resultStatus ne PGRES_COMMAND_OK)
429 {
430 print STDERR $conn->errorMessage unless ($quiet);
431 $conn->exec("ROLLBACK");
432 return(-1);
433 }
434 }
435 elsif ($cmd eq 'OK')
436 {
437 $ok = 1;
438 last;
439 }
440 elsif ($cmd eq 'ERROR')
441 {
442 printf STDERR "ERROR signaled\n" unless ($quiet);
443 $conn->exec("ROLLBACK");
444 return(-2);
445 }
446 else
447 {
448 printf STDERR "Unknown command $cmd\n" unless ($quiet);
449 $conn->exec("ROLLBACK");
450 return(-2);
451 }
452 }
453
454 if (! $ok)
455 {
456 printf STDERR "No OK flag in input\n" unless ($quiet);
457 $conn->exec("ROLLBACK");
458 return(-2);
459 }
460
461 $result = $conn->exec("COMMIT");
462 if ($result->resultStatus ne PGRES_COMMAND_OK)
463 {
464 print STDERR $conn->errorMessage unless ($quiet);
465 $conn->exec("ROLLBACK");
466 return(-1);
467 }
468
469 return(1);
470 }
471
472 sub DoDelete
473 {
474 my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
475
476 my $ok = 0;
477 while(<$inpf>)
478 {
479 if ($_ !~ /\n$/)
480 {
481 printf STDERR "Invalid format\n" unless ($quiet);
482 return(-2);
483 }
484 my $key = $_;
485 $key =~ s/\n//;
486 if ($key eq '\.')
487 {
488 $ok = 1;
489 last;
490 }
491
492 my $sql = "delete from $tabname where $Stables{$tabname}->[1] = '$key'";
493
494 printf "$sql\n" if $debug;
495
496 my $result = $conn->exec($sql);
497 if ($result->resultStatus ne PGRES_COMMAND_OK)
498 {
499 print STDERR $conn->errorMessage unless ($quiet);
500 return(-1);
501 }
502 }
503
504 if (! $ok)
505 {
506 printf STDERR "No end of input in DELETE section\n" unless ($quiet);
507 return(-2);
508 }
509
510 return(0);
511 }
512
513
514 sub DoUpdate
515 {
516 my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
517 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
518
519 my @CopyBuf = ();
520 my $CBufLen = 0;
521 my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
522
523 my $sql = "select attnum, attname from pg_attribute" .
524 " where attrelid = $Stables{$tabname}->[0] and attnum > 0";
525
526 my $result = $conn->exec($sql);
527 if ($result->resultStatus ne PGRES_TUPLES_OK)
528 {
529 print STDERR $conn->errorMessage unless ($quiet);
530 return(-1);
531 }
532
533 my @anames = ();
534 while (@row = $result->fetchrow)
535 {
536 $anames[$row[0]] = $row[1];
537 }
538
539 my $istring;
540 my $ok = 0;
541 while(<$inpf>)
542 {
543 if ($_ !~ /\n$/)
544 {
545 printf STDERR "Invalid format\n" unless ($quiet);
546 return(-2);
547 }
548 $istring = $_;
549 $istring =~ s/\n//;
550 if ($istring eq '\.')
551 {
552 $ok = 1;
553 last;
554 }
555 my @vals = split(/ /, $istring);
556 if ($oidkey)
557 {
558 if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
559 {
560 printf STDERR "Invalid OID\n" unless ($quiet);
561 return(-2);
562 }
563 $oidkey = $vals[0];
564 }
565 else
566 {
567 unshift @vals, '';
568 }
569
570 $sql = "update $tabname set ";
571 my $ocnt = 0;
572 for (my $i = 1; $i <= $#anames; $i++)
573 {
574 if ($vals[$i] eq '\N')
575 {
576 if ($i == $Stables{$tabname}->[2])
577 {
578 printf STDERR "NULL key\n" unless ($quiet);
579 return(-2);
580 }
581 $vals[$i] = 'null';
582 }
583 else
584 {
585 $vals[$i] = "'" . $vals[$i] . "'";
586 next if $i == $Stables{$tabname}->[2];
587 }
588 $ocnt++;
589 $sql .= ', ' if $ocnt > 1;
590 $sql .= "$anames[$i] = $vals[$i]";
591 }
592 if ($oidkey)
593 {
594 $sql .= " where $Stables{$tabname}->[1] = $oidkey";
595 }
596 else
597 {
598 $sql .= " where $Stables{$tabname}->[1] = $vals[$Stables{$tabname}->[2]]";
599 }
600
601 printf "$sql\n" if $debug;
602
603 $result = $conn->exec($sql);
604 if ($result->resultStatus ne PGRES_COMMAND_OK)
605 {
606 print STDERR $conn->errorMessage unless ($quiet);
607 return(-1);
608 }
609 next if $result->cmdTuples == 1; # updated
610
611 if ($result->cmdTuples > 1)
612 {
613 printf STDERR "Duplicate keys\n" unless ($quiet);
614 return(-2);
615 }
616
617 # no key - copy
618 push @CopyBuf, "$istring\n";
619 $CBufLen += length($istring);
620
621 if ($CBufLen >= $CBufMax)
622 {
623 $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
624 return($result) if $result;
625 @CopyBuf = ();
626 $CBufLen = 0;
627 }
628 }
629
630 if (! $ok)
631 {
632 printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
633 return(-2);
634 }
635
636 if ($CBufLen)
637 {
638 $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
639 return($result) if $result;
640 }
641
642 return(0);
643 }
644
645
646 sub DoCopy
647 {
648 my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
649
650 my $sql = "COPY $tabname " . (($withoids) ? "WITH OIDS " : '') .
651 "FROM STDIN";
652 my $result = $conn->exec($sql);
653 if ($result->resultStatus ne PGRES_COPY_IN)
654 {
655 print STDERR $conn->errorMessage unless ($quiet);
656 return(-1);
657 }
658
659 foreach $str (@{$CBuf})
660 {
661 $conn->putline($str);
662 }
663
664 $conn->putline("\\.\n");
665
666 if ($conn->endcopy)
667 {
668 print STDERR $conn->errorMessage unless ($quiet);
669 return(-1);
670 }
671
672 return(0);
673
674 }
675
676
677 #
678 # Returns last SyncID applied on Slave
679 #
680 sub GetSyncID
681 {
682 my ($conn) = @_; # (@_[0]);
683
684 my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
685 if ($result->resultStatus ne PGRES_TUPLES_OK)
686 {
687 print STDERR $conn->errorMessage unless ($quiet);
688 return(-1);
689 }
690 my @row = $result->fetchrow;
691 return(undef) unless defined $row[0]; # null
692 return($row[0]);
693 }
694
695 #
696 # Updates _RSERV_SYNC_ on Master with Slave SyncID
697 #
698 sub SyncSyncID
699 {
700 my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);
701
702 my $result = $conn->exec("BEGIN");
703 if ($result->resultStatus ne PGRES_COMMAND_OK)
704 {
705 print STDERR $conn->errorMessage unless ($quiet);
706 $conn->exec("ROLLBACK");
707 return(-1);
708 }
709
710 $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .
711 " where server = $server and syncid = $syncid" .
712 " for update");
713 if ($result->resultStatus ne PGRES_TUPLES_OK)
714 {
715 print STDERR $conn->errorMessage unless ($quiet);
716 $conn->exec("ROLLBACK");
717 return(-1);
718 }
719 my @row = $result->fetchrow;
720 if (! defined $row[0])
721 {
722 printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);
723 $conn->exec("ROLLBACK");
724 return(0);
725 }
726 if ($row[1] > 0)
727 {
728 printf STDERR "SyncID $syncid for server $server already updated\n" unless ($quiet);
729 $conn->exec("ROLLBACK");
730 return(0);
731 }
732 $result = $conn->exec("update _RSERV_SYNC_" .
733 " set synctime = now(), status = 1" .
734 " where server = $server and syncid = $syncid");
735 if ($result->resultStatus ne PGRES_COMMAND_OK)
736 {
737 print STDERR $conn->errorMessage unless ($quiet);
738 $conn->exec("ROLLBACK");
739 return(-1);
740 }
741 $result = $conn->exec("delete from _RSERV_SYNC_" .
742 " where server = $server and syncid < $syncid");
743 if ($result->resultStatus ne PGRES_COMMAND_OK)
744 {
745 print STDERR $conn->errorMessage unless ($quiet);
746 $conn->exec("ROLLBACK");
747 return(-1);
748 }
749
750 $result = $conn->exec("COMMIT");
751 if ($result->resultStatus ne PGRES_COMMAND_OK)
752 {
753 print STDERR $conn->errorMessage unless ($quiet);
754 $conn->exec("ROLLBACK");
755 return(-1);
756 }
757
758 return(1);
759 }
760
761 1;

  ViewVC Help
Powered by ViewVC 1.1.26