/[rserv]/share/RServ.pm
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.17 - (hide annotations)
Sun Nov 2 21:07:20 2003 UTC (20 years, 6 months ago) by dpavlin
Branch: MAIN
Changes since 1.16: +148 -135 lines
First multi-master version which passes the regression tests (that doesn't
mean it works; the tests might just be too few...).
Renamed multiplemaster to simply multimaster.

1 dpavlin 1.1 # -*- perl -*-
2     # RServ.pm
3     # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4    
package RServ;

use strict;
use Pg;

require Exporter;
our @ISA = qw(Exporter);

# Symbols exported by default. Some of the listed subs are not defined in
# this part of the file.
our @EXPORT = qw(
    PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
    Rollback RollbackAndQuit Connect Exec Exec2 MkInfo ExecDebug
    $debug $quiet $verbose
);
our @EXPORT_OK = qw();

# Diagnostic switches used throughout this module.
#
# They are declared with `our` (package variables) because they are listed
# in @EXPORT: Exporter aliases entries of the package symbol table, so a
# `my` lexical of the same name would never be visible to importers.
our $debug   = 0;    # extra trace output
our $quiet   = 1;    # suppress error messages on STDERR
our $verbose = 0;

# NOTE(review): these assignments override the defaults above every time the
# module is loaded; they look like a development leftover - confirm before
# removing.
$debug   = 1;
$quiet   = 0;
$verbose = 1;

# Master tables: relation oid => [ relname, key attname, key typname ]
# (filled by PrepareSnapshot).
my %Mtables = ();
# Slave tables: PrepareSnapshot stores tname => 1; ApplySnapshot stores
# relname => [ oid, key attname, key attnum ].
my %Stables = ();
27    
# GetServerId($mconn, $DB, $Host)
#
# Looks up the server number registered in _RSERV_SERVERS_ for the given
# host and database name.
#
# Returns:
#   the value of the "server" column when exactly one row matches,
#   undef when no row matches,
#   -1 when the query fails,
#   -2 when more than one row matches.
#
# NOTE(review): $Host and $DB are interpolated into the SQL text unescaped;
# callers must not pass untrusted values.
sub GetServerId
{
    my ($mconn, $DB, $Host) = @_;

    print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);

    my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
        " host='$Host' AND dbase='$DB'");

    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $mconn->errorMessage unless ($quiet);
        return(-1);
    }

    # ntuples is the number of rows in a query result; cmdTuples reports
    # rows affected by a data-modifying command and is not a reliable row
    # count for a SELECT.
    if ($result->ntuples > 1) {
        print STDERR "Duplicate host definitions.\n" unless ($quiet);
        return(-2);
    }

    my @row = $result->fetchrow;

    # Avoid interpolating an undefined value when nothing matched.
    print STDERR "GetServerId($DB,$Host) == ",
        (defined $row[0] ? $row[0] : 'undef'), "\n" if ($debug);

    return $row[0];
}
55 dpavlin 1.1
# PrepareSnapshot($mconn, $sconn, $outf, $mserver, $sserver, $multimaster, $onlytables)
#
# Writes a snapshot of the changes recorded in _RSERV_LOG_ on the master
# ($mconn) to the filehandle $outf, limited to the tables listed in
# _RSERV_SLAVE_TABLES_ on the slave ($sconn).
#
# Snapshot layout (one marker per line, each starting with "-- "):
#   -- SERVER <mserver>
#   -- SYNCID <id>            written by GetSYNCID before the first data
#   -- DELETE <table>         one escaped key per line, ended by "\."
#   -- UPDATE <table>         tab-separated escaped rows, ended by "\."
#   -- INSERT <table>         tab-separated escaped rows, ended by "\."
#   -- KEYS <n>               only when $multimaster is true; next line is
#                             the comma-separated list built by OutputKey
#   -- OK                     or "-- ERROR" when a failure occurs after data
#                             has been written
#
# $onlytables, when it is a hash reference, restricts the master tables to
# its keys; entries with a false value are filled in with the table oid.
#
# Returns 1 when a snapshot was written, 0 when there was nothing to send,
# -1 on a database error (or when $mserver == $sserver), -2 on a NULL key.
#
# NOTE(review): %Mtables and %Stables are file-level and are not cleared
# here, so table subscriptions from an earlier call remain visible.
sub PrepareSnapshot
{
    my ($mconn, $sconn, $outf, $mserver, $sserver, $multimaster, $onlytables) = @_;

    if ($mserver == $sserver) {
        print STDERR "master and slave numbers are same [$mserver] !\n";
        return(-1);
    }

    print STDERR "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);

    # Dump the master server ID into the snapshot file (to prevent
    # replication of columns from the master back to the slave).
    print $outf "-- SERVER $mserver\n";

    # First, we must know for which tables the slave subscribed.
    my $result = Exec($sconn,"SELECT tname FROM _RSERV_SLAVE_TABLES_", -1);
    return (-1) if ($result == -1);

    my @row;
    while (@row = $result->fetchrow) {
        $Stables{$row[0]} = 1;
    }

    print STDERR "Prepare snapshot for tables: ",join(",",keys %Stables),"\n" if ($debug);

    Exec($mconn,"BEGIN");
    Exec($mconn,"set transaction isolation level serializable");

    # MAP oid --> tabname, keyname, key_type
    my $sql = qq{
        select pgc.oid, pgc.relname, pga.attname, pgt.typname
        from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga,
            pg_type pgt
        where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
            AND pga.attnum = rt.key AND pga.atttypid=pgt.oid
    };
    $result = Exec($mconn,$sql);

    while (@row = $result->fetchrow) {
        print "$row[0], $row[1], $row[2]\n" if ($debug);
        if (ref($onlytables) eq 'HASH') {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        # Assign instead of push so repeated calls do not grow the entry.
        $Mtables{$row[0]} = [ $row[1], $row[2], $row[3] ];
    }

    print STDERR "Master database table oids: ",join(",",keys %Mtables),"\n" if ($debug);
    if (! %Mtables) {
        print STDERR "FATAL: can't find oids for tables in master! Did you run SlaveAddTable?\n";
        RollbackAndQuit($mconn);
    }

    # Read last succeeded sync
    $sql = qq{
        select syncid, synctime, minid, maxid, active from _RSERV_SYNC_
        where server = $sserver AND syncid =
            (select max(syncid) from _RSERV_SYNC_
                where server = $sserver AND status > 0)
    };

    $result = Exec($mconn,$sql);

    my @lastsync = $result->fetchrow;
    print STDERR "lastsync: ",join(",",@lastsync),"\n" if ($debug);

    # Only log rows whose server column equals $mserver are considered.
    my $sel_server = " and l.server = $mserver ";

    # Restrict to log ids at or after the last sync's maxid, plus the ids
    # that were still active at that time.
    my $sinfo = "";
    if (@lastsync && $lastsync[3] ne '') {
        $sinfo = "and (l.logid >= $lastsync[3]";
        $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
        $sinfo .= ")";
    }

    my @keys;           # keys in this snapshot
    my $havedeal = 0;   # set once a SYNCID has been written

    # DELETED rows
    $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
        " where l.delete = 1 $sinfo $sel_server order by l.reloid";

    print "DELETED: $sql\n" if $debug;

    $result = $mconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    my $lastoid = -1;
    while (@row = $result->fetchrow) {
        next unless exists $Mtables{$row[0]};
        next unless exists $Stables{$Mtables{$row[0]}[0]};

        if ($lastoid != $row[0]) {
            if ($lastoid == -1) {
                my $syncid = GetSYNCID($mconn, $outf);
                return($syncid) if $syncid < 0;
                $havedeal = 1;
            } else {
                # close the previous table's DELETE section
                print $outf "\\.\n";
            }
            print $outf "-- DELETE $Mtables{$row[0]}[0]\n";
            $lastoid = $row[0];
        }
        if (! defined $row[1]) {
            print STDERR "NULL key\n" unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-2);
        }
        print $outf OutputValue($row[1]), "\n";
        # The query returns (reloid, key): the key is column 1.
        push @keys, OutputKey($row[1], $Mtables{$row[0]}[2]);
    }
    print $outf "\\.\n" if ($lastoid != -1);

    # UPDATED rows, then INSERTED rows
    foreach my $pass (['UPDATE', 'update', 'UPDATED'],
                      ['INSERT', 'insert', 'INSERTED']) {
        my $rc = _DumpChangedRows($mconn, $outf, @{$pass}, $sinfo,
                                  $sel_server, \$havedeal, \@keys);
        return($rc) if $rc < 0;
    }

    unless ($havedeal) {
        print STDERR "hon't have deal, rollback...\n" if ($debug);
        $mconn->exec("ROLLBACK");
        return(0);
    }

    # Remember this snapshot info
    $result = $mconn->exec("select _rserv_sync_($sserver)");
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print $outf "-- ERROR\n";
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    $result = $mconn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print $outf "-- ERROR\n";
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    }

    if ($multimaster) {
        # save keys
        my $key_out = "-- KEYS ".($#keys+1)."\n".join(",",@keys)."\n";
        print $outf $key_out;
        print $key_out if ($debug);
    }

    print $outf "-- OK\n";
    print "-- OK\n" if $debug;

    return(1);
}

# _DumpChangedRows($mconn, $outf, $section, $logcol, $dbglabel, $sinfo,
#                  $sel_server, $havedeal_ref, $keys_ref)
#
# Private helper of PrepareSnapshot: for every subscribed master table,
# writes the current rows whose _RSERV_LOG_ entry has "$logcol = 1" as one
# "-- $section <table>" section. Sets $$havedeal_ref (after obtaining a
# SYNCID) when the first data is written and appends each row key to
# @$keys_ref. Returns 0 on success and a negative code on failure.
sub _DumpChangedRows
{
    my ($mconn, $outf, $section, $logcol, $dbglabel, $sinfo, $sel_server,
        $havedeal, $keys) = @_;

    foreach my $taboid (keys %Mtables) {
        my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
        next unless exists $Stables{$tabname};

        # When the key is the oid it is not part of "*", so select it
        # explicitly (quoted like every other use of the alias).
        my $oidkey = ($tabkey eq 'oid') ? "\"_$tabname\".oid," : '';

        # Built by plain concatenation: the text contains table data names
        # and must not be treated as a printf format.
        my $sql = "SELECT \"_$tabname\".\"${tabkey}\", $oidkey \"_$tabname\".* FROM \"$tabname\" ".
            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.$logcol = 1".
            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
            $sel_server;

        print "$dbglabel: $sql\n" if $debug;

        my $result = $mconn->exec($sql);
        if ($result->resultStatus ne PGRES_TUPLES_OK) {
            print $outf "-- ERROR\n" if $$havedeal;
            print STDERR $mconn->errorMessage unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-1);
        }
        next if $result->ntuples <= 0;

        if (! $$havedeal) {
            my $syncid = GetSYNCID($mconn, $outf);
            return($syncid) if $syncid < 0;
            $$havedeal = 1;
        }

        print $outf "-- $section $tabname\n";
        print "-- $section $tabname\n" if $debug;

        while (my @row = $result->fetchrow) {
            # the first column is the key; the rest is the row itself
            push @{$keys}, OutputKey(shift @row, $keytype);

            # Fields are tab-separated: these lines are later fed verbatim
            # to COPY ... FROM STDIN, whose default delimiter is a tab.
            my $line = join("\t", map { OutputValue($_) } @row);
            print $outf "$line\n";
            print "$line\n" if $debug;
        }
        print $outf "\\.\n";
        print "\\.\n" if $debug;
    }

    return(0);
}
320    
# OutputValue($val)
#
# Escapes one column value for a snapshot line. An undefined value becomes
# the two-character NULL marker \N. Otherwise backslash is doubled and the
# characters that would break the tab-separated, line-oriented format are
# replaced by backslash-octal escapes: TAB -> \011, LF -> \012, CR -> \015,
# single quote -> \047.
#
# Ordinary spaces are left untouched: \011 is the code for TAB, so escaping
# a space with it would turn every space into a tab when decoded.
sub OutputValue
{
    my ($val) = @_;

    return("\\N") unless defined $val;

    $val =~ s/\\/\\\\/g;    # must run first so later escapes are not doubled
    $val =~ s/\t/\\011/g;
    $val =~ s/\n/\\012/g;
    $val =~ s/\r/\\015/g;
    $val =~ s/\'/\\047/g;

    return($val);
}
334    
# OutputKey($val, $cast)
#
# Renders a row key as an SQL expression for the "-- KEYS" list of a
# snapshot. Returns the string "null" for an undefined key, an unquoted
# number for an all-digit key and a single-quoted literal otherwise, each
# followed by a "::text" cast.
#
# The second argument is accepted for call compatibility but ignored: the
# cast is always "::text".
sub OutputKey {
    my ($val) = @_;
    my $cast = "::text";

    return "null" if (! defined($val));

    print STDERR "Key: ${val}${cast}\n" if ($debug);

    # \z instead of $ so a value with a trailing newline is not mistaken
    # for a number.
    if ($val =~ m/^\d+\z/) {
        return "${val}${cast}";
    }

    # Double embedded single quotes so the key cannot terminate the
    # literal early.
    (my $quoted = $val) =~ s/'/''/g;
    return "'$quoted'${cast}";
}
352    
# GetSYNCID($conn, $outf)
#
# Draws the next value from the _rserv_sync_seq_ sequence and records it in
# the snapshot as a "-- SYNCID <id>" line on $outf (echoed to STDOUT when
# $debug is set). Returns that id; on a failed query issues ROLLBACK on
# $conn and returns -1.
sub GetSYNCID
{
    my ($conn, $outf) = @_;

    my $res = $conn->exec("select nextval('_rserv_sync_seq_')");
    unless ($res->resultStatus eq PGRES_TUPLES_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my ($syncid) = $res->fetchrow;
    my $marker = "-- SYNCID $syncid\n";

    print $outf $marker;
    if ($debug) {
        print $marker;
    }

    return($syncid);
}
372    
373    
# CleanLog($conn, $howold, $onlytables)
#
# Deletes rows from _RSERV_LOG_ that are older than $howold seconds, have a
# logid below the smallest "maxid" among the latest successful syncs of all
# servers, and are not listed as active in those syncs.
#
# $onlytables is accepted but currently unused.
#
# Returns the number of deleted rows, 0 when _RSERV_SYNC_ holds no
# successful sync, and -1 on a database error. The transaction opened here
# is rolled back on every non-success path.
sub CleanLog
{
    my ($conn, $howold, $onlytables) = @_;

    my $result = $conn->exec("BEGIN");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
        " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
        " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");    # do not leave the transaction open
        return(-1);
    }

    my $maxid = '';
    my %active = ();
    while (my @row = $result->fetchrow) {
        # Rows arrive ordered by maxid: keep only those sharing the lowest.
        $maxid = $row[0] if $maxid eq '';
        last if $row[0] > $maxid;

        next unless defined $row[1];
        (my $list = $row[1]) =~ s/^\s+|\s+$//g;
        # Accept any whitespace (or none) around the commas.
        foreach my $aid (split(/\s*,\s*/, $list)) {
            $active{$aid} = 1 if $aid ne '';
        }
    }

    if ($maxid eq '') {
        print STDERR "No Sync IDs\n" unless ($quiet);
        $conn->exec("ROLLBACK");    # do not leave the transaction open
        return(0);
    }

    my $alist = join(',', keys %active);
    my $sinfo = "logid < $maxid";
    $sinfo .= " AND logid not in ($alist)" if $alist ne '';

    $sql = "delete from _RSERV_LOG_ where " .
        "logtime < now() - '$howold second'::interval AND $sinfo";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }
    my $deleted = $result->cmdTuples;

    $result = $conn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    return($deleted);
}
447 dpavlin 1.1
# ApplySnapshot($sconn, $inpf, $multimaster, $onlytables)
#
# Reads a snapshot from the filehandle $inpf and applies it to the slave
# database ($sconn) inside a single transaction with all constraints
# deferred.
#
# Every snapshot line handled here must start with "-- " followed by a
# command: SERVER <id>, SYNCID <id>, DELETE/INSERT/UPDATE <table> (the data
# lines are consumed by DoDelete/DoInsert/DoUpdate), KEYS <n> (followed by
# one line holding the key list), ERROR, or OK (which ends the snapshot).
#
# $onlytables, when it is a hash reference, restricts the slave tables to
# its keys; entries with a false value are filled in with the table oid.
# When $multimaster is true the KEYS section is used to stamp matching
# _rserv_log_ rows with the originating server id.
#
# Returns 1 on success, 0 when the slave is already at or past the
# snapshot's sync id, -1 on a database error, -2 on a malformed snapshot,
# or the non-zero code returned by DoDelete/DoInsert/DoUpdate. The
# transaction is rolled back on every return other than 1.
# Dies when a line has no leading token at all.
sub ApplySnapshot
{
    my ($sconn, $inpf, $multimaster, $onlytables) = @_;

    my $serverId;

    my $result = $sconn->exec("BEGIN");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $sconn->errorMessage unless ($quiet);
        $sconn->exec("ROLLBACK");
        return(-1);
    }

    $result = $sconn->exec("SET CONSTRAINTS ALL DEFERRED");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $sconn->errorMessage unless ($quiet);
        $sconn->exec("ROLLBACK");
        return(-1);
    }

    # MAP name --> oid, keyname, keynum
    my $sql = qq{
        select pgc.oid, pgc.relname, pga.attname, rt.key
        from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga
        where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid
            AND pga.attnum = rt.key
    };

    $result = $sconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $sconn->errorMessage unless ($quiet);
        $sconn->exec("ROLLBACK");
        return(-1);
    }
    %Stables = ();
    while (my @row = $result->fetchrow) {
        if (ref($onlytables) eq 'HASH') {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
    }

    print STDERR "Snapshot tables oids: ",join(",",keys %Stables),"\n" if ($debug);

    # Row sections share the same pre-check and error handling.
    my %apply = (
        DELETE => \&DoDelete,
        INSERT => \&DoInsert,
        UPDATE => \&DoUpdate,
    );

    my $ok = 0;
    my $syncid = -1;
    # A lexical line variable: the Do* subs read from the same handle.
    while (my $line = <$inpf>) {
        $line =~ s/\n//;
        my ($cmt, $cmd, $prm) = split(/[ ]+/, $line, 3);
        die "FATAL: snapshot format unknown or snapshot corrupted!" if (! $cmt);
        if ($cmt ne '--') {
            print STDERR "Invalid format\n" unless ($quiet);
            $sconn->exec("ROLLBACK");
            return(-2);
        }
        $cmd = '' unless defined $cmd;

        if (exists $apply{$cmd}) {
            if ($syncid == -1) {
                print STDERR "Sync ID unspecified\n" unless ($quiet);
                $sconn->exec("ROLLBACK");
                return(-2);
            }
            $result = $apply{$cmd}->($sconn, $inpf, $prm);
            if ($result) {
                $sconn->exec("ROLLBACK");
                return($result);
            }
        } elsif ($cmd eq 'SYNCID') {
            if ($syncid != -1) {
                print STDERR "Second Sync ID ?!\n" unless ($quiet);
                $sconn->exec("ROLLBACK");
                return(-2);
            }
            if ($prm !~ /^\d+$/) {
                print STDERR "Invalid Sync ID $prm\n" unless ($quiet);
                $sconn->exec("ROLLBACK");
                return(-2);
            }
            $syncid = $prm;

            print STDERR "Sync ID $syncid\n" unless ($quiet);

            $result = $sconn->exec(qq{
                select syncid, synctime
                from _RSERV_SLAVE_SYNC_
                where syncid =
                    (select max(syncid) from _RSERV_SLAVE_SYNC_)
            });
            if ($result->resultStatus ne PGRES_TUPLES_OK) {
                print STDERR "can't get current syncid from _rserv_slave_sync_: ",$sconn->errorMessage unless ($quiet);
                $sconn->exec("ROLLBACK");
                return(-1);
            }

            my @row = $result->fetchrow;
            print STDERR "Slave Sync ID ",($row[0] || "null"),"\n" if ($debug);
            if (! defined $row[0]) {
                # first sync ever applied on this slave
                $result = Exec($sconn,qq{
                    insert into _RSERV_SLAVE_SYNC_ (syncid, synctime)
                    values ($syncid, now())
                });
            } elsif ($row[0] >= $prm) {
                # this snapshot (or a newer one) was already applied
                print STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
                $sconn->exec("ROLLBACK");
                return(0);
            } else {
                $result = Exec($sconn,qq{
                    update _RSERV_SLAVE_SYNC_ set syncid = $syncid, synctime = now()
                });
            }
            if ($result->resultStatus ne PGRES_COMMAND_OK) {
                print STDERR $sconn->errorMessage unless ($quiet);
                $sconn->exec("ROLLBACK");
                return(-1);
            }
        } elsif ($cmd eq 'OK') {
            $ok = 1;
            last;
        } elsif ($cmd eq 'ERROR') {
            print STDERR "ERROR signaled\n" unless ($quiet);
            $sconn->exec("ROLLBACK");
            return(-2);
        } elsif ($cmd eq 'SERVER') {
            if ($prm !~ /^\d+$/) {
                print STDERR "Invalid Server ID $prm\n" unless ($quiet);
                $sconn->exec("ROLLBACK");
                return(-2);
            }
            $serverId = $prm;
            print STDERR "Server ID $serverId\n" unless ($quiet);
        } elsif ($cmd eq 'KEYS') {
            if ($prm !~ /^\d+$/) {
                print STDERR "Invalid numer of keys $prm\n" unless ($quiet);
                $sconn->exec("ROLLBACK");
                return(-2);
            }
            my $keys = <$inpf>;
            if (! defined $keys) {
                # the key list line is missing
                print STDERR "Invalid format\n" unless ($quiet);
                $sconn->exec("ROLLBACK");
                return(-2);
            }
            chomp($keys);

            if ($multimaster && $keys ne '') {
                if (! defined $serverId) {
                    # without a SERVER line the update below cannot be built
                    print STDERR "Server ID unspecified\n" unless ($quiet);
                    $sconn->exec("ROLLBACK");
                    return(-2);
                }

                ExecFatch($sconn,"select count(*) from _rserv_log_");
                ExecDebug($sconn,"select * from _rserv_log_");
                my ($logid) = ExecFatch($sconn,"select distinct logid from _rserv_log_ where key in ($keys)");

                # No matching log row: nothing to stamp. Sending an update
                # with an empty logid would be a syntax error and abort the
                # whole transaction.
                if (defined $logid) {
                    my $keys_sql = qq{
                        update _rserv_log_ set server=$serverId
                        where logid = $logid
                    };

                    print STDERR "$keys_sql\n" if ($debug);
                    $result = $sconn->exec($keys_sql);
                    if ($result->resultStatus ne PGRES_COMMAND_OK) {
                        print STDERR "FATAL: Cannot update source server in _rserv_log_: ",$sconn->errorMessage,"\n";
                        $sconn->exec("ROLLBACK");
                        return(-1);
                    }
                    # cmdTuples is the affected-row count of an UPDATE
                    my $updated = $result->cmdTuples;
                    ExecDebug($sconn,"explain analyze $keys_sql");
                    print STDERR "expected $prm updates, got ",$updated,"\n" if ($updated != $prm);
                }
            }
        } else {
            print STDERR "Unknown command $cmd\n" unless ($quiet);
            $sconn->exec("ROLLBACK");
            return(-2);
        }
    }

    ExecFatch($sconn,"select count(*) from _rserv_log_");
    if (! $ok) {
        print STDERR "No OK flag in input\n" unless ($quiet);
        $sconn->exec("ROLLBACK");
        return(-2);
    }

    $result = $sconn->exec("COMMIT");
    if ($result->resultStatus ne PGRES_COMMAND_OK) {
        print STDERR $sconn->errorMessage unless ($quiet);
        $sconn->exec("ROLLBACK");
        return(-1);
    }

    return(1);
}
661    
# DoDelete($sconn, $inpf, $tabname)
#
# Consumes one DELETE section from the snapshot filehandle $inpf: one
# escaped key per line up to the terminating "\." line, issuing a DELETE on
# "$tabname" for each key. Sections for tables the slave is not configured
# for (no entry in %Stables) are read and discarded.
#
# Returns 0 on success (or when the table is skipped), -1 on a database
# error, -2 when the section is malformed or not terminated. The caller is
# responsible for the transaction.
sub DoDelete
{
    my ($sconn, $inpf, $tabname) = @_;

    # only delete tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print STDERR "Not configured to delete rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my $keyname = $Stables{$tabname}->[1];

    my $ok = 0;
    while (my $key = <$inpf>) {
        if ($key !~ /\n$/) {
            # a line without newline means the input was truncated
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $key =~ s/\n//;
        if ($key eq '\.') {
            $ok = 1;
            last;
        }

        my $sql = "delete from \"$tabname\" where ".
            "\"$keyname\" = '$key'";

        # print, not printf: the statement contains key data and must not
        # be interpreted as a format string.
        print "$sql\n" if $debug;

        my $result = $sconn->exec($sql);
        if ($result->resultStatus ne PGRES_COMMAND_OK) {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
    }

    if (! $ok) {
        print STDERR "No end of input in DELETE section\n" unless ($quiet);
        return(-2);
    }

    return(0);
}
714    
715    
# DoUpdate($sconn, $inpf, $tabname)
#
# Consumes one UPDATE section from the snapshot filehandle $inpf: one
# tab-separated, escaped row per line up to the terminating "\." line.
# Each row is applied with an UPDATE keyed on the table's key column; rows
# for which the UPDATE matches nothing are collected and loaded through
# DoCopy instead. Sections for tables the slave is not configured for (no
# entry in %Stables) are read and discarded.
#
# When the key attribute number is negative the table is treated as
# oid-keyed: the first field of each row is the oid and rows are copied
# WITH OIDS.
#
# Returns 0 on success (or when the table is skipped), -1 on a database
# error, -2 on malformed input, a NULL key or duplicate keys, or the
# non-zero code returned by DoCopy. The caller is responsible for the
# transaction.
sub DoUpdate
{
    my ($sconn, $inpf, $tabname) = @_;

    # only update the tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print STDERR "Not configured to update rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my ($reloid, $keyname, $keynum) = @{$Stables{$tabname}};
    my $oidkey = ($keynum < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $sql = "select attnum, attname from pg_attribute" .
        " where attrelid = $reloid AND attnum > 0";

    my $result = $sconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # column names indexed by attribute number (1-based)
    my @anames = ();
    while (my @row = $result->fetchrow) {
        $anames[$row[0]] = $row[1];
    }

    my $ok = 0;
    while (my $istring = <$inpf>) {
        if ($istring !~ /\n$/) {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $istring =~ s/\n//;
        if ($istring eq '\.') {
            $ok = 1;
            last;
        }

        # Fields are tab-separated (embedded tabs are escaped as \011).
        # The -1 limit keeps trailing empty fields.
        my @vals = split(/\t/, $istring, -1);
        if ($oidkey) {
            if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0) {
                print STDERR "Invalid OID\n" unless ($quiet);
                return(-2);
            }
            $oidkey = $vals[0];
        } else {
            # shift the fields so that $vals[attnum] is the column value
            unshift @vals, '';
        }

        $sql = "update \"$tabname\" set ";
        my $ocnt = 0;
        for (my $i = 1; $i <= $#anames; $i++) {
            my $raw = defined $vals[$i] ? $vals[$i] : '';
            if ($raw eq '\N') {
                if ($i == $keynum) {
                    print STDERR "NULL key\n" unless ($quiet);
                    return(-2);
                }
                $vals[$i] = 'null';
            } else {
                $vals[$i] = "'" . $raw . "'";
                # the key column goes into WHERE, not into SET
                next if $i == $keynum;
            }
            $ocnt++;
            $sql .= ', ' if $ocnt > 1;
            $sql .= "\"$anames[$i]\" = $vals[$i]";
        }
        if ($oidkey) {
            $sql .= " where \"$keyname\" = $oidkey";
        } else {
            $sql .= " where \"$keyname\" = " . $vals[$keynum];
        }

        # print, not printf: the statement contains row data and must not
        # be interpreted as a format string.
        print "$sql\n" if $debug;

        $result = $sconn->exec($sql);
        if ($result->resultStatus ne PGRES_COMMAND_OK) {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
        next if $result->cmdTuples == 1;    # updated

        if ($result->cmdTuples > 1) {
            print STDERR "Duplicate keys\n" unless ($quiet);
            return(-2);
        }

        # no key - copy
        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax) {
            $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok) {
        print STDERR "No end of input in UPDATE section\n" unless ($quiet);
        return(-2);
    }

    # Flush on pending rows, not on byte count: a buffered row may have
    # zero length.
    if (@CopyBuf) {
        print STDERR "@CopyBuf\n" if $debug;
        $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
839    
# DoInsert($sconn, $inpf, $tabname)
#
# Consumes one INSERT section from the snapshot filehandle $inpf: one row
# per line up to the terminating "\." line. Rows are buffered and loaded
# into "$tabname" through DoCopy, flushing whenever the buffer reaches
# 16 MB. Sections for tables the slave is not configured for (no entry in
# %Stables) are read and discarded.
#
# When the key attribute number is negative the table is treated as
# oid-keyed and rows are copied WITH OIDS.
#
# Returns 0 on success (or when the table is skipped), -2 when the section
# is malformed or not terminated, or the non-zero code returned by DoCopy.
# The caller is responsible for the transaction.
sub DoInsert
{
    my ($sconn, $inpf, $tabname) = @_;

    # only insert rows into tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print STDERR "Not configured to insert rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $ok = 0;
    while (my $istring = <$inpf>) {
        if ($istring !~ /\n$/) {
            # a line without newline means the input was truncated
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $istring =~ s/\n//;
        if ($istring eq '\.') {
            $ok = 1;
            last;
        }

        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring);

        if ($CBufLen >= $CBufMax) {
            my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok) {
        print STDERR "No end of input in INSERT section\n" unless ($quiet);
        return(-2);
    }

    # Flush on pending rows, not on byte count: a buffered row may have
    # zero length.
    if (@CopyBuf) {
        print STDERR "@CopyBuf\n" if $debug;
        my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
900 dpavlin 1.1
901    
# DoCopy($sconn, $tabname, $withoids, $CBuf)
#
# Streams the lines held in the array reference $CBuf into "$tabname" with
# COPY ... FROM STDIN (adding WITH OIDS when $withoids is true), then sends
# the "\." end marker and finishes the copy.
# Returns 0 on success and -1 when the COPY cannot be started or endcopy
# reports a failure.
sub DoCopy {
    my ($sconn, $tabname, $withoids, $CBuf) = @_;

    my $oids_clause = $withoids ? "WITH OIDS " : '';
    my $copy = $sconn->exec("COPY \"$tabname\" " . $oids_clause . "FROM STDIN");

    unless ($copy->resultStatus eq PGRES_COPY_IN) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # data lines first, then the end-of-data marker
    for my $data_line (@{$CBuf}, "\\.\n") {
        $sconn->putline($data_line);
    }

    # endcopy returns a true value on failure
    my $failed = $sconn->endcopy;
    if ($failed) {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    return(0);
}
926    
927    
928     #
929     # Returns last SyncID applied on Slave
930     #
sub GetSyncID {
	# Takes the Pg connection to query and returns max(syncid) from
	# _RSERV_SLAVE_SYNC_.
	#
	# Returns:
	#   -1     if the query failed (error text on STDERR unless $quiet)
	#   undef  if max(syncid) is NULL
	#   the SyncID value otherwise
	my ($sconn) = @_;

	my $result = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
	if ($result->resultStatus ne PGRES_TUPLES_OK) {
		print STDERR $sconn->errorMessage unless ($quiet);
		return(-1);
	}

	my @row = $result->fetchrow;
	my $syncid = $row[0];

	# Test definedness, not truth: a SyncID of 0 is a real value and must
	# not show up as "null" in the debug trace.
	print STDERR "GetSyncID: ",(defined $syncid ? $syncid : 'null'),"\n" if ($debug);

	# undef here means SQL NULL
	return($syncid);
}
944    
945     #
946     # Updates _RSERV_SYNC_ on Master with Slave SyncID
947     #
sub SyncSyncID {
	# Marks the _RSERV_SYNC_ row for ($sserver, $syncid) as applied
	# (synctime = now(), status = 1) and deletes that server's rows with a
	# smaller syncid, all inside one transaction on $mconn.
	#
	# Returns:
	#    1  row updated and transaction committed
	#    0  no matching row, or the row already has status > 0
	#       (transaction rolled back, nothing changed)
	#   -1  a statement failed (transaction rolled back)
	my ($mconn, $sserver, $syncid) = @_;

	# common failure path: report the backend error, undo, signal -1
	my $abort = sub {
		print STDERR $mconn->errorMessage unless ($quiet);
		$mconn->exec("ROLLBACK");
		return(-1);
	};

	my $res = $mconn->exec("BEGIN");
	return $abort->() unless ($res->resultStatus eq PGRES_COMMAND_OK);

	# lock the row we are about to update
	$res = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
		" where server = $sserver AND syncid = $syncid" .
		" for update");
	return $abort->() unless ($res->resultStatus eq PGRES_TUPLES_OK);

	my ($synctime, $status) = $res->fetchrow;
	unless (defined $synctime) {
		printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
		$mconn->exec("ROLLBACK");
		return(0);
	}
	if ($status > 0) {
		printf STDERR "SyncID $syncid for server ".
			"$sserver already updated\n" unless ($quiet);
		$mconn->exec("ROLLBACK");
		return(0);
	}

	# remaining statements all expect PGRES_COMMAND_OK; run them in order
	my @statements = (
		"update _RSERV_SYNC_" .
			" set synctime = now(), status = 1" .
			" where server = $sserver AND syncid = $syncid",
		"delete from _RSERV_SYNC_" .
			" where server = $sserver AND syncid < $syncid",
		"COMMIT",
	);
	foreach my $stmt (@statements) {
		$res = $mconn->exec($stmt);
		return $abort->() unless ($res->resultStatus eq PGRES_COMMAND_OK);
	}

	return(1);
}
1003    
1004 dpavlin 1.11 # stuff moved from perl scripts for better re-use
1005    
# Print the connection's current error message to STDERR (unless $quiet)
# and issue ROLLBACK on it. The result of the ROLLBACK exec is returned.
sub Rollback {
	my ($conn) = @_;

	unless ($quiet) {
		print STDERR $conn->errorMessage;
	}
	return $conn->exec("ROLLBACK");
}
1012    
# Report the connection's error message (unless $quiet), roll back the
# transaction on it and terminate the process with exit(-1).
# Never returns.
sub RollbackAndQuit {
	my ($conn) = @_;

	# same steps as Rollback(), written out in place
	print STDERR $conn->errorMessage unless ($quiet);
	$conn->exec("ROLLBACK");

	exit(-1);
}
1019    
# Open a database connection from a libpq conninfo string.
#
#   $info - connection string passed to Pg::connectdb
#
# Returns the connection object. Dies if the connection status is not
# PGRES_CONNECTION_OK; the message includes the backend's error text.
sub Connect {
	my $info = shift @_;

	# diagnostics go to STDERR like every other message in this module,
	# so they cannot get mixed into data written to STDOUT
	print STDERR "Connecting to $info\n" if ($debug || $verbose);

	my $conn = Pg::connectdb($info);
	if ($conn->status != PGRES_CONNECTION_OK) {
		my $reason = $conn->errorMessage;
		$reason = '' unless defined $reason;
		# strip trailing newlines so die still appends file and line
		$reason =~ s/\s+\z//;
		die "Failed opening $info" . (length($reason) ? ": $reason" : '');
	}
	return $conn;
}
1030    
# Execute one SQL statement on a connection.
#
#   $conn        - Pg connection (required)
#   $sql         - SQL statement (required)
#   $return_code - optional; when defined, a failed statement makes Exec
#                  print the error (unless $quiet) and return this value
#                  instead of aborting
#
# Returns the result object when the status is PGRES_COMMAND_OK or
# PGRES_TUPLES_OK. On any other status it returns $return_code if one was
# given, otherwise it calls RollbackAndQuit (which exits the process).
sub Exec {
	my $conn = shift || die "Exec needs connection!";
	my $sql = shift || die "Exec needs SQL statement!";
	# used to return error code if no tuples are returned
	my $return_code = shift;

	if ($debug) {
		# Collapse whitespace in a COPY of the statement for one-line
		# output. The statement sent to the backend must stay untouched,
		# otherwise debug mode would alter whitespace inside SQL string
		# literals.
		(my $one_line = $sql) =~ s/\s+/ /g;
		print STDERR "Exec: $one_line\n";
	}

	my $result = $conn->exec($sql);
	my $status = $result->resultStatus;

	return $result if ($status eq PGRES_COMMAND_OK);

	if ($status eq PGRES_TUPLES_OK) {
		print STDERR "Returned ",$result->ntuples," tuples\n" if ($debug);
		return $result;
	}

	# anything else is a failure
	if (defined($return_code)) {
		print STDERR "ERROR: ",$conn->errorMessage,"\n" unless ($quiet);
		return($return_code);
	}
	RollbackAndQuit($conn);
}
1057    
# Run the same SQL statement first on $mconn, then on $sconn.
# Either execution must finish with PGRES_COMMAND_OK; otherwise
# RollbackAndQuit is called for the connection that failed.
# The results of the statements are not handed back to the caller.
sub Exec2 {
	my ($mconn, $sconn, $sql) = @_;

	my $m_res = $mconn->exec($sql);
	if ($m_res->resultStatus ne PGRES_COMMAND_OK) {
		RollbackAndQuit($mconn);
	}

	my $s_res = $sconn->exec($sql);
	if ($s_res->resultStatus ne PGRES_COMMAND_OK) {
		RollbackAndQuit($sconn);
	}
	# XXX TODO: return results?!
}
1069    
# Execute a query and hand back its first row as a list.
#
#   $conn - Pg connection (required)
#   $sql  - query text (required)
#
# Any status other than PGRES_TUPLES_OK ends in RollbackAndQuit.
# With $debug set, the statement, the tuple count and the fetched row
# are traced on STDERR.
sub ExecFatch {
	my $conn = shift || die "ExecFatch need conn!";
	my $sql = shift || die "ExecFatch need SQL!";

	if ($debug) {
		print STDERR "ExecFatch: $sql\n";
	}

	my $res = $conn->exec($sql);
	unless ($res->resultStatus eq PGRES_TUPLES_OK) {
		RollbackAndQuit($conn);
	}

	if ($debug) {
		print STDERR "Returned ",$res->ntuples," tuples\n";
	}

	my @first_row = $res->fetchrow;
	if ($debug) {
		print STDERR "DATA: ",join(",",@first_row),"\n";
	}
	return @first_row;
}
1085    
# Execute a query, walk through all of its rows (tracing each one on
# STDERR when $debug is set) and return the number of tuples.
#
#   $conn - Pg connection (required)
#   $sql  - query text (required)
#
# Any status other than PGRES_TUPLES_OK ends in RollbackAndQuit.
sub ExecDebug {
	my $conn = shift || die "ExecDebug need conn!";
	my $sql = shift || die "ExecDebug need SQL!";

	print STDERR "ExecDebug: $sql\n" if ($debug);

	my $res = $conn->exec($sql);
	unless ($res->resultStatus eq PGRES_TUPLES_OK) {
		RollbackAndQuit($conn);
	}

	my $tuple_count = $res->ntuples;
	print STDERR "Returned ",$tuple_count," tuples\n" if ($debug);

	# rows are fetched whether or not they get printed
	while (my @fields = $res->fetchrow) {
		next unless ($debug);
		print STDERR "DATA: ",join(",",@fields),"\n";
	}

	return $tuple_count;
}
# Build a libpq connection string ("keyword=value ...").
#
#   $db       - database name (required; dies when missing)
#   $host     - optional host
#   $port     - optional port
#   $user     - optional user name
#   $password - optional password
#
# Parameters that are undef are left out. Values made only of
# non-whitespace characters without quote or backslash are emitted
# verbatim, exactly as before. Empty values and values containing
# whitespace, a single quote or a backslash are wrapped in single quotes
# with ' and \ backslash-escaped, following the libpq conninfo quoting
# rules, so they no longer corrupt the parameters that follow them.
#
# Returns the connection string.
sub MkInfo {
	my $db = shift || die "need database name!";
	my ($host, $port, $user, $password) = @_;

	# quote a single conninfo value only when it needs it
	my $quote = sub {
		my $value = shift;
		return $value if ($value =~ /\A[^\s'\\]+\z/);
		$value =~ s/([\\'])/\\$1/g;
		return "'$value'";
	};

	my @settings = (
		[ dbname   => $db ],
		[ host     => $host ],
		[ port     => $port ],
		[ user     => $user ],
		[ password => $password ],
	);

	my @parts;
	foreach my $setting (@settings) {
		my ($keyword, $value) = @{$setting};
		next unless defined($value);
		push @parts, "$keyword=" . $quote->($value);
	}

	return join(' ', @parts);
}
1117    
1118 dpavlin 1.1 1;

  ViewVC Help
Powered by ViewVC 1.1.26