/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.2 - (hide annotations)
Tue Aug 5 09:52:41 2003 UTC (20 years, 9 months ago) by dpavlin
Branch: MAIN
Changes since 1.1: +843 -635 lines
rserv 0.2 changes by Nélio Alves Pereira Filho

1 dpavlin 1.1 # -*- perl -*-
2     # RServ.pm
3     # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4    
5     package RServ;
6    
7     require Exporter;
8     @ISA = qw(Exporter);
9 dpavlin 1.2 @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);
10 dpavlin 1.1 @EXPORT_OK = qw();
11    
12     use Pg;
13    
14     $debug = 0;
15     $quiet = 1;
16    
17     my %Mtables = ();
18     my %Stables = ();
19    
20 dpavlin 1.2 sub GetSlaveId
21 dpavlin 1.1 {
22 dpavlin 1.2 my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);
23 dpavlin 1.1
24 dpavlin 1.2 $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
25     " host='$slaveHost' AND dbase='$slaveDB'");
26     if ($result->resultStatus ne PGRES_TUPLES_OK)
27     {
28     print STDERR $conn->errorMessage unless ($quiet);
29     return(-1);
30     }
31    
32     if ($result->cmdTuples > 1)
33     {
34     printf STDERR "Duplicate slave definitions.\n" unless ($quiet);
35     return(-2);
36     }
37    
38     my @row = $result->fetchrow;
39    
40     return $row[0];
41     }
42 dpavlin 1.1
43 dpavlin 1.2 sub PrepareSnapshot
44     {
45     my ($conn, $sconn, $outf, $server) = @_; # (@_[0], @_[1], @_[2], $_[3]);
46 dpavlin 1.1
47 dpavlin 1.2 # first, we must know for wich tables the slave subscribed
48     my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
49     if ($result->resultStatus ne PGRES_TUPLES_OK)
50     {
51     print STDERR $conn->errorMessage unless ($quiet);
52     return(-1);
53     }
54    
55     my @row;
56     while (@row = $result->fetchrow) {
57     $Stables{$row[0]} = 1;
58     }
59    
60     $result = $conn->exec("BEGIN");
61     if ($result->resultStatus ne PGRES_COMMAND_OK)
62     {
63     print STDERR $conn->errorMessage unless ($quiet);
64     $conn->exec("ROLLBACK");
65     return(-1);
66     }
67     $result = $conn->exec("set transaction isolation level serializable");
68     if ($result->resultStatus ne PGRES_COMMAND_OK)
69     {
70     print STDERR $conn->errorMessage unless ($quiet);
71     $conn->exec("ROLLBACK");
72     return(-1);
73     }
74    
75     # MAP oid --> tabname, keyname, key_type
76     $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
77     " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
78     ", pg_type pgt".
79     " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
80     " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
81     if ($result->resultStatus ne PGRES_TUPLES_OK)
82     {
83     print STDERR $conn->errorMessage unless ($quiet);
84     $conn->exec("ROLLBACK");
85     return(-1);
86     }
87    
88     while (@row = $result->fetchrow)
89     {
90 dpavlin 1.1 # printf "$row[0], $row[1], $row[2]\n";
91 dpavlin 1.2 push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
92     }
93    
94     # Read last succeeded sync
95     $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
96     " where server = $server AND syncid = (select max(syncid) from" .
97     " _RSERV_SYNC_ where server = $server AND status > 0)";
98    
99     printf "$sql\n" if $debug;
100    
101     $result = $conn->exec($sql);
102     if ($result->resultStatus ne PGRES_TUPLES_OK)
103     {
104     print STDERR $conn->errorMessage unless ($quiet);
105     $conn->exec("ROLLBACK");
106     return(-1);
107     }
108    
109     my @lastsync = $result->fetchrow;
110    
111     my $sinfo = "";
112     if ($lastsync[3] ne '') # sync info
113     {
114     $sinfo = "and (l.logid >= $lastsync[3]";
115     $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
116     $sinfo .= ")";
117     }
118    
119     my $havedeal = 0;
120    
121     # DELETED rows
122     $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
123     " where l.delete = 1 $sinfo order by l.reloid";
124    
125     printf "$sql\n" if $debug;
126    
127     $result = $conn->exec($sql);
128     if ($result->resultStatus ne PGRES_TUPLES_OK)
129     {
130     print STDERR $conn->errorMessage unless ($quiet);
131     $conn->exec("ROLLBACK");
132     return(-1);
133     }
134    
135     $lastoid = '';
136     while (@row = $result->fetchrow)
137     {
138     next unless exists $Mtables{$row[0]};
139     next unless exists $Stables{$Mtables{$row[0]}[0]};
140    
141     if ($lastoid != $row[0])
142     {
143     if ($lastoid eq '')
144     {
145     my $syncid = GetSYNCID($conn, $outf);
146     return($syncid) if $syncid < 0;
147     $havedeal = 1;
148     }
149     else
150     {
151     printf $outf "\\.\n";
152     }
153     printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
154     $lastoid = $row[0];
155     }
156     if (! defined $row[1])
157     {
158     print STDERR "NULL key\n" unless ($quiet);
159     $conn->exec("ROLLBACK");
160     return(-2);
161     }
162     printf $outf "%s\n", OutputValue($row[1]);
163     }
164     printf $outf "\\.\n" if $lastoid ne '';
165    
166     # UPDATED rows
167    
168     my ($taboid, $tabname, $tabkey);
169     foreach $taboid (keys %Mtables)
170     {
171     ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
172     next unless exists $Stables{$tabname};
173    
174     my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
175    
176     $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
177     "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
178     " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";
179    
180 dpavlin 1.1 printf "$sql\n" if $debug;
181 dpavlin 1.2
182 dpavlin 1.1 $result = $conn->exec($sql);
183     if ($result->resultStatus ne PGRES_TUPLES_OK)
184     {
185 dpavlin 1.2 printf $outf "-- ERROR\n" if $havedeal;
186     print STDERR $conn->errorMessage unless ($quiet);
187     $conn->exec("ROLLBACK");
188     return(-1);
189     }
190     next if $result->ntuples <= 0;
191     if (! $havedeal)
192     {
193     my $syncid = GetSYNCID($conn, $outf);
194     return($syncid) if $syncid < 0;
195     $havedeal = 1;
196 dpavlin 1.1 }
197 dpavlin 1.2 printf $outf "-- UPDATE $tabname\n";
198     printf "-- UPDATE $tabname\n" if $debug;
199     while (@row = $result->fetchrow)
200 dpavlin 1.1 {
201 dpavlin 1.2 for ($i = 0; $i <= $#row; $i++)
202     {
203     printf $outf " " if $i;
204     printf " " if $i && $debug;
205     printf $outf "%s", OutputValue($row[$i]);
206     printf "%s", OutputValue($row[$i]) if $debug;;
207     }
208     printf $outf "\n";
209     printf "\n" if $debug;
210     }
211     printf $outf "\\.\n";
212     printf "\\.\n" if $debug;;
213     }
214    
215     # INSERTED rows
216    
217     my ($taboid, $tabname, $tabkey);
218     foreach $taboid (keys %Mtables)
219     {
220     ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
221     next unless exists $Stables{$tabname};
222    
223     my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
224    
225     $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
226     "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
227     " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";
228    
229 dpavlin 1.1 printf "$sql\n" if $debug;
230 dpavlin 1.2
231 dpavlin 1.1 $result = $conn->exec($sql);
232     if ($result->resultStatus ne PGRES_TUPLES_OK)
233     {
234 dpavlin 1.2 printf $outf "-- ERROR\n" if $havedeal;
235     print STDERR $conn->errorMessage unless ($quiet);
236     $conn->exec("ROLLBACK");
237     return(-1);
238     }
239     next if $result->ntuples <= 0;
240     if (! $havedeal)
241     {
242     my $syncid = GetSYNCID($conn, $outf);
243     return($syncid) if $syncid < 0;
244     $havedeal = 1;
245 dpavlin 1.1 }
246 dpavlin 1.2 printf $outf "-- INSERT $tabname\n";
247     printf "-- INSERT $tabname\n" if $debug;
248 dpavlin 1.1 while (@row = $result->fetchrow)
249     {
250 dpavlin 1.2 for ($i = 0; $i <= $#row; $i++)
251     {
252     printf $outf " " if $i;
253     printf " " if $i && $debug;
254     printf $outf "%s", OutputValue($row[$i]);
255     printf "%s", OutputValue($row[$i]) if $debug;;
256     }
257     printf $outf "\n";
258     printf "\n" if $debug;
259     }
260     printf $outf "\\.\n";
261     printf "\\.\n" if $debug;;
262     }
263    
264    
265     unless ($havedeal)
266     {
267     $conn->exec("ROLLBACK");
268     return(0);
269     }
270    
271     # Remember this snapshot info
272     $result = $conn->exec("select _rserv_sync_($server)");
273     if ($result->resultStatus ne PGRES_TUPLES_OK)
274     {
275     printf $outf "-- ERROR\n";
276     print STDERR $conn->errorMessage unless ($quiet);
277     $conn->exec("ROLLBACK");
278     return(-1);
279     }
280    
281     $result = $conn->exec("COMMIT");
282     if ($result->resultStatus ne PGRES_COMMAND_OK)
283     {
284     printf $outf "-- ERROR\n";
285     print STDERR $conn->errorMessage unless ($quiet);
286     $conn->exec("ROLLBACK");
287     return(-1);
288     }
289     printf $outf "-- OK\n";
290     printf "-- OK\n" if $debug;
291    
292     return(1);
293    
294 dpavlin 1.1 }
295    
296     sub OutputValue
297     {
298     my ($val) = @_; # @_[0];
299    
300     return("\\N") unless defined $val;
301    
302     $val =~ s/\\/\\\\/g;
303     $val =~ s/ /\\011/g;
304     $val =~ s/\n/\\012/g;
305     $val =~ s/\'/\\047/g;
306    
307     return($val);
308     }
309    
310     # Get syncid for new snapshot
311     sub GetSYNCID
312     {
313 dpavlin 1.2 my ($conn, $outf) = @_; # (@_[0], @_[1]);
314    
315     my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
316     if ($result->resultStatus ne PGRES_TUPLES_OK)
317     {
318     print STDERR $conn->errorMessage unless ($quiet);
319     $conn->exec("ROLLBACK");
320     return(-1);
321     }
322    
323     my @row = $result->fetchrow;
324    
325     printf $outf "-- SYNCID $row[0]\n";
326     printf "-- SYNCID $row[0]\n" if $debug;
327     return($row[0]);
328 dpavlin 1.1 }
329    
330    
331     sub CleanLog
332     {
333 dpavlin 1.2 my ($conn, $howold) = @_; # (@_[0], @_[1]);
334    
335     my $result = $conn->exec("BEGIN");
336     if ($result->resultStatus ne PGRES_COMMAND_OK)
337     {
338     print STDERR $conn->errorMessage unless ($quiet);
339     $conn->exec("ROLLBACK");
340     return(-1);
341     }
342    
343     my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
344     " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
345     " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
346    
347     printf "$sql\n" if $debug;
348    
349     $result = $conn->exec($sql);
350     if ($result->resultStatus ne PGRES_TUPLES_OK)
351     {
352     print STDERR $conn->errorMessage unless ($quiet);
353     return(-1);
354     }
355     my $maxid = '';
356     my %active = ();
357     while (my @row = $result->fetchrow)
358     {
359     $maxid = $row[0] if $maxid eq '';
360     last if $row[0] > $maxid;
361     my @ids = split(/[ ]+,[ ]+/, $row[1]);
362     foreach $aid (@ids)
363     {
364     $active{$aid} = 1 unless exists $active{$aid};
365     }
366     }
367     if ($maxid eq '')
368     {
369     print STDERR "No Sync IDs\n" unless ($quiet);
370     return(0);
371     }
372     my $alist = join(',', keys %active);
373     my $sinfo = "logid < $maxid";
374     $sinfo .= " AND logid not in ($alist)" if $alist ne '';
375    
376     $sql = "delete from _RSERV_LOG_ where " .
377     "logtime < now() - '$howold second'::interval AND $sinfo";
378    
379     printf "$sql\n" if $debug;
380    
381     $result = $conn->exec($sql);
382     if ($result->resultStatus ne PGRES_COMMAND_OK)
383     {
384     print STDERR $conn->errorMessage unless ($quiet);
385     $conn->exec("ROLLBACK");
386     return(-1);
387     }
388     $maxid = $result->cmdTuples;
389    
390     $result = $conn->exec("COMMIT");
391     if ($result->resultStatus ne PGRES_COMMAND_OK)
392     {
393     print STDERR $conn->errorMessage unless ($quiet);
394     $conn->exec("ROLLBACK");
395     return(-1);
396     }
397    
398     return($maxid);
399     }
400 dpavlin 1.1
401 dpavlin 1.2 sub ApplySnapshot
402     {
403     my ($conn, $inpf) = @_; # (@_[0], @_[1]);
404    
405     my $result = $conn->exec("BEGIN");
406     if ($result->resultStatus ne PGRES_COMMAND_OK)
407     {
408     print STDERR $conn->errorMessage unless ($quiet);
409     $conn->exec("ROLLBACK");
410     return(-1);
411     }
412    
413     $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");
414     if ($result->resultStatus ne PGRES_COMMAND_OK)
415     {
416     print STDERR $conn->errorMessage unless ($quiet);
417     $conn->exec("ROLLBACK");
418     return(-1);
419     }
420    
421     # MAP name --> oid, keyname, keynum
422     my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
423     " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
424     " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
425     " AND pga.attnum = rt.key";
426    
427     $result = $conn->exec($sql);
428     if ($result->resultStatus ne PGRES_TUPLES_OK)
429     {
430     print STDERR $conn->errorMessage unless ($quiet);
431     $conn->exec("ROLLBACK");
432     return(-1);
433     }
434     %Stables = ();
435     while (@row = $result->fetchrow)
436     {
437     # printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
438     push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
439     }
440    
441     my $ok = 0;
442     my $syncid = '';
443     while(<$inpf>)
444     {
445     $_ =~ s/\n//;
446     my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3);
447     if ($cmt ne '--')
448     {
449     printf STDERR "Invalid format\n" unless ($quiet);
450     $conn->exec("ROLLBACK");
451     return(-2);
452     }
453     if ($cmd eq 'DELETE')
454     {
455     if ($syncid eq '')
456     {
457     printf STDERR "Sync ID unspecified\n" unless ($quiet);
458 dpavlin 1.1 $conn->exec("ROLLBACK");
459 dpavlin 1.2 return(-2);
460     }
461     $result = DoDelete($conn, $inpf, $prm);
462     if ($result)
463     {
464     $conn->exec("ROLLBACK");
465     return($result);
466     }
467     }
468     elsif ($cmd eq 'INSERT')
469     {
470     if ($syncid eq '')
471     {
472     printf STDERR "Sync ID unspecified\n" unless ($quiet);
473 dpavlin 1.1 $conn->exec("ROLLBACK");
474 dpavlin 1.2 return(-2);
475     }
476     $result = DoInsert($conn, $inpf, $prm);
477     if ($result)
478     {
479     $conn->exec("ROLLBACK");
480     return($result);
481     }
482     }
483     elsif ($cmd eq 'UPDATE')
484     {
485     if ($syncid eq '')
486     {
487     printf STDERR "Sync ID unspecified\n" unless ($quiet);
488 dpavlin 1.1 $conn->exec("ROLLBACK");
489 dpavlin 1.2 return(-2);
490     }
491     $result = DoUpdate($conn, $inpf, $prm);
492     if ($result)
493     {
494     $conn->exec("ROLLBACK");
495     return($result);
496     }
497     }
498     elsif ($cmd eq 'SYNCID')
499     {
500     if ($syncid ne '')
501     {
502     printf STDERR "Second Sync ID ?!\n" unless ($quiet);
503 dpavlin 1.1 $conn->exec("ROLLBACK");
504 dpavlin 1.2 return(-2);
505     }
506     if ($prm !~ /^\d+$/)
507     {
508     printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
509 dpavlin 1.1 $conn->exec("ROLLBACK");
510 dpavlin 1.2 return(-2);
511     }
512     $syncid = $prm;
513    
514     printf STDERR "Sync ID $syncid\n" unless ($quiet);
515    
516     $result = $conn->exec("select syncid, synctime from " .
517     "_RSERV_SLAVE_SYNC_ where syncid = " .
518     "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
519     if ($result->resultStatus ne PGRES_TUPLES_OK)
520     {
521 dpavlin 1.1 print STDERR $conn->errorMessage unless ($quiet);
522     $conn->exec("ROLLBACK");
523     return(-1);
524 dpavlin 1.2 }
525     my @row = $result->fetchrow;
526     if (! defined $row[0])
527     {
528     $result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ".
529     "(syncid, synctime) values ($syncid, now())");
530     }
531     elsif ($row[0] >= $prm)
532     {
533     printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
534 dpavlin 1.1 $conn->exec("ROLLBACK");
535 dpavlin 1.2 return(0);
536     }
537     else
538     {
539     $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .
540     " set syncid = $syncid, synctime = now()");
541     }
542     if ($result->resultStatus ne PGRES_COMMAND_OK)
543     {
544 dpavlin 1.1 print STDERR $conn->errorMessage unless ($quiet);
545     $conn->exec("ROLLBACK");
546     return(-1);
547 dpavlin 1.2 }
548     }
549     elsif ($cmd eq 'OK')
550     {
551     $ok = 1;
552     last;
553     }
554     elsif ($cmd eq 'ERROR')
555     {
556     printf STDERR "ERROR signaled\n" unless ($quiet);
557     $conn->exec("ROLLBACK");
558     return(-2);
559     }
560     else
561     {
562     printf STDERR "Unknown command $cmd\n" unless ($quiet);
563     $conn->exec("ROLLBACK");
564     return(-2);
565     }
566     }
567    
568     if (! $ok)
569     {
570     printf STDERR "No OK flag in input\n" unless ($quiet);
571     $conn->exec("ROLLBACK");
572     return(-2);
573     }
574    
575     $result = $conn->exec("COMMIT");
576     if ($result->resultStatus ne PGRES_COMMAND_OK)
577     {
578     print STDERR $conn->errorMessage unless ($quiet);
579     $conn->exec("ROLLBACK");
580     return(-1);
581     }
582    
583     return(1);
584 dpavlin 1.1 }
585    
586     sub DoDelete
587     {
588 dpavlin 1.2 my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
589    
590     # only delete tables that the slave wants
591     if (! defined($Stables{$tabname})) {
592     print "Not configured to delete rows from table $tabname\n" unless $quiet;
593     while (<$inpf>) {
594     $istring = $_;
595     $istring =~ s/\n//;
596     last if ($istring eq '\.');
597     }
598     return(0);
599     }
600 dpavlin 1.1
601 dpavlin 1.2 my $ok = 0;
602     while(<$inpf>)
603     {
604     if ($_ !~ /\n$/)
605     {
606     printf STDERR "Invalid format\n" unless ($quiet);
607     return(-2);
608     }
609     my $key = $_;
610     $key =~ s/\n//;
611     if ($key eq '\.')
612 dpavlin 1.1 {
613 dpavlin 1.2 $ok = 1;
614     last;
615 dpavlin 1.1 }
616 dpavlin 1.2
617     my $sql = "delete from \"$tabname\" where ".
618     "\"$Stables{$tabname}->[1]\" = '$key'";
619    
620     printf "$sql\n" if $debug;
621    
622     my $result = $conn->exec($sql);
623     if ($result->resultStatus ne PGRES_COMMAND_OK)
624 dpavlin 1.1 {
625 dpavlin 1.2 print STDERR $conn->errorMessage unless ($quiet);
626     return(-1);
627 dpavlin 1.1 }
628 dpavlin 1.2 }
629    
630     if (! $ok)
631     {
632     printf STDERR "No end of input in DELETE section\n" unless ($quiet);
633     return(-2);
634     }
635    
636     return(0);
637 dpavlin 1.1 }
638    
639    
640     sub DoUpdate
641     {
642 dpavlin 1.2 my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
643 dpavlin 1.1
644 dpavlin 1.2 # only update the tables that the slave wants
645     if (! defined($Stables{$tabname})) {
646     print "Not configured to update rows from table $tabname\n" unless $quiet;
647     while (<$inpf>) {
648     $istring = $_;
649     $istring =~ s/\n//;
650     last if ($istring eq '\.');
651     }
652     return(0);
653     }
654 dpavlin 1.1
655 dpavlin 1.2 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
656    
657     my @CopyBuf = ();
658     my $CBufLen = 0;
659     my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
660    
661     my $sql = "select attnum, attname from pg_attribute" .
662     " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
663    
664     my $result = $conn->exec($sql);
665     if ($result->resultStatus ne PGRES_TUPLES_OK)
666     {
667     print STDERR $conn->errorMessage unless ($quiet);
668     return(-1);
669     }
670    
671     my @anames = ();
672     while (@row = $result->fetchrow)
673     {
674     $anames[$row[0]] = $row[1];
675     }
676    
677     my $istring;
678     my $ok = 0;
679     while(<$inpf>)
680     {
681     if ($_ !~ /\n$/)
682     {
683     printf STDERR "Invalid format\n" unless ($quiet);
684     return(-2);
685     }
686     $istring = $_;
687     $istring =~ s/\n//;
688     if ($istring eq '\.')
689     {
690     $ok = 1;
691     last;
692     }
693     my @vals = split(/ /, $istring);
694     if ($oidkey)
695     {
696     if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
697     {
698     printf STDERR "Invalid OID\n" unless ($quiet);
699     return(-2);
700     }
701     $oidkey = $vals[0];
702     }
703     else
704 dpavlin 1.1 {
705 dpavlin 1.2 unshift @vals, '';
706     }
707    
708     $sql = "update \"$tabname\" set ";
709     my $ocnt = 0;
710     for (my $i = 1; $i <= $#anames; $i++)
711     {
712     if ($vals[$i] eq '\N')
713     {
714     if ($i == $Stables{$tabname}->[2])
715     {
716     printf STDERR "NULL key\n" unless ($quiet);
717     return(-2);
718     }
719     $vals[$i] = 'null';
720     }
721     else
722     {
723     $vals[$i] = "'" . $vals[$i] . "'";
724     next if $i == $Stables{$tabname}->[2];
725     }
726     $ocnt++;
727     $sql .= ', ' if $ocnt > 1;
728     $sql .= "\"$anames[$i]\" = $vals[$i]";
729 dpavlin 1.1 }
730 dpavlin 1.2 if ($oidkey)
731 dpavlin 1.1 {
732 dpavlin 1.2 $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
733 dpavlin 1.1 }
734 dpavlin 1.2 else
735 dpavlin 1.1 {
736 dpavlin 1.2 $sql .= " where \"$Stables{$tabname}->[1]\" = ".
737     $vals[$Stables{$tabname}->[2]];
738 dpavlin 1.1 }
739 dpavlin 1.2
740     printf "$sql\n" if $debug;
741    
742     $result = $conn->exec($sql);
743    
744     if ($result->resultStatus ne PGRES_COMMAND_OK)
745 dpavlin 1.1 {
746 dpavlin 1.2 print STDERR $conn->errorMessage unless ($quiet);
747     return(-1);
748 dpavlin 1.1 }
749 dpavlin 1.2 next if $result->cmdTuples == 1; # updated
750    
751     if ($result->cmdTuples > 1)
752     {
753     printf STDERR "Duplicate keys\n" unless ($quiet);
754     return(-2);
755     }
756    
757     # no key - copy
758     push @CopyBuf, "$istring\n";
759     $CBufLen += length($istring);
760    
761     if ($CBufLen >= $CBufMax)
762 dpavlin 1.1 {
763 dpavlin 1.2 $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
764     return($result) if $result;
765     @CopyBuf = ();
766     $CBufLen = 0;
767     }
768     }
769    
770     if (! $ok)
771     {
772     printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
773     return(-2);
774     }
775    
776     if ($CBufLen)
777     {
778     print "@CopyBuf\n" if $debug;
779     $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
780     return($result) if $result;
781     }
782 dpavlin 1.1
783 dpavlin 1.2 return(0);
784 dpavlin 1.1 }
785    
786 dpavlin 1.2 sub DoInsert
787 dpavlin 1.1 {
788 dpavlin 1.2 my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
789 dpavlin 1.1
790 dpavlin 1.2 # only insert rows into tables that the slave wants
791     if (! defined($Stables{$tabname})) {
792     print "Not configured to insert rows from table $tabname\n" unless $quiet;
793     while (<$inpf>) {
794     $istring = $_;
795     $istring =~ s/\n//;
796     last if ($istring eq '\.');
797 dpavlin 1.1 }
798 dpavlin 1.2 return(0);
799     }
800 dpavlin 1.1
801 dpavlin 1.2 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
802    
803     my @CopyBuf = ();
804     my $CBufLen = 0;
805     my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
806    
807     my $istring;
808     my $ok = 0;
809     while(<$inpf>)
810     {
811     if ($_ !~ /\n$/)
812     {
813     printf STDERR "Invalid format\n" unless ($quiet);
814     return(-2);
815     }
816     $istring = $_;
817     $istring =~ s/\n//;
818     if ($istring eq '\.')
819     {
820     $ok = 1;
821     last;
822     }
823    
824     # no key - copy
825     push @CopyBuf, "$istring\n";
826     $CBufLen += length($istring);
827    
828     if ($CBufLen >= $CBufMax)
829 dpavlin 1.1 {
830 dpavlin 1.2 $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
831     return($result) if $result;
832     @CopyBuf = ();
833     $CBufLen = 0;
834     }
835     }
836    
837     if (! $ok)
838     {
839     printf STDERR "No end of input in INSERT section\n" unless ($quiet);
840     return(-2);
841     }
842    
843     if ($CBufLen)
844     {
845     print "@CopyBuf\n" if $debug;
846     $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
847     return($result) if $result;
848     }
849    
850     return(0);
851     }
852 dpavlin 1.1
853    
854 dpavlin 1.2 sub DoCopy
855     {
856     my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
857    
858     my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
859     "FROM STDIN";
860     my $result = $conn->exec($sql);
861     if ($result->resultStatus ne PGRES_COPY_IN)
862     {
863     print STDERR $conn->errorMessage unless ($quiet);
864     return(-1);
865     }
866    
867     foreach $str (@{$CBuf})
868     {
869     $conn->putline($str);
870     }
871    
872     $conn->putline("\\.\n");
873    
874     if ($conn->endcopy)
875     {
876     print STDERR $conn->errorMessage unless ($quiet);
877     return(-1);
878     }
879    
880     return(0);
881 dpavlin 1.1 }
882    
883    
884     #
885     # Returns last SyncID applied on Slave
886     #
887     sub GetSyncID
888     {
889 dpavlin 1.2 my ($conn) = @_; # (@_[0]);
890    
891     my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
892     if ($result->resultStatus ne PGRES_TUPLES_OK)
893     {
894     print STDERR $conn->errorMessage unless ($quiet);
895     return(-1);
896     }
897     my @row = $result->fetchrow;
898     return(undef) unless defined $row[0]; # null
899     return($row[0]);
900 dpavlin 1.1 }
901    
902     #
903     # Updates _RSERV_SYNC_ on Master with Slave SyncID
904     #
905     sub SyncSyncID
906     {
907 dpavlin 1.2 my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);
908    
909     my $result = $conn->exec("BEGIN");
910     if ($result->resultStatus ne PGRES_COMMAND_OK)
911     {
912     print STDERR $conn->errorMessage unless ($quiet);
913     $conn->exec("ROLLBACK");
914     return(-1);
915     }
916    
917     $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .
918     " where server = $server AND syncid = $syncid" .
919     " for update");
920     if ($result->resultStatus ne PGRES_TUPLES_OK)
921     {
922     print STDERR $conn->errorMessage unless ($quiet);
923     $conn->exec("ROLLBACK");
924     return(-1);
925     }
926     my @row = $result->fetchrow;
927     if (! defined $row[0])
928     {
929     printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);
930     $conn->exec("ROLLBACK");
931     return(0);
932     }
933     if ($row[1] > 0)
934     {
935     printf STDERR "SyncID $syncid for server ".
936     "$server already updated\n" unless ($quiet);
937     $conn->exec("ROLLBACK");
938     return(0);
939     }
940     $result = $conn->exec("update _RSERV_SYNC_" .
941     " set synctime = now(), status = 1" .
942     " where server = $server AND syncid = $syncid");
943     if ($result->resultStatus ne PGRES_COMMAND_OK)
944     {
945     print STDERR $conn->errorMessage unless ($quiet);
946     $conn->exec("ROLLBACK");
947     return(-1);
948     }
949     $result = $conn->exec("delete from _RSERV_SYNC_" .
950     " where server = $server AND syncid < $syncid");
951     if ($result->resultStatus ne PGRES_COMMAND_OK)
952     {
953     print STDERR $conn->errorMessage unless ($quiet);
954     $conn->exec("ROLLBACK");
955     return(-1);
956     }
957    
958     $result = $conn->exec("COMMIT");
959     if ($result->resultStatus ne PGRES_COMMAND_OK)
960     {
961     print STDERR $conn->errorMessage unless ($quiet);
962     $conn->exec("ROLLBACK");
963     return(-1);
964     }
965    
966     return(1);
967 dpavlin 1.1 }
968    
969     1;

  ViewVC Help
Powered by ViewVC 1.1.26