/[rserv]/share/RServ.pm
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.1.1.1 - (hide annotations) (vendor branch)
Wed Dec 20 17:22:35 2000 UTC (23 years, 4 months ago) by dpavlin
Branch: DbP
CVS Tags: rserv_0_1, r0, debian
Changes since 1.1: +0 -0 lines
import of rserv 0.1 distributed in directories

1 dpavlin 1.1 # -*- perl -*-
2     # RServ.pm
3     # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4    
package RServ;

# Public entry points are exported into the caller's namespace by default.
require Exporter;
our @ISA       = ('Exporter');
our @EXPORT    = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog);
our @EXPORT_OK = ();

use Pg;

# Package-wide switches read by the subs below:
#   $debug - when true, generated SQL statements are echoed to STDOUT
#   $quiet - when true, error messages to STDERR are suppressed
our $debug = 0;
our $quiet = 1;

# Table maps shared between the subs of this file:
#   %Mtables - filled by PrepareSnapshot: table oid  => tabname, keyname
#   %Stables - filled by ApplySnapshot:   table name => oid, keyname, keynum
my %Mtables = ();
my %Stables = ();
19    
#
# PrepareSnapshot($conn, $outf, $server)
#
# Inside one serializable transaction on $conn, writes to the filehandle
# $outf every change recorded in _RSERV_LOG_ since the last succeeded sync
# of slave $server.  The stream has the form:
#
#   -- SYNCID <id>
#   -- DELETE <table>      followed by one key per line, closed by "\."
#   -- UPDATE <table>      followed by one row per line, closed by "\."
#   -- OK                  (or "-- ERROR" when a later step failed)
#
# Returns:
#    1  snapshot written and the sync registered via _rserv_sync_()
#    0  nothing to send (transaction rolled back, nothing written)
#   -1  database error (transaction rolled back)
#   -2  a NULL key was found in the log (transaction rolled back)
#
sub PrepareSnapshot
{
    my ($conn, $outf, $server) = @_;

    # Shared database-failure path: report, abort the transaction, -1.
    my $dbfail = sub {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    };

    my $result = $conn->exec("BEGIN");
    return $dbfail->() if $result->resultStatus ne PGRES_COMMAND_OK;

    $result = $conn->exec("set transaction isolation level serializable");
    return $dbfail->() if $result->resultStatus ne PGRES_COMMAND_OK;

    # MAP oid --> tabname, keyname
    $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname" .
        " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
        " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .
        " and pga.attnum = rt.key");
    return $dbfail->() if $result->resultStatus ne PGRES_TUPLES_OK;

    # Rebuild the map on every call: it lives at file scope, so without the
    # reset entries from earlier calls would pile up and tables that are no
    # longer registered would still be queried below.
    %Mtables = ();
    my @row;
    while (@row = $result->fetchrow)
    {
        push @{$Mtables{$row[0]}}, $row[1], $row[2];
    }

    # Read last succeeded sync
    my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
        " where server = $server and syncid = (select max(syncid) from" .
        " _RSERV_SYNC_ where server = $server and status > 0)";

    # The SQL is passed as an argument, never as the format itself.
    printf "%s\n", $sql if $debug;

    $result = $conn->exec($sql);
    return $dbfail->() if $result->resultStatus ne PGRES_TUPLES_OK;

    my @lastsync = $result->fetchrow;

    # Restrict the log scan to entries at or after the last sync's maxid,
    # plus the ids listed in its "active" column.
    my $sinfo = "";
    if (defined $lastsync[3] && $lastsync[3] ne '')
    {
        $sinfo = "and (l.logid >= $lastsync[3]";
        $sinfo .= " or l.logid in ($lastsync[4])"
            if defined $lastsync[4] && $lastsync[4] ne '';
        $sinfo .= ")";
    }

    # Set once the SYNCID header has been written, i.e. there is data to send.
    my $havedeal = 0;

    # DELETED rows
    $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
        " where l.deleted = 1 $sinfo order by l.reloid";

    printf "%s\n", $sql if $debug;

    $result = $conn->exec($sql);
    return $dbfail->() if $result->resultStatus ne PGRES_TUPLES_OK;

    # Rows arrive ordered by reloid; a change of oid starts a new section.
    # String comparison, because the initial value is the empty string.
    my $lastoid = '';
    while (@row = $result->fetchrow)
    {
        next unless exists $Mtables{$row[0]};
        if ($lastoid ne $row[0])
        {
            if ($lastoid eq '')
            {
                # GetSYNCID rolls back by itself on failure.
                my $syncid = GetSYNCID($conn, $outf);
                return($syncid) if $syncid < 0;
                $havedeal = 1;
            }
            else
            {
                printf $outf "\\.\n";
            }
            printf $outf "-- DELETE %s\n", $Mtables{$row[0]}[0];
            $lastoid = $row[0];
        }
        if (! defined $row[1])
        {
            print STDERR "NULL key\n" unless ($quiet);
            $conn->exec("ROLLBACK");
            return(-2);
        }
        printf $outf "%s\n", OutputValue($row[1]);
    }
    printf $outf "\\.\n" if $lastoid ne '';

    # UPDATED rows
    foreach my $taboid (keys %Mtables)
    {
        my ($tabname, $tabkey) = @{$Mtables{$taboid}};

        # When the key is the oid, select it explicitly as the first column.
        my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
        $sql = "select $oidkey _$tabname.* from _RSERV_LOG_ l," .
            " $tabname _$tabname where l.reloid = $taboid and l.deleted = 0 $sinfo" .
            " and l.key = _$tabname.${tabkey}::text";

        printf "%s\n", $sql if $debug;

        $result = $conn->exec($sql);
        if ($result->resultStatus ne PGRES_TUPLES_OK)
        {
            printf $outf "-- ERROR\n" if $havedeal;
            return $dbfail->();
        }
        next if $result->ntuples <= 0;
        if (! $havedeal)
        {
            my $syncid = GetSYNCID($conn, $outf);
            return($syncid) if $syncid < 0;
            $havedeal = 1;
        }
        printf $outf "-- UPDATE %s\n", $tabname;
        while (@row = $result->fetchrow)
        {
            # NOTE(review): columns are joined with a single space, as in the
            # visible source; OutputValue escapes that same character as \011
            # (the octal code of TAB), so the separator may originally have
            # been a TAB - confirm against the upstream file.
            printf $outf "%s\n", join(' ', map { OutputValue($_) } @row);
        }
        printf $outf "\\.\n";
    }

    unless ($havedeal)
    {
        $conn->exec("ROLLBACK");
        return(0);
    }

    # Remember this snapshot info
    $result = $conn->exec("select _rserv_sync_($server)");
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        printf $outf "-- ERROR\n";
        return $dbfail->();
    }

    $result = $conn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK)
    {
        printf $outf "-- ERROR\n";
        return $dbfail->();
    }
    printf $outf "-- OK\n";

    return(1);
}
198    
#
# OutputValue($val)
#
# Renders one value for the snapshot stream.  An undefined value becomes
# the two characters \N.  Otherwise every backslash is doubled, and each
# space, newline and single quote is replaced by the escape \011, \012
# and \047 respectively.
#
# NOTE(review): \011 is the octal code of TAB, yet the pattern visible here
# matches a space - possibly whitespace was altered when this file was
# captured; confirm against the upstream source before relying on it.
#
sub OutputValue
{
    my ($val) = @_;

    defined $val or return("\\N");

    # Backslashes must be doubled first so the escapes added afterwards
    # are not themselves re-escaped.
    for ($val)
    {
        s/\\/\\\\/g;
        s/ /\\011/g;
        s/\n/\\012/g;
        s/\'/\\047/g;
    }

    return($val);
}
212    
#
# GetSYNCID($conn, $outf)
#
# Draws the id for a new snapshot from the sequence _rserv_sync_seq_ and
# writes the header line "-- SYNCID <id>" to the filehandle $outf.
# Returns the id; on a database error the transaction is rolled back and
# -1 is returned.
#
sub GetSYNCID
{
    my ($conn, $outf) = @_;

    my $res = $conn->exec("select nextval('_rserv_sync_seq_')");
    unless ($res->resultStatus eq PGRES_TUPLES_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my ($newid) = $res->fetchrow;

    printf $outf "-- SYNCID $newid\n";
    return($newid);
}
231    
232    
#
# CleanLog($conn, $howold)
#
# Deletes from _RSERV_LOG_ the entries that are older than $howold seconds,
# have a logid below the smallest maxid among the latest succeeded syncs
# (one per server), and are not listed in the "active" column of those
# syncs.
#
# Returns the number of deleted rows as reported by cmdTuples, 0 when no
# succeeded sync exists, or -1 on a database error.  The transaction opened
# here is rolled back on every non-success path.
#
sub CleanLog
{
    my ($conn, $howold) = @_;

    # Shared database-failure path: report, abort the transaction, -1.
    my $dbfail = sub {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    };

    my $result = $conn->exec("BEGIN");
    return $dbfail->() if $result->resultStatus ne PGRES_COMMAND_OK;

    my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
        " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
        " where rs2.server = rs.server and rs2.status > 0) order by rs.maxid";

    # The SQL is passed as an argument, never as the format itself.
    printf "%s\n", $sql if $debug;

    $result = $conn->exec($sql);
    # Previously this path returned with the transaction still open.
    return $dbfail->() if $result->resultStatus ne PGRES_TUPLES_OK;

    my $maxid = '';
    my %active = ();
    while (my @row = $result->fetchrow)
    {
        # Rows are ordered by maxid: take the first (smallest) value and
        # stop as soon as a larger one shows up.
        $maxid = $row[0] if $maxid eq '';
        last if $row[0] > $maxid;
        next unless defined $row[1];

        # Split on commas with or without surrounding blanks, so the ids
        # are collected (and de-duplicated) one by one.
        foreach my $aid (grep { $_ ne '' } split(/\s*,\s*/, $row[1]))
        {
            $active{$aid} = 1;
        }
    }
    if ($maxid eq '')
    {
        print STDERR "No Sync IDs\n" unless ($quiet);
        # Close the transaction opened above before leaving.
        $conn->exec("ROLLBACK");
        return(0);
    }

    # Sorted so the generated statement is the same from run to run.
    my $alist = join(',', sort keys %active);
    my $sinfo = "logid < $maxid";
    $sinfo .= " and logid not in ($alist)" if $alist ne '';

    $sql = "delete from _RSERV_LOG_ where " .
        "logtime < now() - '$howold second'::interval and $sinfo";

    printf "%s\n", $sql if $debug;

    $result = $conn->exec($sql);
    return $dbfail->() if $result->resultStatus ne PGRES_COMMAND_OK;

    my $deleted = $result->cmdTuples;

    $result = $conn->exec("COMMIT");
    return $dbfail->() if $result->resultStatus ne PGRES_COMMAND_OK;

    return($deleted);
}
302    
#
# ApplySnapshot($conn, $inpf)
#
# Reads a snapshot stream from the filehandle $inpf and applies it through
# $conn inside one transaction with all constraints deferred.  Every
# section header has the form "-- <COMMAND> [<parameter>]":
#
#   SYNCID <n>   must come first; recorded in _RSERV_SLAVE_SYNC_
#   DELETE <tab> keys to delete, handled by DoDelete
#   UPDATE <tab> rows to update or insert, handled by DoUpdate
#   OK           end of stream, the transaction is committed
#   ERROR        the stream is marked as failed
#
# Returns:
#    1  snapshot applied and committed
#    0  the stored sync id is already >= the one in the stream (rolled back)
#   -1  database error (rolled back)
#   -2  malformed input (rolled back)
# A non-zero result of DoDelete/DoUpdate is passed through after rollback.
#
sub ApplySnapshot
{
    my ($conn, $inpf) = @_;

    # Database failure: report the connection error, roll back, -1.
    my $dbfail = sub {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    };

    # Input failure: report the given message, roll back, -2.
    my $badinput = sub {
        my ($msg) = @_;
        printf STDERR "%s\n", $msg unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-2);
    };

    my $result = $conn->exec("BEGIN");
    return $dbfail->() if $result->resultStatus ne PGRES_COMMAND_OK;

    $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");
    return $dbfail->() if $result->resultStatus ne PGRES_COMMAND_OK;

    # MAP name --> oid, keyname, keynum
    my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
        " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
        " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .
        " and pga.attnum = rt.key";
    $result = $conn->exec($sql);
    return $dbfail->() if $result->resultStatus ne PGRES_TUPLES_OK;

    # Rebuild the map on every call: it lives at file scope, and appending
    # to entries left by an earlier call would keep the old oid/key in the
    # slots that DoDelete and DoUpdate read.
    %Stables = ();
    while (my @row = $result->fetchrow)
    {
        push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
    }

    my $ok = 0;
    my $syncid = '';
    while (my $line = <$inpf>)
    {
        $line =~ s/\n//;
        my ($cmt, $cmd, $prm) = split(/[ ]+/, $line, 3);
        return $badinput->("Invalid format")
            unless defined $cmt && $cmt eq '--';
        $cmd = '' unless defined $cmd;

        if ($cmd eq 'DELETE' || $cmd eq 'UPDATE')
        {
            return $badinput->("Sync ID unspecified") if $syncid eq '';

            # The helper consumes the section body up to its "\." line.
            my $rc = ($cmd eq 'DELETE')
                ? DoDelete($conn, $inpf, $prm)
                : DoUpdate($conn, $inpf, $prm);
            if ($rc)
            {
                $conn->exec("ROLLBACK");
                return($rc);
            }
        }
        elsif ($cmd eq 'SYNCID')
        {
            return $badinput->("Second Sync ID ?!") if $syncid ne '';
            return $badinput->("Invalid Sync ID " . (defined $prm ? $prm : ''))
                if !defined $prm || $prm !~ /^\d+$/;
            $syncid = $prm;

            printf STDERR "Sync ID %s\n", $syncid unless ($quiet);

            $result = $conn->exec("select syncid, synctime from " .
                "_RSERV_SLAVE_SYNC_ where syncid = " .
                "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
            return $dbfail->() if $result->resultStatus ne PGRES_TUPLES_OK;

            my @row = $result->fetchrow;
            if (! defined $row[0])
            {
                # No sync recorded yet.
                $result = $conn->exec("insert into" .
                    " _RSERV_SLAVE_SYNC_(syncid, synctime) values ($syncid, now())");
            }
            elsif ($row[0] >= $prm)
            {
                # This snapshot (or a newer one) has been applied already.
                printf STDERR "Sync-ed to ID %s (%s)\n", $row[0], $row[1]
                    unless ($quiet);
                $conn->exec("ROLLBACK");
                return(0);
            }
            else
            {
                $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .
                    " set syncid = $syncid, synctime = now()");
            }
            return $dbfail->() if $result->resultStatus ne PGRES_COMMAND_OK;
        }
        elsif ($cmd eq 'OK')
        {
            $ok = 1;
            last;
        }
        elsif ($cmd eq 'ERROR')
        {
            return $badinput->("ERROR signaled");
        }
        else
        {
            return $badinput->("Unknown command $cmd");
        }
    }

    return $badinput->("No OK flag in input") if ! $ok;

    $result = $conn->exec("COMMIT");
    return $dbfail->() if $result->resultStatus ne PGRES_COMMAND_OK;

    return(1);
}
471    
#
# DoDelete($conn, $inpf, $tabname)
#
# Reads keys, one per line, from the filehandle $inpf up to the terminator
# line "\." and deletes the matching row of $tabname for each of them.
# The key column name is taken from the %Stables entry of $tabname.
#
# Returns 0 on success, -1 when a DELETE statement fails, -2 when a line
# lacks its trailing newline or the input ends before the terminator.
# The transaction is left to the caller.
#
sub DoDelete
{
    my ($conn, $inpf, $tabname) = @_;

    my $ok = 0;
    # A lexical line variable keeps the caller's $_ untouched.
    while (my $line = <$inpf>)
    {
        if ($line !~ /\n$/)
        {
            printf STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        (my $key = $line) =~ s/\n//;
        if ($key eq '\.')
        {
            $ok = 1;
            last;
        }

        # The key is embedded as it arrives from the stream.
        # NOTE(review): this relies on the producer having escaped quotes
        # and backslashes (see OutputValue) - confirm for other producers.
        my $sql = "delete from $tabname where $Stables{$tabname}->[1] = '$key'";

        # The SQL carries key data, so it must be an argument and never the
        # format string: a '%' in a key would otherwise garble the output.
        printf "%s\n", $sql if $debug;

        my $result = $conn->exec($sql);
        if ($result->resultStatus ne PGRES_COMMAND_OK)
        {
            print STDERR $conn->errorMessage unless ($quiet);
            return(-1);
        }
    }

    if (! $ok)
    {
        printf STDERR "No end of input in DELETE section\n" unless ($quiet);
        return(-2);
    }

    return(0);
}
512    
513    
#
# DoUpdate($conn, $inpf, $tabname)
#
# Reads rows, one per line, from the filehandle $inpf up to the terminator
# line "\." and applies each to $tabname: first as an UPDATE addressed by
# the key column; when that touches no row, the original line is queued
# and later loaded with COPY through DoCopy.  The table oid, key column
# name and key column number come from the %Stables entry of $tabname;
# a negative key number means the row is addressed by its OID, which is
# then the first field of every line.
#
# Returns 0 on success, -1 on a database error, -2 on malformed input
# (missing newline or terminator, invalid OID, NULL key) or when an UPDATE
# touches more than one row.  A non-zero DoCopy result is passed through.
# The transaction is left to the caller.
#
sub DoUpdate
{
    my ($conn, $inpf, $tabname) = @_;

    my $taboid  = $Stables{$tabname}->[0];
    my $keyname = $Stables{$tabname}->[1];
    my $keynum  = $Stables{$tabname}->[2];

    # Flag only; the per-row OID value is kept separately in $oid below.
    my $withoids = ($keynum < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $sql = "select attnum, attname from pg_attribute" .
        " where attrelid = $taboid and attnum > 0";

    my $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        return(-1);
    }

    # Column names indexed by attribute number (slot 0 stays unused).
    my @anames = ();
    while (my @row = $result->fetchrow)
    {
        $anames[$row[0]] = $row[1];
    }

    my $ok = 0;
    # A lexical line variable keeps the caller's $_ untouched.
    while (my $istring = <$inpf>)
    {
        if ($istring !~ /\n$/)
        {
            printf STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $istring =~ s/\n//;
        if ($istring eq '\.')
        {
            $ok = 1;
            last;
        }

        # Limit -1 keeps trailing empty fields, so an empty last column is
        # a real '' rather than a missing (undef) element.
        # NOTE(review): fields are split on a single space, as in the
        # visible source; the separator may originally have been a TAB -
        # confirm against the upstream file.
        my @vals = split(/ /, $istring, -1);
        my $oid;
        if ($withoids)
        {
            if (!defined $vals[0] || $vals[0] !~ /^\d+$/ || $vals[0] <= 0)
            {
                printf STDERR "Invalid OID\n" unless ($quiet);
                return(-2);
            }
            $oid = $vals[0];
        }
        else
        {
            # Shift the fields so that $vals[N] belongs to attribute N.
            unshift @vals, '';
        }

        $sql = "update $tabname set ";
        my $ocnt = 0;
        for my $i (1 .. $#anames)
        {
            # A line with fewer fields than columns yields empty strings.
            $vals[$i] = '' unless defined $vals[$i];
            if ($vals[$i] eq '\N')
            {
                if ($i == $keynum)
                {
                    printf STDERR "NULL key\n" unless ($quiet);
                    return(-2);
                }
                $vals[$i] = 'null';
            }
            else
            {
                $vals[$i] = "'" . $vals[$i] . "'";
                # The key column only appears in the WHERE clause.
                next if $i == $keynum;
            }
            $ocnt++;
            $sql .= ', ' if $ocnt > 1;
            $sql .= "$anames[$i] = $vals[$i]";
        }
        if ($withoids)
        {
            $sql .= " where $keyname = $oid";
        }
        else
        {
            $sql .= " where $keyname = $vals[$keynum]";
        }

        # The SQL carries row data, so it is an argument, not the format.
        printf "%s\n", $sql if $debug;

        $result = $conn->exec($sql);
        if ($result->resultStatus ne PGRES_COMMAND_OK)
        {
            print STDERR $conn->errorMessage unless ($quiet);
            return(-1);
        }
        next if $result->cmdTuples == 1;    # updated

        if ($result->cmdTuples > 1)
        {
            printf STDERR "Duplicate keys\n" unless ($quiet);
            return(-2);
        }

        # no key - copy
        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax)
        {
            $result = DoCopy($conn, $tabname, $withoids, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok)
    {
        printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
        return(-2);
    }

    # Flush whatever is still queued; test the queue itself so that lines
    # of zero length are not left behind.
    if (@CopyBuf)
    {
        $result = DoCopy($conn, $tabname, $withoids, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
644    
645    
#
# DoCopy($conn, $tabname, $withoids, $CBuf)
#
# Loads the lines held in the array referenced by $CBuf into $tabname with
# "COPY ... FROM STDIN" (adding "WITH OIDS" when $withoids is true), then
# sends the terminator line and ends the copy.
# Returns 0 on success, -1 when the COPY cannot be started or endcopy
# reports a failure.
#
sub DoCopy
{
    my ($conn, $tabname, $withoids, $CBuf) = @_;

    my $oidclause = $withoids ? "WITH OIDS " : '';
    my $result = $conn->exec("COPY $tabname " . $oidclause . "FROM STDIN");
    unless ($result->resultStatus eq PGRES_COPY_IN)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        return(-1);
    }

    # Data lines first, then the end-of-data marker.
    for my $line (@{$CBuf})
    {
        $conn->putline($line);
    }
    $conn->putline("\\.\n");

    # endcopy returns a true value on failure.
    if ($conn->endcopy)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        return(-1);
    }

    return(0);
}
675    
676    
#
# GetSyncID($conn)
#
# Returns the last SyncID applied on the Slave, i.e. max(syncid) of
# _RSERV_SLAVE_SYNC_.  The result is undef when that query yields NULL,
# and -1 when the query fails.
#
sub GetSyncID
{
    my ($conn) = @_;

    my $res = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
    if ($res->resultStatus eq PGRES_TUPLES_OK)
    {
        my ($last) = $res->fetchrow;
        return($last);
    }

    print STDERR $conn->errorMessage unless ($quiet);
    return(-1);
}
694    
#
# SyncSyncID($conn, $server, $syncid)
#
# Updates _RSERV_SYNC_ on the Master with the Slave's SyncID: the row for
# ($server, $syncid) is locked, stamped with the current time and
# status = 1, and the rows of the same server with a smaller syncid are
# deleted.  Everything runs in one transaction.
#
# Returns 1 when the row was updated, 0 when no such row exists or its
# status is already above zero, -1 on a database error.  Every result
# other than 1 is preceded by a ROLLBACK.
#
sub SyncSyncID
{
    my ($conn, $server, $syncid) = @_;

    # Database failure: report, abort the transaction, -1.
    my $dbfail = sub {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    };

    # The row this call is about.
    my $target = " where server = $server and syncid = $syncid";

    my $result = $conn->exec("BEGIN");
    return $dbfail->() unless $result->resultStatus eq PGRES_COMMAND_OK;

    $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .
        $target . " for update");
    return $dbfail->() unless $result->resultStatus eq PGRES_TUPLES_OK;

    my ($synctime, $status) = $result->fetchrow;
    unless (defined $synctime)
    {
        printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);
        $conn->exec("ROLLBACK");
        return(0);
    }
    if ($status > 0)
    {
        printf STDERR "SyncID $syncid for server $server already updated\n" unless ($quiet);
        $conn->exec("ROLLBACK");
        return(0);
    }

    $result = $conn->exec("update _RSERV_SYNC_" .
        " set synctime = now(), status = 1" . $target);
    return $dbfail->() unless $result->resultStatus eq PGRES_COMMAND_OK;

    # Older sync records of this server are no longer needed.
    $result = $conn->exec("delete from _RSERV_SYNC_" .
        " where server = $server and syncid < $syncid");
    return $dbfail->() unless $result->resultStatus eq PGRES_COMMAND_OK;

    $result = $conn->exec("COMMIT");
    return $dbfail->() unless $result->resultStatus eq PGRES_COMMAND_OK;

    return(1);
}
760    
761     1;

  ViewVC Help
Powered by ViewVC 1.1.26