/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.5 - (hide annotations)
Sun Oct 26 23:43:54 2003 UTC (20 years, 6 months ago) by dpavlin
Branch: MAIN
CVS Tags: before_multmaster, r_0_3
Changes since 1.4: +16 -4 lines
applied patch from Michael A Nachbaur to select tables which are replicating

1 dpavlin 1.1 # -*- perl -*-
2     # RServ.pm
3     # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4    
5     package RServ;
6    
7     require Exporter;
8     @ISA = qw(Exporter);
9 dpavlin 1.2 @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetSlaveId);
10 dpavlin 1.1 @EXPORT_OK = qw();
11 dpavlin 1.3 use strict;
12 dpavlin 1.1 use Pg;
13    
14 dpavlin 1.3 my $debug = 0;
15     my $quiet = 1;
16 dpavlin 1.1
17     my %Mtables = ();
18     my %Stables = ();
19    
20 dpavlin 1.2 sub GetSlaveId
21 dpavlin 1.1 {
22 dpavlin 1.2 my ($conn, $slaveDB, $slaveHost) = @_; # (@_[0], @_[1]. @_[2]);
23 dpavlin 1.1
24 dpavlin 1.3 my $result = $conn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
25 dpavlin 1.2 " host='$slaveHost' AND dbase='$slaveDB'");
26 dpavlin 1.3
27 dpavlin 1.2 if ($result->resultStatus ne PGRES_TUPLES_OK)
28     {
29     print STDERR $conn->errorMessage unless ($quiet);
30     return(-1);
31     }
32    
33 dpavlin 1.3 if ($result->cmdTuples && $result->cmdTuples > 1)
34 dpavlin 1.2 {
35     printf STDERR "Duplicate slave definitions.\n" unless ($quiet);
36     return(-2);
37     }
38    
39     my @row = $result->fetchrow;
40    
41     return $row[0];
42     }
43 dpavlin 1.1
44 dpavlin 1.2 sub PrepareSnapshot
45     {
46 dpavlin 1.5 my ($conn, $sconn, $outf, $server, $onlytables) = @_;
47 dpavlin 1.1
48 dpavlin 1.2 # first, we must know for wich tables the slave subscribed
49     my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
50     if ($result->resultStatus ne PGRES_TUPLES_OK)
51     {
52     print STDERR $conn->errorMessage unless ($quiet);
53     return(-1);
54     }
55    
56     my @row;
57     while (@row = $result->fetchrow) {
58     $Stables{$row[0]} = 1;
59     }
60    
61     $result = $conn->exec("BEGIN");
62     if ($result->resultStatus ne PGRES_COMMAND_OK)
63     {
64     print STDERR $conn->errorMessage unless ($quiet);
65     $conn->exec("ROLLBACK");
66     return(-1);
67     }
68     $result = $conn->exec("set transaction isolation level serializable");
69     if ($result->resultStatus ne PGRES_COMMAND_OK)
70     {
71     print STDERR $conn->errorMessage unless ($quiet);
72     $conn->exec("ROLLBACK");
73     return(-1);
74     }
75    
76     # MAP oid --> tabname, keyname, key_type
77     $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
78     " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
79     ", pg_type pgt".
80     " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
81     " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
82     if ($result->resultStatus ne PGRES_TUPLES_OK)
83     {
84     print STDERR $conn->errorMessage unless ($quiet);
85     $conn->exec("ROLLBACK");
86     return(-1);
87     }
88    
89     while (@row = $result->fetchrow)
90     {
91 dpavlin 1.1 # printf "$row[0], $row[1], $row[2]\n";
92 dpavlin 1.5 if (ref($onlytables) eq 'HASH') {
93     next unless (exists $onlytables->{$row[1]});
94     $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
95     }
96 dpavlin 1.2 push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
97     }
98    
99     # Read last succeeded sync
100 dpavlin 1.3 my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
101 dpavlin 1.2 " where server = $server AND syncid = (select max(syncid) from" .
102     " _RSERV_SYNC_ where server = $server AND status > 0)";
103    
104     printf "$sql\n" if $debug;
105    
106     $result = $conn->exec($sql);
107     if ($result->resultStatus ne PGRES_TUPLES_OK)
108     {
109     print STDERR $conn->errorMessage unless ($quiet);
110     $conn->exec("ROLLBACK");
111     return(-1);
112     }
113    
114     my @lastsync = $result->fetchrow;
115    
116     my $sinfo = "";
117 dpavlin 1.4 if (@lastsync && $lastsync[3] ne '') # sync info
118 dpavlin 1.2 {
119     $sinfo = "and (l.logid >= $lastsync[3]";
120     $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
121     $sinfo .= ")";
122     }
123    
124     my $havedeal = 0;
125    
126     # DELETED rows
127     $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
128     " where l.delete = 1 $sinfo order by l.reloid";
129    
130     printf "$sql\n" if $debug;
131    
132     $result = $conn->exec($sql);
133     if ($result->resultStatus ne PGRES_TUPLES_OK)
134     {
135     print STDERR $conn->errorMessage unless ($quiet);
136     $conn->exec("ROLLBACK");
137     return(-1);
138     }
139    
140 dpavlin 1.3 my $lastoid = '';
141 dpavlin 1.2 while (@row = $result->fetchrow)
142     {
143     next unless exists $Mtables{$row[0]};
144     next unless exists $Stables{$Mtables{$row[0]}[0]};
145    
146     if ($lastoid != $row[0])
147     {
148     if ($lastoid eq '')
149     {
150     my $syncid = GetSYNCID($conn, $outf);
151     return($syncid) if $syncid < 0;
152     $havedeal = 1;
153     }
154     else
155     {
156     printf $outf "\\.\n";
157     }
158     printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
159     $lastoid = $row[0];
160     }
161     if (! defined $row[1])
162     {
163     print STDERR "NULL key\n" unless ($quiet);
164     $conn->exec("ROLLBACK");
165     return(-2);
166     }
167     printf $outf "%s\n", OutputValue($row[1]);
168     }
169     printf $outf "\\.\n" if $lastoid ne '';
170    
171     # UPDATED rows
172    
173     my ($taboid, $tabname, $tabkey);
174     foreach $taboid (keys %Mtables)
175     {
176 dpavlin 1.3 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
177 dpavlin 1.2 next unless exists $Stables{$tabname};
178    
179     my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
180    
181     $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
182     "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
183     " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";
184    
185 dpavlin 1.1 printf "$sql\n" if $debug;
186 dpavlin 1.2
187 dpavlin 1.1 $result = $conn->exec($sql);
188     if ($result->resultStatus ne PGRES_TUPLES_OK)
189     {
190 dpavlin 1.2 printf $outf "-- ERROR\n" if $havedeal;
191     print STDERR $conn->errorMessage unless ($quiet);
192     $conn->exec("ROLLBACK");
193     return(-1);
194     }
195     next if $result->ntuples <= 0;
196     if (! $havedeal)
197     {
198     my $syncid = GetSYNCID($conn, $outf);
199     return($syncid) if $syncid < 0;
200     $havedeal = 1;
201 dpavlin 1.1 }
202 dpavlin 1.2 printf $outf "-- UPDATE $tabname\n";
203     printf "-- UPDATE $tabname\n" if $debug;
204     while (@row = $result->fetchrow)
205 dpavlin 1.1 {
206 dpavlin 1.3 for (my $i = 0; $i <= $#row; $i++)
207 dpavlin 1.2 {
208     printf $outf " " if $i;
209     printf " " if $i && $debug;
210     printf $outf "%s", OutputValue($row[$i]);
211     printf "%s", OutputValue($row[$i]) if $debug;;
212     }
213     printf $outf "\n";
214     printf "\n" if $debug;
215     }
216     printf $outf "\\.\n";
217     printf "\\.\n" if $debug;;
218     }
219    
220     # INSERTED rows
221    
222     foreach $taboid (keys %Mtables)
223     {
224 dpavlin 1.3 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
225 dpavlin 1.2 next unless exists $Stables{$tabname};
226    
227     my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
228    
229     $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
230     "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
231     " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}";
232    
233 dpavlin 1.1 printf "$sql\n" if $debug;
234 dpavlin 1.2
235 dpavlin 1.1 $result = $conn->exec($sql);
236     if ($result->resultStatus ne PGRES_TUPLES_OK)
237     {
238 dpavlin 1.2 printf $outf "-- ERROR\n" if $havedeal;
239     print STDERR $conn->errorMessage unless ($quiet);
240     $conn->exec("ROLLBACK");
241     return(-1);
242     }
243     next if $result->ntuples <= 0;
244     if (! $havedeal)
245     {
246     my $syncid = GetSYNCID($conn, $outf);
247     return($syncid) if $syncid < 0;
248     $havedeal = 1;
249 dpavlin 1.1 }
250 dpavlin 1.2 printf $outf "-- INSERT $tabname\n";
251     printf "-- INSERT $tabname\n" if $debug;
252 dpavlin 1.1 while (@row = $result->fetchrow)
253     {
254 dpavlin 1.3 for (my $i = 0; $i <= $#row; $i++)
255 dpavlin 1.2 {
256     printf $outf " " if $i;
257     printf " " if $i && $debug;
258     printf $outf "%s", OutputValue($row[$i]);
259     printf "%s", OutputValue($row[$i]) if $debug;;
260     }
261     printf $outf "\n";
262     printf "\n" if $debug;
263     }
264     printf $outf "\\.\n";
265     printf "\\.\n" if $debug;;
266     }
267    
268    
269     unless ($havedeal)
270     {
271     $conn->exec("ROLLBACK");
272     return(0);
273     }
274    
275     # Remember this snapshot info
276     $result = $conn->exec("select _rserv_sync_($server)");
277     if ($result->resultStatus ne PGRES_TUPLES_OK)
278     {
279     printf $outf "-- ERROR\n";
280     print STDERR $conn->errorMessage unless ($quiet);
281     $conn->exec("ROLLBACK");
282     return(-1);
283     }
284    
285     $result = $conn->exec("COMMIT");
286     if ($result->resultStatus ne PGRES_COMMAND_OK)
287     {
288     printf $outf "-- ERROR\n";
289     print STDERR $conn->errorMessage unless ($quiet);
290     $conn->exec("ROLLBACK");
291     return(-1);
292     }
293     printf $outf "-- OK\n";
294     printf "-- OK\n" if $debug;
295    
296     return(1);
297    
298 dpavlin 1.1 }
299    
300     sub OutputValue
301     {
302     my ($val) = @_; # @_[0];
303    
304     return("\\N") unless defined $val;
305    
306     $val =~ s/\\/\\\\/g;
307     $val =~ s/ /\\011/g;
308     $val =~ s/\n/\\012/g;
309     $val =~ s/\'/\\047/g;
310    
311     return($val);
312     }
313    
314     # Get syncid for new snapshot
315     sub GetSYNCID
316     {
317 dpavlin 1.2 my ($conn, $outf) = @_; # (@_[0], @_[1]);
318    
319     my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
320     if ($result->resultStatus ne PGRES_TUPLES_OK)
321     {
322     print STDERR $conn->errorMessage unless ($quiet);
323     $conn->exec("ROLLBACK");
324     return(-1);
325     }
326    
327     my @row = $result->fetchrow;
328    
329     printf $outf "-- SYNCID $row[0]\n";
330     printf "-- SYNCID $row[0]\n" if $debug;
331     return($row[0]);
332 dpavlin 1.1 }
333    
334    
335     sub CleanLog
336     {
337 dpavlin 1.5 my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
338 dpavlin 1.2
339     my $result = $conn->exec("BEGIN");
340     if ($result->resultStatus ne PGRES_COMMAND_OK)
341     {
342     print STDERR $conn->errorMessage unless ($quiet);
343     $conn->exec("ROLLBACK");
344     return(-1);
345     }
346    
347     my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
348     " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
349     " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
350    
351     printf "$sql\n" if $debug;
352    
353     $result = $conn->exec($sql);
354     if ($result->resultStatus ne PGRES_TUPLES_OK)
355     {
356     print STDERR $conn->errorMessage unless ($quiet);
357     return(-1);
358     }
359     my $maxid = '';
360     my %active = ();
361     while (my @row = $result->fetchrow)
362     {
363     $maxid = $row[0] if $maxid eq '';
364     last if $row[0] > $maxid;
365     my @ids = split(/[ ]+,[ ]+/, $row[1]);
366 dpavlin 1.3 foreach my $aid (@ids)
367 dpavlin 1.2 {
368     $active{$aid} = 1 unless exists $active{$aid};
369     }
370     }
371     if ($maxid eq '')
372     {
373     print STDERR "No Sync IDs\n" unless ($quiet);
374     return(0);
375     }
376     my $alist = join(',', keys %active);
377     my $sinfo = "logid < $maxid";
378     $sinfo .= " AND logid not in ($alist)" if $alist ne '';
379 dpavlin 1.5 #if (ref($onlytables) eq 'HASH') {
380     # foreach my $onlytable (keys %{$onlytables}) {
381     # $sinfo
382     # }
383     #}
384 dpavlin 1.2 $sql = "delete from _RSERV_LOG_ where " .
385     "logtime < now() - '$howold second'::interval AND $sinfo";
386    
387     printf "$sql\n" if $debug;
388    
389     $result = $conn->exec($sql);
390     if ($result->resultStatus ne PGRES_COMMAND_OK)
391     {
392     print STDERR $conn->errorMessage unless ($quiet);
393     $conn->exec("ROLLBACK");
394     return(-1);
395     }
396     $maxid = $result->cmdTuples;
397    
398     $result = $conn->exec("COMMIT");
399     if ($result->resultStatus ne PGRES_COMMAND_OK)
400     {
401     print STDERR $conn->errorMessage unless ($quiet);
402     $conn->exec("ROLLBACK");
403     return(-1);
404     }
405    
406     return($maxid);
407     }
408 dpavlin 1.1
409 dpavlin 1.2 sub ApplySnapshot
410     {
411 dpavlin 1.5 my ($conn, $inpf, $onlytables) = @_; # (@_[0], @_[1]);
412 dpavlin 1.2
413     my $result = $conn->exec("BEGIN");
414     if ($result->resultStatus ne PGRES_COMMAND_OK)
415     {
416     print STDERR $conn->errorMessage unless ($quiet);
417     $conn->exec("ROLLBACK");
418     return(-1);
419     }
420    
421     $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");
422     if ($result->resultStatus ne PGRES_COMMAND_OK)
423     {
424     print STDERR $conn->errorMessage unless ($quiet);
425     $conn->exec("ROLLBACK");
426     return(-1);
427     }
428    
429     # MAP name --> oid, keyname, keynum
430     my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
431     " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
432     " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
433     " AND pga.attnum = rt.key";
434    
435     $result = $conn->exec($sql);
436     if ($result->resultStatus ne PGRES_TUPLES_OK)
437     {
438     print STDERR $conn->errorMessage unless ($quiet);
439     $conn->exec("ROLLBACK");
440     return(-1);
441     }
442     %Stables = ();
443 dpavlin 1.3 while (my @row = $result->fetchrow)
444 dpavlin 1.2 {
445     # printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
446 dpavlin 1.5 if (ref($onlytables) eq 'HASH') {
447     next unless (exists $onlytables->{$row[1]});
448     $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
449     }
450 dpavlin 1.2 push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
451     }
452    
453     my $ok = 0;
454     my $syncid = '';
455     while(<$inpf>)
456     {
457     $_ =~ s/\n//;
458     my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3);
459     if ($cmt ne '--')
460     {
461     printf STDERR "Invalid format\n" unless ($quiet);
462     $conn->exec("ROLLBACK");
463     return(-2);
464     }
465     if ($cmd eq 'DELETE')
466     {
467     if ($syncid eq '')
468     {
469     printf STDERR "Sync ID unspecified\n" unless ($quiet);
470 dpavlin 1.1 $conn->exec("ROLLBACK");
471 dpavlin 1.2 return(-2);
472     }
473     $result = DoDelete($conn, $inpf, $prm);
474     if ($result)
475     {
476     $conn->exec("ROLLBACK");
477     return($result);
478     }
479     }
480     elsif ($cmd eq 'INSERT')
481     {
482     if ($syncid eq '')
483     {
484     printf STDERR "Sync ID unspecified\n" unless ($quiet);
485 dpavlin 1.1 $conn->exec("ROLLBACK");
486 dpavlin 1.2 return(-2);
487     }
488     $result = DoInsert($conn, $inpf, $prm);
489     if ($result)
490     {
491     $conn->exec("ROLLBACK");
492     return($result);
493     }
494     }
495     elsif ($cmd eq 'UPDATE')
496     {
497     if ($syncid eq '')
498     {
499     printf STDERR "Sync ID unspecified\n" unless ($quiet);
500 dpavlin 1.1 $conn->exec("ROLLBACK");
501 dpavlin 1.2 return(-2);
502     }
503     $result = DoUpdate($conn, $inpf, $prm);
504     if ($result)
505     {
506     $conn->exec("ROLLBACK");
507     return($result);
508     }
509     }
510     elsif ($cmd eq 'SYNCID')
511     {
512     if ($syncid ne '')
513     {
514     printf STDERR "Second Sync ID ?!\n" unless ($quiet);
515 dpavlin 1.1 $conn->exec("ROLLBACK");
516 dpavlin 1.2 return(-2);
517     }
518     if ($prm !~ /^\d+$/)
519     {
520     printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
521 dpavlin 1.1 $conn->exec("ROLLBACK");
522 dpavlin 1.2 return(-2);
523     }
524     $syncid = $prm;
525    
526     printf STDERR "Sync ID $syncid\n" unless ($quiet);
527    
528     $result = $conn->exec("select syncid, synctime from " .
529     "_RSERV_SLAVE_SYNC_ where syncid = " .
530     "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
531     if ($result->resultStatus ne PGRES_TUPLES_OK)
532     {
533 dpavlin 1.1 print STDERR $conn->errorMessage unless ($quiet);
534     $conn->exec("ROLLBACK");
535     return(-1);
536 dpavlin 1.2 }
537     my @row = $result->fetchrow;
538     if (! defined $row[0])
539     {
540     $result = $conn->exec("insert into _RSERV_SLAVE_SYNC_ ".
541     "(syncid, synctime) values ($syncid, now())");
542     }
543     elsif ($row[0] >= $prm)
544     {
545     printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
546 dpavlin 1.1 $conn->exec("ROLLBACK");
547 dpavlin 1.2 return(0);
548     }
549     else
550     {
551     $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .
552     " set syncid = $syncid, synctime = now()");
553     }
554     if ($result->resultStatus ne PGRES_COMMAND_OK)
555     {
556 dpavlin 1.1 print STDERR $conn->errorMessage unless ($quiet);
557     $conn->exec("ROLLBACK");
558     return(-1);
559 dpavlin 1.2 }
560     }
561     elsif ($cmd eq 'OK')
562     {
563     $ok = 1;
564     last;
565     }
566     elsif ($cmd eq 'ERROR')
567     {
568     printf STDERR "ERROR signaled\n" unless ($quiet);
569     $conn->exec("ROLLBACK");
570     return(-2);
571     }
572     else
573     {
574     printf STDERR "Unknown command $cmd\n" unless ($quiet);
575     $conn->exec("ROLLBACK");
576     return(-2);
577     }
578     }
579    
580     if (! $ok)
581     {
582     printf STDERR "No OK flag in input\n" unless ($quiet);
583     $conn->exec("ROLLBACK");
584     return(-2);
585     }
586    
587     $result = $conn->exec("COMMIT");
588     if ($result->resultStatus ne PGRES_COMMAND_OK)
589     {
590     print STDERR $conn->errorMessage unless ($quiet);
591     $conn->exec("ROLLBACK");
592     return(-1);
593     }
594    
595     return(1);
596 dpavlin 1.1 }
597    
598     sub DoDelete
599     {
600 dpavlin 1.2 my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
601    
602     # only delete tables that the slave wants
603     if (! defined($Stables{$tabname})) {
604     print "Not configured to delete rows from table $tabname\n" unless $quiet;
605     while (<$inpf>) {
606 dpavlin 1.3 my $istring = $_;
607 dpavlin 1.2 $istring =~ s/\n//;
608     last if ($istring eq '\.');
609     }
610     return(0);
611     }
612 dpavlin 1.1
613 dpavlin 1.2 my $ok = 0;
614     while(<$inpf>)
615     {
616     if ($_ !~ /\n$/)
617     {
618     printf STDERR "Invalid format\n" unless ($quiet);
619     return(-2);
620     }
621     my $key = $_;
622     $key =~ s/\n//;
623     if ($key eq '\.')
624 dpavlin 1.1 {
625 dpavlin 1.2 $ok = 1;
626     last;
627 dpavlin 1.1 }
628 dpavlin 1.2
629     my $sql = "delete from \"$tabname\" where ".
630     "\"$Stables{$tabname}->[1]\" = '$key'";
631    
632     printf "$sql\n" if $debug;
633    
634     my $result = $conn->exec($sql);
635     if ($result->resultStatus ne PGRES_COMMAND_OK)
636 dpavlin 1.1 {
637 dpavlin 1.2 print STDERR $conn->errorMessage unless ($quiet);
638     return(-1);
639 dpavlin 1.1 }
640 dpavlin 1.2 }
641    
642     if (! $ok)
643     {
644     printf STDERR "No end of input in DELETE section\n" unless ($quiet);
645     return(-2);
646     }
647    
648     return(0);
649 dpavlin 1.1 }
650    
651    
652     sub DoUpdate
653     {
654 dpavlin 1.2 my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
655 dpavlin 1.1
656 dpavlin 1.2 # only update the tables that the slave wants
657     if (! defined($Stables{$tabname})) {
658     print "Not configured to update rows from table $tabname\n" unless $quiet;
659     while (<$inpf>) {
660 dpavlin 1.3 my $istring = $_;
661 dpavlin 1.2 $istring =~ s/\n//;
662     last if ($istring eq '\.');
663     }
664     return(0);
665     }
666 dpavlin 1.1
667 dpavlin 1.2 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
668    
669     my @CopyBuf = ();
670     my $CBufLen = 0;
671     my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
672    
673     my $sql = "select attnum, attname from pg_attribute" .
674     " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
675    
676     my $result = $conn->exec($sql);
677     if ($result->resultStatus ne PGRES_TUPLES_OK)
678     {
679     print STDERR $conn->errorMessage unless ($quiet);
680     return(-1);
681     }
682    
683     my @anames = ();
684 dpavlin 1.3 while (my @row = $result->fetchrow)
685 dpavlin 1.2 {
686     $anames[$row[0]] = $row[1];
687     }
688    
689     my $istring;
690     my $ok = 0;
691     while(<$inpf>)
692     {
693     if ($_ !~ /\n$/)
694     {
695     printf STDERR "Invalid format\n" unless ($quiet);
696     return(-2);
697     }
698     $istring = $_;
699     $istring =~ s/\n//;
700     if ($istring eq '\.')
701     {
702     $ok = 1;
703     last;
704     }
705     my @vals = split(/ /, $istring);
706     if ($oidkey)
707     {
708     if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
709     {
710     printf STDERR "Invalid OID\n" unless ($quiet);
711     return(-2);
712     }
713     $oidkey = $vals[0];
714     }
715     else
716 dpavlin 1.1 {
717 dpavlin 1.2 unshift @vals, '';
718     }
719    
720     $sql = "update \"$tabname\" set ";
721     my $ocnt = 0;
722     for (my $i = 1; $i <= $#anames; $i++)
723     {
724     if ($vals[$i] eq '\N')
725     {
726     if ($i == $Stables{$tabname}->[2])
727     {
728     printf STDERR "NULL key\n" unless ($quiet);
729     return(-2);
730     }
731     $vals[$i] = 'null';
732     }
733     else
734     {
735     $vals[$i] = "'" . $vals[$i] . "'";
736     next if $i == $Stables{$tabname}->[2];
737     }
738     $ocnt++;
739     $sql .= ', ' if $ocnt > 1;
740     $sql .= "\"$anames[$i]\" = $vals[$i]";
741 dpavlin 1.1 }
742 dpavlin 1.2 if ($oidkey)
743 dpavlin 1.1 {
744 dpavlin 1.2 $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
745 dpavlin 1.1 }
746 dpavlin 1.2 else
747 dpavlin 1.1 {
748 dpavlin 1.2 $sql .= " where \"$Stables{$tabname}->[1]\" = ".
749     $vals[$Stables{$tabname}->[2]];
750 dpavlin 1.1 }
751 dpavlin 1.2
752     printf "$sql\n" if $debug;
753    
754     $result = $conn->exec($sql);
755    
756     if ($result->resultStatus ne PGRES_COMMAND_OK)
757 dpavlin 1.1 {
758 dpavlin 1.2 print STDERR $conn->errorMessage unless ($quiet);
759     return(-1);
760 dpavlin 1.1 }
761 dpavlin 1.2 next if $result->cmdTuples == 1; # updated
762    
763     if ($result->cmdTuples > 1)
764     {
765     printf STDERR "Duplicate keys\n" unless ($quiet);
766     return(-2);
767     }
768    
769     # no key - copy
770     push @CopyBuf, "$istring\n";
771     $CBufLen += length($istring);
772    
773     if ($CBufLen >= $CBufMax)
774 dpavlin 1.1 {
775 dpavlin 1.2 $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
776     return($result) if $result;
777     @CopyBuf = ();
778     $CBufLen = 0;
779     }
780     }
781    
782     if (! $ok)
783     {
784     printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
785     return(-2);
786     }
787    
788     if ($CBufLen)
789     {
790     print "@CopyBuf\n" if $debug;
791     $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
792     return($result) if $result;
793     }
794 dpavlin 1.1
795 dpavlin 1.2 return(0);
796 dpavlin 1.1 }
797    
798 dpavlin 1.2 sub DoInsert
799 dpavlin 1.1 {
800 dpavlin 1.2 my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
801 dpavlin 1.1
802 dpavlin 1.2 # only insert rows into tables that the slave wants
803     if (! defined($Stables{$tabname})) {
804     print "Not configured to insert rows from table $tabname\n" unless $quiet;
805     while (<$inpf>) {
806 dpavlin 1.3 my $istring = $_;
807 dpavlin 1.2 $istring =~ s/\n//;
808     last if ($istring eq '\.');
809 dpavlin 1.1 }
810 dpavlin 1.2 return(0);
811     }
812 dpavlin 1.1
813 dpavlin 1.2 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
814    
815     my @CopyBuf = ();
816     my $CBufLen = 0;
817     my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
818    
819     my $istring;
820     my $ok = 0;
821     while(<$inpf>)
822     {
823     if ($_ !~ /\n$/)
824     {
825     printf STDERR "Invalid format\n" unless ($quiet);
826     return(-2);
827     }
828     $istring = $_;
829     $istring =~ s/\n//;
830     if ($istring eq '\.')
831     {
832     $ok = 1;
833     last;
834     }
835    
836     # no key - copy
837     push @CopyBuf, "$istring\n";
838     $CBufLen += length($istring);
839    
840     if ($CBufLen >= $CBufMax)
841 dpavlin 1.1 {
842 dpavlin 1.3 my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
843 dpavlin 1.2 return($result) if $result;
844     @CopyBuf = ();
845     $CBufLen = 0;
846     }
847     }
848    
849     if (! $ok)
850     {
851     printf STDERR "No end of input in INSERT section\n" unless ($quiet);
852     return(-2);
853     }
854    
855     if ($CBufLen)
856     {
857     print "@CopyBuf\n" if $debug;
858 dpavlin 1.3 my $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
859 dpavlin 1.2 return($result) if $result;
860     }
861    
862     return(0);
863     }
864 dpavlin 1.1
865    
866 dpavlin 1.2 sub DoCopy
867     {
868     my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
869    
870     my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
871     "FROM STDIN";
872     my $result = $conn->exec($sql);
873     if ($result->resultStatus ne PGRES_COPY_IN)
874     {
875     print STDERR $conn->errorMessage unless ($quiet);
876     return(-1);
877     }
878    
879 dpavlin 1.3 foreach my $str (@{$CBuf})
880 dpavlin 1.2 {
881     $conn->putline($str);
882     }
883    
884     $conn->putline("\\.\n");
885    
886     if ($conn->endcopy)
887     {
888     print STDERR $conn->errorMessage unless ($quiet);
889     return(-1);
890     }
891    
892     return(0);
893 dpavlin 1.1 }
894    
895    
896     #
897     # Returns last SyncID applied on Slave
898     #
899     sub GetSyncID
900     {
901 dpavlin 1.2 my ($conn) = @_; # (@_[0]);
902    
903     my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
904     if ($result->resultStatus ne PGRES_TUPLES_OK)
905     {
906     print STDERR $conn->errorMessage unless ($quiet);
907     return(-1);
908     }
909     my @row = $result->fetchrow;
910     return(undef) unless defined $row[0]; # null
911     return($row[0]);
912 dpavlin 1.1 }
913    
914     #
915     # Updates _RSERV_SYNC_ on Master with Slave SyncID
916     #
917     sub SyncSyncID
918     {
919 dpavlin 1.2 my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);
920    
921     my $result = $conn->exec("BEGIN");
922     if ($result->resultStatus ne PGRES_COMMAND_OK)
923     {
924     print STDERR $conn->errorMessage unless ($quiet);
925     $conn->exec("ROLLBACK");
926     return(-1);
927     }
928    
929     $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .
930     " where server = $server AND syncid = $syncid" .
931     " for update");
932     if ($result->resultStatus ne PGRES_TUPLES_OK)
933     {
934     print STDERR $conn->errorMessage unless ($quiet);
935     $conn->exec("ROLLBACK");
936     return(-1);
937     }
938     my @row = $result->fetchrow;
939     if (! defined $row[0])
940     {
941     printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);
942     $conn->exec("ROLLBACK");
943     return(0);
944     }
945     if ($row[1] > 0)
946     {
947     printf STDERR "SyncID $syncid for server ".
948     "$server already updated\n" unless ($quiet);
949     $conn->exec("ROLLBACK");
950     return(0);
951     }
952     $result = $conn->exec("update _RSERV_SYNC_" .
953     " set synctime = now(), status = 1" .
954     " where server = $server AND syncid = $syncid");
955     if ($result->resultStatus ne PGRES_COMMAND_OK)
956     {
957     print STDERR $conn->errorMessage unless ($quiet);
958     $conn->exec("ROLLBACK");
959     return(-1);
960     }
961     $result = $conn->exec("delete from _RSERV_SYNC_" .
962     " where server = $server AND syncid < $syncid");
963     if ($result->resultStatus ne PGRES_COMMAND_OK)
964     {
965     print STDERR $conn->errorMessage unless ($quiet);
966     $conn->exec("ROLLBACK");
967     return(-1);
968     }
969    
970     $result = $conn->exec("COMMIT");
971     if ($result->resultStatus ne PGRES_COMMAND_OK)
972     {
973     print STDERR $conn->errorMessage unless ($quiet);
974     $conn->exec("ROLLBACK");
975     return(-1);
976     }
977    
978     return(1);
979 dpavlin 1.1 }
980    
981     1;

  ViewVC Help
Powered by ViewVC 1.1.26