/[rserv]/share/RServ.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.18 - (hide annotations)
Mon Nov 3 21:30:39 2003 UTC (20 years, 6 months ago) by dpavlin
Branch: MAIN
CVS Tags: HEAD
Changes since 1.17: +27 -73 lines
added _rserv_xid_ function to return current transaction xid and removed
all perl cludgery about keys (as well as KEYS directive from snapshots)

1 dpavlin 1.1 # -*- perl -*-
2     # RServ.pm
3     # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4    
5     package RServ;
6    
7     require Exporter;
8     @ISA = qw(Exporter);
9 dpavlin 1.11 @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10 dpavlin 1.17 Rollback RollbackAndQuit Connect Exec Exec2 MkInfo ExecDebug
11 dpavlin 1.11 $debug $quiet $verbose
12     );
13 dpavlin 1.1 @EXPORT_OK = qw();
14 dpavlin 1.3 use strict;
15 dpavlin 1.1 use Pg;
16    
17 dpavlin 1.11 my $debug = 0;
18     my $quiet = 1;
19     my $verbose = 0;
20 dpavlin 1.1
21 dpavlin 1.12 $debug = 1;
22     $quiet = 0;
23     $verbose = 1;
24    
25 dpavlin 1.1 my %Mtables = ();
26     my %Stables = ();
27    
28 dpavlin 1.11 sub GetServerId
29 dpavlin 1.1 {
30 dpavlin 1.11 my ($mconn, $DB, $Host) = @_; # (@_[0], @_[1]. @_[2]);
31    
32     print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);
33 dpavlin 1.1
34 dpavlin 1.6 my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
35 dpavlin 1.11 " host='$Host' AND dbase='$DB'");
36 dpavlin 1.3
37 dpavlin 1.2 if ($result->resultStatus ne PGRES_TUPLES_OK)
38     {
39 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
40 dpavlin 1.2 return(-1);
41     }
42    
43 dpavlin 1.3 if ($result->cmdTuples && $result->cmdTuples > 1)
44 dpavlin 1.2 {
45 dpavlin 1.11 printf STDERR "Duplicate host definitions.\n" unless ($quiet);
46 dpavlin 1.2 return(-2);
47     }
48    
49     my @row = $result->fetchrow;
50 dpavlin 1.10
51 dpavlin 1.17 print STDERR "GetServerId($DB,$Host) == $row[0]\n" if ($debug);
52 dpavlin 1.10
53 dpavlin 1.2 return $row[0];
54     }
55 dpavlin 1.1
56 dpavlin 1.2 sub PrepareSnapshot
57     {
58 dpavlin 1.17 my ($mconn, $sconn, $outf, $mserver, $sserver, $multimaster, $onlytables) = @_;
59 dpavlin 1.9
60     if ($mserver == $sserver) {
61 dpavlin 1.10 print STDERR "master and slave numbers are same [$mserver] !\n";
62 dpavlin 1.9 return(-1);
63     }
64    
65 dpavlin 1.17 print STDERR "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);
66 dpavlin 1.1
67 dpavlin 1.16 # dump master server ID into snapshot file (to prevent replication
68     # of colums from master back to slave)
69     print $outf "-- SERVER $mserver\n";
70    
71 dpavlin 1.2 # first, we must know for wich tables the slave subscribed
72 dpavlin 1.15 my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);
73     return (-1) if ($result == -1);
74    
75 dpavlin 1.2 my @row;
76     while (@row = $result->fetchrow) {
77     $Stables{$row[0]} = 1;
78     }
79    
80 dpavlin 1.17 print STDERR "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);
81 dpavlin 1.14
82 dpavlin 1.15 Exec($mconn,"BEGIN");
83     Exec($mconn,"set transaction isolation level serializable");
84 dpavlin 1.2
85     # MAP oid --> tabname, keyname, key_type
86 dpavlin 1.15 my $sql = qq{
87     select pgc.oid, pgc.relname, pga.attname, pgt.typname
88     from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga,
89     pg_type pgt
90     where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
91     AND pga.attnum = rt.key AND pga.atttypid=pgt.oid
92     };
93     $result = Exec($mconn,$sql);
94    
95 dpavlin 1.2 while (@row = $result->fetchrow)
96     {
97 dpavlin 1.16 printf "$row[0], $row[1], $row[2]\n" if ($debug);
98 dpavlin 1.5 if (ref($onlytables) eq 'HASH') {
99     next unless (exists $onlytables->{$row[1]});
100     $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
101     }
102 dpavlin 1.2 push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
103     }
104 dpavlin 1.12
105 dpavlin 1.17 print STDERR "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
106 dpavlin 1.15 if (! %Mtables) {
107     print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";
108     RollbackAndQuit($mconn);
109     }
110 dpavlin 1.12
111 dpavlin 1.2 # Read last succeeded sync
112 dpavlin 1.15 $sql = qq{
113     select syncid, synctime, minid, maxid, active from _RSERV_SYNC_
114     where server = $sserver AND syncid =
115     (select max(syncid) from _RSERV_SYNC_
116     where server = $sserver AND status > 0)
117     };
118 dpavlin 1.2
119 dpavlin 1.15 $result = Exec($mconn,$sql);
120 dpavlin 1.16
121 dpavlin 1.2 my @lastsync = $result->fetchrow;
122 dpavlin 1.17 print STDERR "lastsync: ",join(",",@lastsync),"\n" if ($debug);
123 dpavlin 1.9
124     # exclude data which originated from master server
125     my $sel_server = " and l.server = $mserver ";
126    
127 dpavlin 1.2 my $sinfo = "";
128 dpavlin 1.4 if (@lastsync && $lastsync[3] ne '') # sync info
129 dpavlin 1.2 {
130     $sinfo = "and (l.logid >= $lastsync[3]";
131     $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
132     $sinfo .= ")";
133     }
134 dpavlin 1.16
135 dpavlin 1.2 my $havedeal = 0;
136    
137     # DELETED rows
138     $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
139 dpavlin 1.9 " where l.delete = 1 $sinfo $sel_server order by l.reloid";
140 dpavlin 1.2
141 dpavlin 1.9 printf "DELETED: $sql\n" if $debug;
142 dpavlin 1.2
143 dpavlin 1.6 $result = $mconn->exec($sql);
144 dpavlin 1.17 if ($result->resultStatus ne PGRES_TUPLES_OK) {
145 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
146     $mconn->exec("ROLLBACK");
147 dpavlin 1.2 return(-1);
148     }
149    
150 dpavlin 1.7 my $lastoid = -1;
151 dpavlin 1.17 while (@row = $result->fetchrow) {
152 dpavlin 1.2 next unless exists $Mtables{$row[0]};
153     next unless exists $Stables{$Mtables{$row[0]}[0]};
154    
155 dpavlin 1.17 if ($lastoid != $row[0]) {
156     if ($lastoid == -1) {
157 dpavlin 1.6 my $syncid = GetSYNCID($mconn, $outf);
158 dpavlin 1.2 return($syncid) if $syncid < 0;
159     $havedeal = 1;
160 dpavlin 1.17 } else {
161 dpavlin 1.2 printf $outf "\\.\n";
162     }
163     printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
164     $lastoid = $row[0];
165     }
166 dpavlin 1.17 if (! defined $row[1]) {
167 dpavlin 1.2 print STDERR "NULL key\n" unless ($quiet);
168 dpavlin 1.6 $mconn->exec("ROLLBACK");
169 dpavlin 1.2 return(-2);
170     }
171     printf $outf "%s\n", OutputValue($row[1]);
172     }
173 dpavlin 1.7 printf $outf "\\.\n" if ($lastoid != -1);
174 dpavlin 1.2
175     # UPDATED rows
176    
177     my ($taboid, $tabname, $tabkey);
178     foreach $taboid (keys %Mtables)
179     {
180 dpavlin 1.3 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
181 dpavlin 1.2 next unless exists $Stables{$tabname};
182    
183     my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
184    
185 dpavlin 1.18 $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
186 dpavlin 1.2 "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.update = 1".
187 dpavlin 1.9 " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
188     $sel_server;
189 dpavlin 1.2
190 dpavlin 1.9 printf "UPDATED: $sql\n" if $debug;
191 dpavlin 1.2
192 dpavlin 1.6 $result = $mconn->exec($sql);
193 dpavlin 1.1 if ($result->resultStatus ne PGRES_TUPLES_OK)
194     {
195 dpavlin 1.2 printf $outf "-- ERROR\n" if $havedeal;
196 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
197     $mconn->exec("ROLLBACK");
198 dpavlin 1.2 return(-1);
199     }
200     next if $result->ntuples <= 0;
201     if (! $havedeal)
202     {
203 dpavlin 1.6 my $syncid = GetSYNCID($mconn, $outf);
204 dpavlin 1.2 return($syncid) if $syncid < 0;
205     $havedeal = 1;
206 dpavlin 1.1 }
207 dpavlin 1.2 printf $outf "-- UPDATE $tabname\n";
208     printf "-- UPDATE $tabname\n" if $debug;
209     while (@row = $result->fetchrow)
210 dpavlin 1.1 {
211 dpavlin 1.3 for (my $i = 0; $i <= $#row; $i++)
212 dpavlin 1.2 {
213     printf $outf " " if $i;
214     printf " " if $i && $debug;
215     printf $outf "%s", OutputValue($row[$i]);
216     printf "%s", OutputValue($row[$i]) if $debug;;
217     }
218     printf $outf "\n";
219     printf "\n" if $debug;
220     }
221     printf $outf "\\.\n";
222     printf "\\.\n" if $debug;;
223     }
224    
225     # INSERTED rows
226    
227     foreach $taboid (keys %Mtables)
228     {
229 dpavlin 1.3 my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
230 dpavlin 1.2 next unless exists $Stables{$tabname};
231    
232     my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
233    
234 dpavlin 1.18 $sql = sprintf "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
235 dpavlin 1.2 "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.insert = 1".
236 dpavlin 1.9 " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
237     $sel_server;
238 dpavlin 1.2
239 dpavlin 1.9 printf "INSERTED: $sql\n" if $debug;
240 dpavlin 1.2
241 dpavlin 1.6 $result = $mconn->exec($sql);
242 dpavlin 1.1 if ($result->resultStatus ne PGRES_TUPLES_OK)
243     {
244 dpavlin 1.2 printf $outf "-- ERROR\n" if $havedeal;
245 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
246     $mconn->exec("ROLLBACK");
247 dpavlin 1.2 return(-1);
248     }
249     next if $result->ntuples <= 0;
250     if (! $havedeal)
251     {
252 dpavlin 1.6 my $syncid = GetSYNCID($mconn, $outf);
253 dpavlin 1.2 return($syncid) if $syncid < 0;
254     $havedeal = 1;
255 dpavlin 1.1 }
256 dpavlin 1.2 printf $outf "-- INSERT $tabname\n";
257     printf "-- INSERT $tabname\n" if $debug;
258 dpavlin 1.1 while (@row = $result->fetchrow)
259     {
260 dpavlin 1.3 for (my $i = 0; $i <= $#row; $i++)
261 dpavlin 1.2 {
262     printf $outf " " if $i;
263     printf " " if $i && $debug;
264     printf $outf "%s", OutputValue($row[$i]);
265 dpavlin 1.17 printf "%s", OutputValue($row[$i]) if $debug;
266 dpavlin 1.2 }
267     printf $outf "\n";
268     printf "\n" if $debug;
269     }
270     printf $outf "\\.\n";
271     printf "\\.\n" if $debug;;
272     }
273    
274    
275     unless ($havedeal)
276     {
277 dpavlin 1.17 print STDERR "hon't have deal, rollback...\n" if ($debug);
278 dpavlin 1.6 $mconn->exec("ROLLBACK");
279 dpavlin 1.2 return(0);
280     }
281    
282     # Remember this snapshot info
283 dpavlin 1.8 $result = $mconn->exec("select _rserv_sync_($sserver)");
284 dpavlin 1.2 if ($result->resultStatus ne PGRES_TUPLES_OK)
285     {
286     printf $outf "-- ERROR\n";
287 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
288     $mconn->exec("ROLLBACK");
289 dpavlin 1.2 return(-1);
290     }
291    
292 dpavlin 1.6 $result = $mconn->exec("COMMIT");
293 dpavlin 1.2 if ($result->resultStatus ne PGRES_COMMAND_OK)
294     {
295     printf $outf "-- ERROR\n";
296 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
297     $mconn->exec("ROLLBACK");
298 dpavlin 1.2 return(-1);
299     }
300 dpavlin 1.16
301 dpavlin 1.2 printf $outf "-- OK\n";
302     printf "-- OK\n" if $debug;
303    
304     return(1);
305    
306 dpavlin 1.1 }
307    
308     sub OutputValue
309     {
310     my ($val) = @_; # @_[0];
311    
312     return("\\N") unless defined $val;
313    
314     $val =~ s/\\/\\\\/g;
315     $val =~ s/ /\\011/g;
316     $val =~ s/\n/\\012/g;
317     $val =~ s/\'/\\047/g;
318    
319     return($val);
320     }
321    
322     # Get syncid for new snapshot
323     sub GetSYNCID
324     {
325 dpavlin 1.2 my ($conn, $outf) = @_; # (@_[0], @_[1]);
326    
327     my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
328     if ($result->resultStatus ne PGRES_TUPLES_OK)
329     {
330     print STDERR $conn->errorMessage unless ($quiet);
331     $conn->exec("ROLLBACK");
332     return(-1);
333     }
334 dpavlin 1.16
335 dpavlin 1.2 my @row = $result->fetchrow;
336    
337     printf $outf "-- SYNCID $row[0]\n";
338     printf "-- SYNCID $row[0]\n" if $debug;
339     return($row[0]);
340 dpavlin 1.1 }
341    
342    
343     sub CleanLog
344     {
345 dpavlin 1.5 my ($conn, $howold, $onlytables) = @_; # (@_[0], @_[1]);
346 dpavlin 1.2
347     my $result = $conn->exec("BEGIN");
348     if ($result->resultStatus ne PGRES_COMMAND_OK)
349     {
350     print STDERR $conn->errorMessage unless ($quiet);
351     $conn->exec("ROLLBACK");
352     return(-1);
353     }
354    
355     my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
356     " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
357     " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";
358    
359     printf "$sql\n" if $debug;
360    
361     $result = $conn->exec($sql);
362     if ($result->resultStatus ne PGRES_TUPLES_OK)
363     {
364     print STDERR $conn->errorMessage unless ($quiet);
365     return(-1);
366     }
367     my $maxid = '';
368     my %active = ();
369     while (my @row = $result->fetchrow)
370     {
371     $maxid = $row[0] if $maxid eq '';
372     last if $row[0] > $maxid;
373     my @ids = split(/[ ]+,[ ]+/, $row[1]);
374 dpavlin 1.3 foreach my $aid (@ids)
375 dpavlin 1.2 {
376     $active{$aid} = 1 unless exists $active{$aid};
377     }
378     }
379     if ($maxid eq '')
380     {
381     print STDERR "No Sync IDs\n" unless ($quiet);
382     return(0);
383     }
384     my $alist = join(',', keys %active);
385     my $sinfo = "logid < $maxid";
386     $sinfo .= " AND logid not in ($alist)" if $alist ne '';
387 dpavlin 1.5 #if (ref($onlytables) eq 'HASH') {
388     # foreach my $onlytable (keys %{$onlytables}) {
389     # $sinfo
390     # }
391     #}
392 dpavlin 1.2 $sql = "delete from _RSERV_LOG_ where " .
393     "logtime < now() - '$howold second'::interval AND $sinfo";
394    
395     printf "$sql\n" if $debug;
396    
397     $result = $conn->exec($sql);
398     if ($result->resultStatus ne PGRES_COMMAND_OK)
399     {
400     print STDERR $conn->errorMessage unless ($quiet);
401     $conn->exec("ROLLBACK");
402     return(-1);
403     }
404     $maxid = $result->cmdTuples;
405    
406     $result = $conn->exec("COMMIT");
407     if ($result->resultStatus ne PGRES_COMMAND_OK)
408     {
409     print STDERR $conn->errorMessage unless ($quiet);
410     $conn->exec("ROLLBACK");
411     return(-1);
412     }
413    
414     return($maxid);
415     }
416 dpavlin 1.1
417 dpavlin 1.2 sub ApplySnapshot
418     {
419 dpavlin 1.17 my ($sconn, $inpf, $multimaster, $onlytables) = @_; # (@_[0], @_[1]);
420 dpavlin 1.16
421     my $serverId;
422    
423 dpavlin 1.6 my $result = $sconn->exec("BEGIN");
424 dpavlin 1.16 if ($result->resultStatus ne PGRES_COMMAND_OK) {
425 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
426     $sconn->exec("ROLLBACK");
427 dpavlin 1.2 return(-1);
428     }
429    
430 dpavlin 1.6 $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
431 dpavlin 1.16 if ($result->resultStatus ne PGRES_COMMAND_OK) {
432 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
433     $sconn->exec("ROLLBACK");
434 dpavlin 1.2 return(-1);
435     }
436    
437     # MAP name --> oid, keyname, keynum
438 dpavlin 1.16 my $sql = qq{
439     select pgc.oid, pgc.relname, pga.attname, rt.key
440     from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga
441     where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
442     AND pga.attnum = rt.key
443     };
444 dpavlin 1.2
445 dpavlin 1.6 $result = $sconn->exec($sql);
446 dpavlin 1.16 if ($result->resultStatus ne PGRES_TUPLES_OK) {
447 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
448     $sconn->exec("ROLLBACK");
449 dpavlin 1.2 return(-1);
450     }
451     %Stables = ();
452 dpavlin 1.16 while (my @row = $result->fetchrow) {
453 dpavlin 1.2 # printf "\t%s\t%s\t%s\t\n", $row[1], $row[0], $row[2], $row[3];
454 dpavlin 1.5 if (ref($onlytables) eq 'HASH') {
455     next unless (exists $onlytables->{$row[1]});
456     $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
457     }
458 dpavlin 1.2 push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
459     }
460    
461 dpavlin 1.12 print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);
462    
463 dpavlin 1.2 my $ok = 0;
464 dpavlin 1.9 my $syncid = -1;
465 dpavlin 1.16 while(<$inpf>) {
466 dpavlin 1.2 $_ =~ s/\n//;
467     my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3);
468 dpavlin 1.11 die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
469 dpavlin 1.16 if ($cmt ne '--') {
470 dpavlin 1.2 printf STDERR "Invalid format\n" unless ($quiet);
471 dpavlin 1.6 $sconn->exec("ROLLBACK");
472 dpavlin 1.2 return(-2);
473     }
474 dpavlin 1.16 if ($cmd eq 'DELETE') {
475     if ($syncid == -1) {
476 dpavlin 1.2 printf STDERR "Sync ID unspecified\n" unless ($quiet);
477 dpavlin 1.6 $sconn->exec("ROLLBACK");
478 dpavlin 1.2 return(-2);
479     }
480 dpavlin 1.6 $result = DoDelete($sconn, $inpf, $prm);
481 dpavlin 1.16 if ($result) {
482 dpavlin 1.6 $sconn->exec("ROLLBACK");
483 dpavlin 1.2 return($result);
484     }
485 dpavlin 1.16 } elsif ($cmd eq 'INSERT') {
486     if ($syncid == -1) {
487 dpavlin 1.2 printf STDERR "Sync ID unspecified\n" unless ($quiet);
488 dpavlin 1.6 $sconn->exec("ROLLBACK");
489 dpavlin 1.2 return(-2);
490     }
491 dpavlin 1.6 $result = DoInsert($sconn, $inpf, $prm);
492 dpavlin 1.16 if ($result) {
493 dpavlin 1.6 $sconn->exec("ROLLBACK");
494 dpavlin 1.2 return($result);
495     }
496 dpavlin 1.16 } elsif ($cmd eq 'UPDATE') {
497     if ($syncid == -1) {
498 dpavlin 1.2 printf STDERR "Sync ID unspecified\n" unless ($quiet);
499 dpavlin 1.6 $sconn->exec("ROLLBACK");
500 dpavlin 1.2 return(-2);
501     }
502 dpavlin 1.6 $result = DoUpdate($sconn, $inpf, $prm);
503 dpavlin 1.16 if ($result) {
504 dpavlin 1.6 $sconn->exec("ROLLBACK");
505 dpavlin 1.2 return($result);
506     }
507 dpavlin 1.16 } elsif ($cmd eq 'SYNCID') {
508     if ($syncid != -1) {
509 dpavlin 1.2 printf STDERR "Second Sync ID ?!\n" unless ($quiet);
510 dpavlin 1.6 $sconn->exec("ROLLBACK");
511 dpavlin 1.2 return(-2);
512     }
513 dpavlin 1.16 if ($prm !~ /^\d+$/) {
514 dpavlin 1.2 printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
515 dpavlin 1.6 $sconn->exec("ROLLBACK");
516 dpavlin 1.2 return(-2);
517     }
518     $syncid = $prm;
519    
520     printf STDERR "Sync ID $syncid\n" unless ($quiet);
521    
522 dpavlin 1.17 $result = $sconn->exec(qq{
523     select syncid, synctime
524     from _RSERV_SLAVE_SYNC_
525     where syncid =
526     (select max(syncid) from _RSERV_SLAVE_SYNC_)
527     });
528 dpavlin 1.16 if ($result->resultStatus ne PGRES_TUPLES_OK) {
529 dpavlin 1.17 print STDERR "can't get current syncid from _rserv_slave_sync_: ",$sconn->errorMessage unless ($quiet);
530 dpavlin 1.6 $sconn->exec("ROLLBACK");
531 dpavlin 1.1 return(-1);
532 dpavlin 1.2 }
533 dpavlin 1.17
534 dpavlin 1.2 my @row = $result->fetchrow;
535 dpavlin 1.17 print STDERR "Slave Sync ID ",($row[0] || "null"),"\n" if ($debug);
536 dpavlin 1.16 if (! defined $row[0]) {
537 dpavlin 1.17 $result = Exec($sconn,qq{
538     insert into _RSERV_SLAVE_SYNC_ (syncid, synctime)
539     values ($syncid, now())
540     });
541 dpavlin 1.16 } elsif ($row[0] >= $prm) {
542 dpavlin 1.2 printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
543 dpavlin 1.6 $sconn->exec("ROLLBACK");
544 dpavlin 1.2 return(0);
545 dpavlin 1.16 } else {
546 dpavlin 1.17 $result = Exec($sconn,qq{
547     update _RSERV_SLAVE_SYNC_ set syncid = $syncid, synctime = now()
548     });
549 dpavlin 1.2 }
550 dpavlin 1.16 if ($result->resultStatus ne PGRES_COMMAND_OK) {
551 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
552     $sconn->exec("ROLLBACK");
553 dpavlin 1.1 return(-1);
554 dpavlin 1.2 }
555 dpavlin 1.16 } elsif ($cmd eq 'OK') {
556 dpavlin 1.2 $ok = 1;
557 dpavlin 1.18 if ($multimaster) {
558     # now, update server in _rserv_log_ based on transaction xid
559     ExecFatch($sconn,"select count(*) from _rserv_log_");
560     ExecDebug($sconn,"select * from _rserv_log_");
561     my $keys_sql = qq{
562     update _rserv_log_ set server=$serverId
563     where logid = (select _rserv_xid_())
564     };
565    
566     Exec($sconn,$keys_sql);
567     }
568 dpavlin 1.2 last;
569 dpavlin 1.16 } elsif ($cmd eq 'ERROR') {
570 dpavlin 1.2 printf STDERR "ERROR signaled\n" unless ($quiet);
571 dpavlin 1.6 $sconn->exec("ROLLBACK");
572 dpavlin 1.2 return(-2);
573 dpavlin 1.16 } elsif ($cmd eq 'SERVER') {
574     if ($prm !~ /^\d+$/) {
575     printf STDERR "Invalid Server ID $prm\n" unless ($quiet);
576     $sconn->exec("ROLLBACK");
577     return(-2);
578     }
579     $serverId = $prm;
580     print STDERR "Server ID $serverId\n" unless ($quiet);
581     } else {
582 dpavlin 1.2 printf STDERR "Unknown command $cmd\n" unless ($quiet);
583 dpavlin 1.6 $sconn->exec("ROLLBACK");
584 dpavlin 1.2 return(-2);
585     }
586     }
587    
588 dpavlin 1.17 if (! $ok) {
589 dpavlin 1.2 printf STDERR "No OK flag in input\n" unless ($quiet);
590 dpavlin 1.6 $sconn->exec("ROLLBACK");
591 dpavlin 1.2 return(-2);
592     }
593    
594 dpavlin 1.6 $result = $sconn->exec("COMMIT");
595 dpavlin 1.17 if ($result->resultStatus ne PGRES_COMMAND_OK) {
596 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
597     $sconn->exec("ROLLBACK");
598 dpavlin 1.2 return(-1);
599     }
600    
601     return(1);
602 dpavlin 1.1 }
603    
604     sub DoDelete
605     {
606 dpavlin 1.6 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
607 dpavlin 1.2
608     # only delete tables that the slave wants
609     if (! defined($Stables{$tabname})) {
610 dpavlin 1.17 print STDERR "Not configured to delete rows from table $tabname\n" unless $quiet;
611 dpavlin 1.2 while (<$inpf>) {
612 dpavlin 1.3 my $istring = $_;
613 dpavlin 1.2 $istring =~ s/\n//;
614     last if ($istring eq '\.');
615     }
616     return(0);
617     }
618 dpavlin 1.1
619 dpavlin 1.2 my $ok = 0;
620     while(<$inpf>)
621     {
622     if ($_ !~ /\n$/)
623     {
624     printf STDERR "Invalid format\n" unless ($quiet);
625     return(-2);
626     }
627     my $key = $_;
628     $key =~ s/\n//;
629     if ($key eq '\.')
630 dpavlin 1.1 {
631 dpavlin 1.2 $ok = 1;
632     last;
633 dpavlin 1.1 }
634 dpavlin 1.2
635     my $sql = "delete from \"$tabname\" where ".
636     "\"$Stables{$tabname}->[1]\" = '$key'";
637    
638     printf "$sql\n" if $debug;
639    
640 dpavlin 1.6 my $result = $sconn->exec($sql);
641 dpavlin 1.2 if ($result->resultStatus ne PGRES_COMMAND_OK)
642 dpavlin 1.1 {
643 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
644 dpavlin 1.2 return(-1);
645 dpavlin 1.1 }
646 dpavlin 1.2 }
647    
648     if (! $ok)
649     {
650     printf STDERR "No end of input in DELETE section\n" unless ($quiet);
651     return(-2);
652     }
653    
654     return(0);
655 dpavlin 1.1 }
656    
657    
658     sub DoUpdate
659     {
660 dpavlin 1.6 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
661 dpavlin 1.1
662 dpavlin 1.2 # only update the tables that the slave wants
663     if (! defined($Stables{$tabname})) {
664 dpavlin 1.17 print STDERR "Not configured to update rows from table $tabname\n" unless $quiet;
665 dpavlin 1.2 while (<$inpf>) {
666 dpavlin 1.3 my $istring = $_;
667 dpavlin 1.2 $istring =~ s/\n//;
668     last if ($istring eq '\.');
669     }
670     return(0);
671     }
672 dpavlin 1.1
673 dpavlin 1.2 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
674    
675     my @CopyBuf = ();
676     my $CBufLen = 0;
677     my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
678    
679     my $sql = "select attnum, attname from pg_attribute" .
680     " where attrelid = $Stables{$tabname}->[0] AND attnum > 0";
681    
682 dpavlin 1.6 my $result = $sconn->exec($sql);
683 dpavlin 1.2 if ($result->resultStatus ne PGRES_TUPLES_OK)
684     {
685 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
686 dpavlin 1.2 return(-1);
687     }
688    
689     my @anames = ();
690 dpavlin 1.17 while (my @row = $result->fetchrow) {
691 dpavlin 1.2 $anames[$row[0]] = $row[1];
692     }
693    
694     my $istring;
695     my $ok = 0;
696 dpavlin 1.17 while(<$inpf>) {
697     if ($_ !~ /\n$/) {
698 dpavlin 1.2 printf STDERR "Invalid format\n" unless ($quiet);
699     return(-2);
700     }
701     $istring = $_;
702     $istring =~ s/\n//;
703 dpavlin 1.17 if ($istring eq '\.') {
704 dpavlin 1.2 $ok = 1;
705     last;
706     }
707     my @vals = split(/ /, $istring);
708 dpavlin 1.17 if ($oidkey) {
709     if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0) {
710 dpavlin 1.2 printf STDERR "Invalid OID\n" unless ($quiet);
711     return(-2);
712     }
713     $oidkey = $vals[0];
714 dpavlin 1.17 } else {
715 dpavlin 1.2 unshift @vals, '';
716     }
717    
718     $sql = "update \"$tabname\" set ";
719     my $ocnt = 0;
720 dpavlin 1.17 for (my $i = 1; $i <= $#anames; $i++) {
721     if ($vals[$i] eq '\N') {
722     if ($i == $Stables{$tabname}->[2]) {
723 dpavlin 1.2 printf STDERR "NULL key\n" unless ($quiet);
724     return(-2);
725     }
726     $vals[$i] = 'null';
727 dpavlin 1.17 } else {
728 dpavlin 1.2 $vals[$i] = "'" . $vals[$i] . "'";
729     next if $i == $Stables{$tabname}->[2];
730     }
731     $ocnt++;
732     $sql .= ', ' if $ocnt > 1;
733     $sql .= "\"$anames[$i]\" = $vals[$i]";
734 dpavlin 1.17 } if ($oidkey) {
735 dpavlin 1.2 $sql .= " where \"$Stables{$tabname}->[1]\" = $oidkey";
736 dpavlin 1.17 } else {
737 dpavlin 1.2 $sql .= " where \"$Stables{$tabname}->[1]\" = ".
738     $vals[$Stables{$tabname}->[2]];
739 dpavlin 1.1 }
740 dpavlin 1.2
741     printf "$sql\n" if $debug;
742    
743 dpavlin 1.6 $result = $sconn->exec($sql);
744 dpavlin 1.2
745 dpavlin 1.17 if ($result->resultStatus ne PGRES_COMMAND_OK) {
746 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
747 dpavlin 1.2 return(-1);
748 dpavlin 1.1 }
749 dpavlin 1.2 next if $result->cmdTuples == 1; # updated
750    
751 dpavlin 1.17 if ($result->cmdTuples > 1) {
752 dpavlin 1.2 printf STDERR "Duplicate keys\n" unless ($quiet);
753     return(-2);
754     }
755    
756     # no key - copy
757     push @CopyBuf, "$istring\n";
758     $CBufLen += length($istring);
759    
760 dpavlin 1.17 if ($CBufLen >= $CBufMax) {
761 dpavlin 1.6 $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
762 dpavlin 1.2 return($result) if $result;
763     @CopyBuf = ();
764     $CBufLen = 0;
765     }
766     }
767    
768 dpavlin 1.17 if (! $ok) {
769 dpavlin 1.2 printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
770     return(-2);
771     }
772    
773 dpavlin 1.17 if ($CBufLen) {
774     print STDERR "@CopyBuf\n" if $debug;
775 dpavlin 1.6 $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
776 dpavlin 1.2 return($result) if $result;
777     }
778 dpavlin 1.1
779 dpavlin 1.2 return(0);
780 dpavlin 1.1 }
781    
782 dpavlin 1.2 sub DoInsert
783 dpavlin 1.1 {
784 dpavlin 1.6 my ($sconn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
785 dpavlin 1.1
786 dpavlin 1.2 # only insert rows into tables that the slave wants
787     if (! defined($Stables{$tabname})) {
788 dpavlin 1.17 print STDERR "Not configured to insert rows from table $tabname\n" unless $quiet;
789 dpavlin 1.2 while (<$inpf>) {
790 dpavlin 1.3 my $istring = $_;
791 dpavlin 1.2 $istring =~ s/\n//;
792     last if ($istring eq '\.');
793 dpavlin 1.1 }
794 dpavlin 1.2 return(0);
795     }
796 dpavlin 1.1
797 dpavlin 1.2 my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
798    
799     my @CopyBuf = ();
800     my $CBufLen = 0;
801     my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
802    
803     my $istring;
804     my $ok = 0;
805 dpavlin 1.17 while(<$inpf>) {
806     if ($_ !~ /\n$/) {
807 dpavlin 1.2 printf STDERR "Invalid format\n" unless ($quiet);
808     return(-2);
809     }
810     $istring = $_;
811     $istring =~ s/\n//;
812 dpavlin 1.17 if ($istring eq '\.') {
813 dpavlin 1.2 $ok = 1;
814     last;
815     }
816    
817     # no key - copy
818     push @CopyBuf, "$istring\n";
819     $CBufLen += length($istring);
820    
821 dpavlin 1.17 if ($CBufLen >= $CBufMax) {
822 dpavlin 1.6 my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
823 dpavlin 1.2 return($result) if $result;
824     @CopyBuf = ();
825     $CBufLen = 0;
826     }
827     }
828    
829 dpavlin 1.17 if (! $ok) {
830 dpavlin 1.2 printf STDERR "No end of input in INSERT section\n" unless ($quiet);
831     return(-2);
832     }
833    
834 dpavlin 1.17 if ($CBufLen) {
835     print STDERR "@CopyBuf\n" if $debug;
836 dpavlin 1.6 my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
837 dpavlin 1.2 return($result) if $result;
838     }
839    
840     return(0);
841     }
842 dpavlin 1.1
843    
844 dpavlin 1.17 sub DoCopy {
845 dpavlin 1.6 my ($sconn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
846 dpavlin 1.2
847     my $sql = "COPY \"$tabname\" " . (($withoids) ? "WITH OIDS " : '') .
848     "FROM STDIN";
849 dpavlin 1.6 my $result = $sconn->exec($sql);
850 dpavlin 1.17 if ($result->resultStatus ne PGRES_COPY_IN) {
851 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
852 dpavlin 1.2 return(-1);
853     }
854    
855 dpavlin 1.17 foreach my $str (@{$CBuf}) {
856 dpavlin 1.6 $sconn->putline($str);
857 dpavlin 1.2 }
858    
859 dpavlin 1.6 $sconn->putline("\\.\n");
860 dpavlin 1.2
861 dpavlin 1.17 if ($sconn->endcopy) {
862 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
863 dpavlin 1.2 return(-1);
864     }
865    
866     return(0);
867 dpavlin 1.1 }
868    
869    
870     #
871     # Returns last SyncID applied on Slave
872     #
873 dpavlin 1.17 sub GetSyncID {
874 dpavlin 1.6 my ($sconn) = @_; # (@_[0]);
875 dpavlin 1.2
876 dpavlin 1.6 my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
877 dpavlin 1.17 if ($result->resultStatus ne PGRES_TUPLES_OK) {
878 dpavlin 1.6 print STDERR $sconn->errorMessage unless ($quiet);
879 dpavlin 1.2 return(-1);
880     }
881     my @row = $result->fetchrow;
882 dpavlin 1.17 print STDERR "GetSyncID: ",($row[0] || 'null'),"\n" if ($debug);
883 dpavlin 1.2 return(undef) unless defined $row[0]; # null
884     return($row[0]);
885 dpavlin 1.1 }
886    
887     #
888     # Updates _RSERV_SYNC_ on Master with Slave SyncID
889     #
890 dpavlin 1.17 sub SyncSyncID {
891 dpavlin 1.8 my ($mconn, $sserver, $syncid) = @_; # (@_[0], @_[1], @_[2]);
892 dpavlin 1.2
893 dpavlin 1.6 my $result = $mconn->exec("BEGIN");
894 dpavlin 1.17 if ($result->resultStatus ne PGRES_COMMAND_OK) {
895 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
896     $mconn->exec("ROLLBACK");
897 dpavlin 1.2 return(-1);
898     }
899    
900 dpavlin 1.6 $result = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
901 dpavlin 1.8 " where server = $sserver AND syncid = $syncid" .
902 dpavlin 1.2 " for update");
903 dpavlin 1.17 if ($result->resultStatus ne PGRES_TUPLES_OK) {
904 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
905     $mconn->exec("ROLLBACK");
906 dpavlin 1.2 return(-1);
907     }
908     my @row = $result->fetchrow;
909 dpavlin 1.17 if (! defined $row[0]) {
910 dpavlin 1.8 printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
911 dpavlin 1.6 $mconn->exec("ROLLBACK");
912 dpavlin 1.2 return(0);
913     }
914 dpavlin 1.17 if ($row[1] > 0) {
915 dpavlin 1.2 printf STDERR "SyncID $syncid for server ".
916 dpavlin 1.8 "$sserver already updated\n" unless ($quiet);
917 dpavlin 1.6 $mconn->exec("ROLLBACK");
918 dpavlin 1.2 return(0);
919     }
920 dpavlin 1.6 $result = $mconn->exec("update _RSERV_SYNC_" .
921 dpavlin 1.2 " set synctime = now(), status = 1" .
922 dpavlin 1.8 " where server = $sserver AND syncid = $syncid");
923 dpavlin 1.17 if ($result->resultStatus ne PGRES_COMMAND_OK) {
924 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
925     $mconn->exec("ROLLBACK");
926 dpavlin 1.2 return(-1);
927     }
928 dpavlin 1.6 $result = $mconn->exec("delete from _RSERV_SYNC_" .
929 dpavlin 1.8 " where server = $sserver AND syncid < $syncid");
930 dpavlin 1.17 if ($result->resultStatus ne PGRES_COMMAND_OK) {
931 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
932     $mconn->exec("ROLLBACK");
933 dpavlin 1.2 return(-1);
934     }
935    
936 dpavlin 1.6 $result = $mconn->exec("COMMIT");
937 dpavlin 1.17 if ($result->resultStatus ne PGRES_COMMAND_OK) {
938 dpavlin 1.6 print STDERR $mconn->errorMessage unless ($quiet);
939     $mconn->exec("ROLLBACK");
940 dpavlin 1.2 return(-1);
941     }
942    
943     return(1);
944 dpavlin 1.1 }
945    
946 dpavlin 1.11 # stuff moved from perl scripts for better re-use
947    
948     sub Rollback {
949     my $conn = shift @_;
950    
951 dpavlin 1.15 print STDERR $conn->errorMessage unless ($quiet);
952 dpavlin 1.11 $conn->exec("ROLLBACK");
953     }
954    
955     sub RollbackAndQuit {
956     my $conn = shift @_;
957    
958     Rollback($conn);
959     exit (-1);
960     }
961    
962     sub Connect {
963     my $info = shift @_;
964    
965     print("Connecting to $info\n") if ($debug || $verbose);
966     my $conn = Pg::connectdb($info);
967     if ($conn->status != PGRES_CONNECTION_OK) {
968 dpavlin 1.17 die "Failed opening $info";
969 dpavlin 1.11 }
970     return $conn;
971     }
972    
973     sub Exec {
974 dpavlin 1.15 my $conn = shift || die "Exec needs connection!";
975     my $sql = shift || die "Exec needs SQL statement!";
976     # used to return error code if no tuples are retured
977     my $return_code = shift;
978    
979     if ($debug) {
980     # re-format SQL in one line (for nicer output)
981     $sql =~ s/[\s\n\r]+/ /gs;
982     print STDERR "Exec: $sql\n";
983     }
984 dpavlin 1.11 my $result = $conn->exec($sql);
985 dpavlin 1.15 if ($result->resultStatus eq PGRES_COMMAND_OK) {
986 dpavlin 1.17 return $result;
987 dpavlin 1.15 } elsif ($result->resultStatus eq PGRES_TUPLES_OK) {
988     print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
989     return $result;
990     } else {
991     if (defined($return_code)) {
992     print STDERR "ERROR: ",$conn->errorMessage,"\n" unless ($quiet);
993     return($return_code);
994     } else {
995     RollbackAndQuit($conn)
996     }
997     }
998 dpavlin 1.11 }
999    
1000     sub Exec2 {
1001     my $mconn = shift @_;
1002     my $sconn = shift @_;
1003     my $sql = shift @_;
1004    
1005     my $result = $mconn->exec($sql);
1006     RollbackAndQuit($mconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1007     $result = $sconn->exec($sql);
1008     RollbackAndQuit($sconn) if ($result->resultStatus ne PGRES_COMMAND_OK);
1009 dpavlin 1.15 # XXX TODO: return results?!
1010 dpavlin 1.12 }
1011    
1012 dpavlin 1.18 # exec sql query and return one row from it
1013 dpavlin 1.17 sub ExecFatch {
1014     my $conn = shift || die "ExecFatch need conn!";
1015     my $sql = shift || die "ExecFatch need SQL!";
1016    
1017 dpavlin 1.18 if ($debug) {
1018     # re-format SQL in one line (for nicer output)
1019     $sql =~ s/[\s\n\r]+/ /gs;
1020     print STDERR "Exec: $sql\n";
1021     }
1022 dpavlin 1.17
1023     my $result = $conn->exec($sql);
1024     RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);
1025    
1026     print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
1027    
1028     my @row = $result->fetchrow;
1029     print STDERR "DATA: ",join(",",@row),"\n" if ($debug);
1030     return @row;
1031     }
1032    
1033 dpavlin 1.18 # exec sql query and dump all rows retured to STDERR (great for debugging)
1034 dpavlin 1.17 sub ExecDebug {
1035 dpavlin 1.18 return if (! $debug);
1036    
1037 dpavlin 1.17 my $conn = shift || die "ExecDebug need conn!";
1038     my $sql = shift || die "ExecDebug need SQL!";
1039    
1040 dpavlin 1.18 if ($debug) {
1041     # re-format SQL in one line (for nicer output)
1042     $sql =~ s/[\s\n\r]+/ /gs;
1043     print STDERR "Exec: $sql\n";
1044     }
1045 dpavlin 1.17
1046     my $result = $conn->exec($sql);
1047     RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_TUPLES_OK);
1048    
1049     print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
1050    
1051     while (my @row = $result->fetchrow) {
1052     print STDERR "DATA: ",join(",",@row),"\n" if ($debug);
1053     }
1054     return $result->ntuples;
1055     }
1056 dpavlin 1.12 sub MkInfo {
1057     my $db = shift || die "need database name!";
1058     my $host = shift;
1059     my $port = shift;
1060     my $user = shift;
1061     my $password = shift;
1062    
1063     my $info = "dbname=$db";
1064     $info = "$info host=$host" if (defined($host));
1065     $info = "$info port=$port" if (defined($port));
1066     $info = "$info user=$user" if (defined($user));
1067     $info = "$info password=$password" if (defined($password));
1068    
1069     return $info;
1070 dpavlin 1.11 }
1071    
1072 dpavlin 1.1 1;

  ViewVC Help
Powered by ViewVC 1.1.26