/[rserv]/share/RServ.pm
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /share/RServ.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.11 - (hide annotations)
Fri Oct 31 00:07:37 2003 UTC (20 years, 6 months ago) by dpavlin
Branch: MAIN
Changes since 1.10: +63 -11 lines
move common code from init scripts here, rename GetSlaveId -> GetServerId

1 dpavlin 1.1 # -*- perl -*-
2     # RServ.pm
3     # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
4    
5     package RServ;
6    
7     require Exporter;
8     @ISA = qw(Exporter);
9 dpavlin 1.11 @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog GetServerId
10     Rollback RollbackAndQuit Connect Exec Exec2
11     $debug $quiet $verbose
12     );
13 dpavlin 1.1 @EXPORT_OK = qw();
14 dpavlin 1.3 use strict;
15 dpavlin 1.1 use Pg;
16    
# Diagnostic switches.  They are listed in @EXPORT, and Exporter exports
# package variables, so they are declared with "our".  A file-scoped "my"
# variable would be a different variable from the exported one, and a
# caller's assignment to the imported $debug / $quiet / $verbose would
# never reach the subs in this module.
our $debug   = 0;    # true: trace SQL and snapshot contents
our $quiet   = 1;    # true: suppress most error messages on STDERR
our $verbose = 0;    # true: announce connections (see Connect)

# Table maps shared by the snapshot routines below.
#   %Mtables: table oid  => [ table name, key column name, key type name ]
#   %Stables: table name => 1 while preparing a snapshot, or
#             table name => [ oid, key column name, key attnum ] while applying one
my %Mtables = ();
my %Stables = ();
23    
# GetServerId($mconn, $DB, $Host)
#
# Look up the server id stored in _RSERV_SERVERS_ for the given
# host / database pair, using connection $mconn.
#
# Returns the "server" column of the matching row, undef when no row
# matches, -1 when the query fails and -2 when more than one row matches.
sub GetServerId
{
    my ($mconn, $DB, $Host) = @_;

    print STDERR "GetServerId: host $Host, database $DB\n" if ($debug);

    # Double any embedded single quote so the values cannot terminate the
    # SQL string literals they are placed in.
    (my $qhost = $Host) =~ s/'/''/g;
    (my $qdb   = $DB)   =~ s/'/''/g;

    my $result = $mconn->exec("SELECT server FROM _RSERV_SERVERS_ WHERE".
        " host='$qhost' AND dbase='$qdb'");

    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $mconn->errorMessage unless ($quiet);
        return(-1);
    }

    # ntuples is the row count of a query result; cmdTuples is documented
    # for INSERT / UPDATE / DELETE, so it is not a reliable way to count
    # the rows returned by this SELECT.
    if ($result->ntuples > 1)
    {
        print STDERR "Duplicate host definitions.\n" unless ($quiet);
        return(-2);
    }

    my @row = $result->fetchrow;

    print "GetServerId($DB,$Host) == $row[0]\n" if ($debug);

    return $row[0];
}
51 dpavlin 1.1
# PrepareSnapshot($mconn, $sconn, $outf, $mserver, $sserver, $onlytables)
#
# Write to filehandle $outf a snapshot of the rows logged in _RSERV_LOG_
# on the master ($mconn) since the last successful sync of slave server
# $sserver, restricted to the tables listed in _RSERV_SLAVE_TABLES_ on
# the slave ($sconn).
#
# The snapshot consists of a "-- SYNCID n" line, then "-- DELETE table",
# "-- UPDATE table" and "-- INSERT table" sections, each terminated by
# a "\." line, and a final "-- OK" (or "-- ERROR") line.
#
# $onlytables, when it is a hash reference, limits the snapshot to the
# tables named by its keys; entries with a false value are filled in with
# the table's oid.
#
# Returns 1 when a snapshot was written, 0 when there was nothing to
# send, -1 on a database error (or when $mserver equals $sserver) and
# -2 when a logged delete has a NULL key.
sub PrepareSnapshot
{
    my ($mconn, $sconn, $outf, $mserver, $sserver, $onlytables) = @_;

    if ($mserver == $sserver) {
        print STDERR "master and slave numbers are same [$mserver] !\n";
        return(-1);
    }

    print "PrepareSnapshot master: $mserver slave: $sserver\n" if ($debug);

    # Start from empty maps so tables remembered by an earlier call
    # (possibly for a different slave) cannot leak into this snapshot.
    %Mtables = ();
    %Stables = ();

    # Report a master-side error, abandon the transaction and yield -1.
    # A true argument also marks the snapshot file as failed.
    my $abort = sub {
        my ($mark_output) = @_;
        print {$outf} "-- ERROR\n" if $mark_output;
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    };

    # first, we must know for which tables the slave subscribed
    my $result = $sconn->exec("SELECT tname FROM _RSERV_SLAVE_TABLES_");
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        # this query ran on the slave, so the slave connection holds the error
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    while (my @row = $result->fetchrow) {
        $Stables{$row[0]} = 1;
    }

    foreach my $stmt ("BEGIN", "set transaction isolation level serializable")
    {
        $result = $mconn->exec($stmt);
        return $abort->(0) if ($result->resultStatus ne PGRES_COMMAND_OK);
    }

    # MAP oid --> tabname, keyname, key_type
    $result = $mconn->exec("select pgc.oid, pgc.relname, pga.attname, pgt.typname" .
        " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
        ", pg_type pgt".
        " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
        " AND pga.attnum = rt.key AND pga.atttypid=pgt.oid");
    return $abort->(0) if ($result->resultStatus ne PGRES_TUPLES_OK);

    while (my @row = $result->fetchrow)
    {
        if (ref($onlytables) eq 'HASH') {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        push @{$Mtables{$row[0]}}, $row[1], $row[2], $row[3];
    }

    # Read last succeeded sync
    my $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
        " where server = $sserver AND syncid = (select max(syncid) from" .
        " _RSERV_SYNC_ where server = $sserver AND status > 0)";

    print "$sql\n" if $debug;

    $result = $mconn->exec($sql);
    return $abort->(0) if ($result->resultStatus ne PGRES_TUPLES_OK);

    my @lastsync = $result->fetchrow;

    # only log rows whose server column equals $mserver are considered
    my $sel_server = " and l.server = $mserver ";

    # limit to log ids at or after the last sync's maxid, plus the ids
    # listed in its "active" column
    my $sinfo = "";
    if (@lastsync && $lastsync[3] ne '')
    {
        $sinfo = "and (l.logid >= $lastsync[3]";
        $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
        $sinfo .= ")";
    }

    # set once the "-- SYNCID" header has been written
    my $havedeal = 0;

    # DELETED rows
    $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
        " where l.delete = 1 $sinfo $sel_server order by l.reloid";

    print "DELETED: $sql\n" if $debug;

    $result = $mconn->exec($sql);
    return $abort->(0) if ($result->resultStatus ne PGRES_TUPLES_OK);

    my $lastoid = -1;
    while (my @row = $result->fetchrow)
    {
        next unless exists $Mtables{$row[0]};
        next unless exists $Stables{$Mtables{$row[0]}[0]};

        # rows arrive ordered by table oid: open a new section on each change
        if ($lastoid != $row[0])
        {
            if ($lastoid == -1)
            {
                my $syncid = GetSYNCID($mconn, $outf);
                return($syncid) if $syncid < 0;
                $havedeal = 1;
            }
            else
            {
                print {$outf} "\\.\n";
            }
            print {$outf} "-- DELETE $Mtables{$row[0]}[0]\n";
            $lastoid = $row[0];
        }
        if (! defined $row[1])
        {
            print STDERR "NULL key\n" unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-2);
        }
        print {$outf} OutputValue($row[1]), "\n";
    }
    print {$outf} "\\.\n" if ($lastoid != -1);

    # UPDATED rows, then INSERTED rows
    foreach my $kind ('update', 'insert')
    {
        my $rc = _DumpLoggedRows($mconn, $outf, $kind, $sinfo,
            $sel_server, \$havedeal);
        return($rc) if $rc < 0;
    }

    unless ($havedeal)
    {
        $mconn->exec("ROLLBACK");
        return(0);
    }

    # Remember this snapshot info
    $result = $mconn->exec("select _rserv_sync_($sserver)");
    return $abort->(1) if ($result->resultStatus ne PGRES_TUPLES_OK);

    $result = $mconn->exec("COMMIT");
    return $abort->(1) if ($result->resultStatus ne PGRES_COMMAND_OK);

    print {$outf} "-- OK\n";
    print "-- OK\n" if $debug;

    return(1);
}

# _DumpLoggedRows($mconn, $outf, $kind, $sinfo, $sel_server, $havedeal)
#
# Private helper of PrepareSnapshot.  For every table present in both
# %Mtables and %Stables, select the current rows whose key is logged in
# _RSERV_LOG_ with column $kind ('update' or 'insert') set to 1 and write
# them to $outf as one "-- UPDATE table" / "-- INSERT table" section.
#
# $havedeal is a reference to PrepareSnapshot's flag: the "-- SYNCID"
# header is requested from GetSYNCID the first time any rows are written.
#
# Returns 0 on success or a negative value on failure (the transaction
# has been rolled back by then).
sub _DumpLoggedRows
{
    my ($mconn, $outf, $kind, $sinfo, $sel_server, $havedeal) = @_;

    my $section     = uc($kind);
    my %debug_label = (update => 'UPDATED', insert => 'INSERTED');

    foreach my $taboid (keys %Mtables)
    {
        my ($tabname, $tabkey, $keytype) = @{$Mtables{$taboid}};
        next unless exists $Stables{$tabname};

        my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';

        # Built by plain concatenation: the text contains table and column
        # names and must not be interpreted as a sprintf format.
        my $sql = "SELECT $oidkey \"_$tabname\".* FROM \"$tabname\" ".
            "\"_$tabname\", _RSERV_LOG_ l WHERE l.reloid = $taboid AND l.$kind = 1".
            " $sinfo AND \"_$tabname\".\"${tabkey}\"=l.key::${keytype}".
            $sel_server;

        print "$debug_label{$kind}: $sql\n" if $debug;

        my $result = $mconn->exec($sql);
        if ($result->resultStatus ne PGRES_TUPLES_OK)
        {
            print {$outf} "-- ERROR\n" if $$havedeal;
            print STDERR $mconn->errorMessage unless ($quiet);
            $mconn->exec("ROLLBACK");
            return(-1);
        }
        next if $result->ntuples <= 0;
        if (! $$havedeal)
        {
            my $syncid = GetSYNCID($mconn, $outf);
            return($syncid) if $syncid < 0;
            $$havedeal = 1;
        }
        print {$outf} "-- $section $tabname\n";
        print "-- $section $tabname\n" if $debug;
        while (my @row = $result->fetchrow)
        {
            my $line = join(' ', map { OutputValue($_) } @row);
            print {$outf} "$line\n";
            print "$line\n" if $debug;
        }
        print {$outf} "\\.\n";
        print "\\.\n" if $debug;
    }

    return(0);
}
319    
# OutputValue($val)
#
# Encode one column value for the snapshot file.  An undefined value
# becomes \N; otherwise each backslash is doubled and each space, newline
# and single quote is replaced by a backslash followed by 011, 012 or 047
# respectively.
#
# NOTE(review): 011 is the octal code of TAB while the character matched
# here is a space - confirm against the field separator actually used in
# the snapshot file.
sub OutputValue
{
    my ($val) = @_;

    return("\\N") unless defined $val;

    my %escape = (
        "\\" => "\\\\",
        " "  => "\\011",
        "\n" => "\\012",
        "'"  => "\\047",
    );

    # One pass is enough: no replacement text contains a character that
    # would itself need escaping afterwards.
    $val =~ s/([\\ \n'])/$escape{$1}/g;

    return($val);
}
333    
# GetSYNCID($conn, $outf)
#
# Take the next value of the _rserv_sync_seq_ sequence as the sync id of
# a new snapshot and write the "-- SYNCID n" header line to $outf.
#
# Returns the new sync id, or -1 (after a ROLLBACK) when the sequence
# cannot be read.
sub GetSYNCID
{
    my ($conn, $outf) = @_;

    my $res = $conn->exec("select nextval('_rserv_sync_seq_')");
    unless ($res->resultStatus eq PGRES_TUPLES_OK)
    {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    }

    my ($newid) = $res->fetchrow;

    printf $outf "-- SYNCID $newid\n";
    printf "-- SYNCID $newid\n" if $debug;

    return($newid);
}
353    
354    
# CleanLog($conn, $howold, $onlytables)
#
# Delete from _RSERV_LOG_ the entries that are older than $howold seconds
# and that precede the smallest "maxid" recorded by the latest successful
# sync of any server, keeping the log ids listed as "active" by those
# syncs.  $onlytables is accepted but not used.
#
# Returns the number of deleted rows as reported by cmdTuples, 0 when no
# successful sync is recorded, and -1 on a database error.  The
# transaction opened here is rolled back on every non-success path.
sub CleanLog
{
    my ($conn, $howold, $onlytables) = @_;

    # Report a database error, abandon the transaction and yield -1.
    my $abort = sub {
        print STDERR $conn->errorMessage unless ($quiet);
        $conn->exec("ROLLBACK");
        return(-1);
    };

    my $result = $conn->exec("BEGIN");
    return $abort->() if ($result->resultStatus ne PGRES_COMMAND_OK);

    my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
        " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
        " where rs2.server = rs.server AND rs2.status > 0) order by rs.maxid";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    # previously returned without ROLLBACK, leaving the transaction open
    return $abort->() if ($result->resultStatus ne PGRES_TUPLES_OK);

    # Rows are ordered by maxid: keep the smallest one and collect the
    # active ids of every row sharing it.
    my $maxid = '';
    my %active = ();
    while (my @row = $result->fetchrow)
    {
        $maxid = $row[0] if $maxid eq '';
        last if $row[0] > $maxid;
        my @ids = split(/[ ]+,[ ]+/, $row[1]);
        foreach my $aid (@ids)
        {
            $active{$aid} = 1 unless exists $active{$aid};
        }
    }
    if ($maxid eq '')
    {
        print STDERR "No Sync IDs\n" unless ($quiet);
        # nothing was changed, but the transaction must not stay open
        $conn->exec("ROLLBACK");
        return(0);
    }

    my $alist = join(',', keys %active);
    my $sinfo = "logid < $maxid";
    $sinfo .= " AND logid not in ($alist)" if $alist ne '';

    $sql = "delete from _RSERV_LOG_ where " .
        "logtime < now() - '$howold second'::interval AND $sinfo";

    print "$sql\n" if $debug;

    $result = $conn->exec($sql);
    return $abort->() if ($result->resultStatus ne PGRES_COMMAND_OK);

    my $deleted = $result->cmdTuples;

    $result = $conn->exec("COMMIT");
    return $abort->() if ($result->resultStatus ne PGRES_COMMAND_OK);

    return($deleted);
}
428 dpavlin 1.1
# ApplySnapshot($sconn, $inpf, $onlytables)
#
# Read a snapshot from filehandle $inpf and apply it to the slave
# database ($sconn) inside one transaction with all constraints deferred.
#
# Every control line has the form "-- COMMAND [parameter]".  SYNCID must
# come before any DELETE / INSERT / UPDATE section, and the snapshot must
# end with "-- OK".  $onlytables, when it is a hash reference, limits the
# tables that are applied to those named by its keys.
#
# Returns 1 when the snapshot was applied and committed, 0 when the slave
# is already synced to this or a later sync id, -1 on a database error,
# -2 on a malformed snapshot, or the non-zero code of a failing section
# handler.  Dies (after a ROLLBACK) on a line without a leading token.
sub ApplySnapshot
{
    my ($sconn, $inpf, $onlytables) = @_;

    # Report a database error, abandon the transaction and yield -1.
    my $dbfail = sub {
        print STDERR $sconn->errorMessage unless ($quiet);
        $sconn->exec("ROLLBACK");
        return(-1);
    };

    # Report a problem with the snapshot, abandon the transaction and
    # yield the given code.  print (not printf) is used because the
    # message may contain text taken from the input file.
    my $reject = sub {
        my ($rc, $msg) = @_;
        print STDERR $msg unless ($quiet);
        $sconn->exec("ROLLBACK");
        return($rc);
    };

    foreach my $stmt ("BEGIN", "SET CONSTRAINTS ALL DEFERRED")
    {
        my $started = $sconn->exec($stmt);
        return $dbfail->() if ($started->resultStatus ne PGRES_COMMAND_OK);
    }

    # MAP name --> oid, keyname, keynum
    my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
        " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
        " where pgc.oid = rt.reloid AND pga.attrelid = rt.reloid" .
        " AND pga.attnum = rt.key";

    my $result = $sconn->exec($sql);
    return $dbfail->() if ($result->resultStatus ne PGRES_TUPLES_OK);

    %Stables = ();
    while (my @row = $result->fetchrow)
    {
        if (ref($onlytables) eq 'HASH') {
            next unless (exists $onlytables->{$row[1]});
            $onlytables->{$row[1]} = $row[0] unless ($onlytables->{$row[1]});
        }
        push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
    }

    # section keyword --> routine that consumes the section body
    my %apply = (
        DELETE => \&DoDelete,
        INSERT => \&DoInsert,
        UPDATE => \&DoUpdate,
    );

    my $ok = 0;
    my $syncid = -1;
    # a lexical line variable keeps the caller's $_ untouched
    while (my $line = <$inpf>)
    {
        $line =~ s/\n//;
        my ($cmt, $cmd, $prm) = split (/[ ]+/, $line, 3);
        if (! $cmt)
        {
            # do not leave the transaction open if the caller traps the die
            $sconn->exec("ROLLBACK");
            die "FATAL: snapshot format unknown or snapshot corrupted!";
        }
        return $reject->(-2, "Invalid format\n") if ($cmt ne '--');

        $cmd = '' unless defined $cmd;

        if (exists $apply{$cmd})
        {
            return $reject->(-2, "Sync ID unspecified\n") if ($syncid == -1);

            my $rc = $apply{$cmd}->($sconn, $inpf, $prm);
            if ($rc)
            {
                $sconn->exec("ROLLBACK");
                return($rc);
            }
        }
        elsif ($cmd eq 'SYNCID')
        {
            return $reject->(-2, "Second Sync ID ?!\n") if ($syncid != -1);

            if (! defined $prm || $prm !~ /^\d+$/)
            {
                my $shown = defined $prm ? $prm : '';
                return $reject->(-2, "Invalid Sync ID $shown\n");
            }
            $syncid = $prm;

            print STDERR "Sync ID $syncid\n" unless ($quiet);

            $result = $sconn->exec("select syncid, synctime from " .
                "_RSERV_SLAVE_SYNC_ where syncid = " .
                "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
            return $dbfail->() if ($result->resultStatus ne PGRES_TUPLES_OK);

            my @row = $result->fetchrow;
            if (! defined $row[0])
            {
                # first snapshot ever applied on this slave
                $result = $sconn->exec("insert into _RSERV_SLAVE_SYNC_ ".
                    "(syncid, synctime) values ($syncid, now())");
            }
            elsif ($row[0] >= $prm)
            {
                return $reject->(0, "Sync-ed to ID $row[0] ($row[1])\n");
            }
            else
            {
                $result = $sconn->exec("update _RSERV_SLAVE_SYNC_" .
                    " set syncid = $syncid, synctime = now()");
            }
            return $dbfail->() if ($result->resultStatus ne PGRES_COMMAND_OK);
        }
        elsif ($cmd eq 'OK')
        {
            $ok = 1;
            last;
        }
        elsif ($cmd eq 'ERROR')
        {
            return $reject->(-2, "ERROR signaled\n");
        }
        else
        {
            return $reject->(-2, "Unknown command $cmd\n");
        }
    }

    return $reject->(-2, "No OK flag in input\n") if (! $ok);

    $result = $sconn->exec("COMMIT");
    return $dbfail->() if ($result->resultStatus ne PGRES_COMMAND_OK);

    return(1);
}
618    
# DoDelete($sconn, $inpf, $tabname)
#
# Consume one DELETE section from filehandle $inpf: one key per line up
# to a terminating "\." line.  For each key, the row of $tabname whose
# key column (taken from %Stables) equals it is deleted through $sconn.
# When $tabname is not in %Stables the section is read and ignored.
#
# Returns 0 on success, -1 on a database error and -2 when the section
# is malformed (a line without a newline, or no "\." terminator).  The
# caller is responsible for rolling back.
sub DoDelete
{
    my ($sconn, $inpf, $tabname) = @_;

    # only delete tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to delete rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my $keycol = $Stables{$tabname}->[1];

    my $ok = 0;
    # a lexical line variable keeps the caller's $_ untouched
    while (my $key = <$inpf>)
    {
        if ($key !~ /\n$/)
        {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $key =~ s/\n//;
        if ($key eq '\.')
        {
            $ok = 1;
            last;
        }

        my $sql = "delete from \"$tabname\" where ".
            "\"$keycol\" = '$key'";

        # print, not printf: the statement contains row data and must not
        # be interpreted as a format string
        print "$sql\n" if $debug;

        my $result = $sconn->exec($sql);
        if ($result->resultStatus ne PGRES_COMMAND_OK)
        {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
    }

    if (! $ok)
    {
        print STDERR "No end of input in DELETE section\n" unless ($quiet);
        return(-2);
    }

    return(0);
}
671    
672    
# DoUpdate($sconn, $inpf, $tabname)
#
# Consume one UPDATE section from filehandle $inpf: one row per line up
# to a terminating "\." line.  Each row is applied to $tabname with an
# UPDATE keyed on the table's key column (from %Stables).  Rows that
# update nothing are collected and loaded with DoCopy instead.  When
# $tabname is not in %Stables the section is read and ignored.
#
# Returns 0 on success, -1 on a database error, -2 when the section is
# malformed (line without newline, invalid OID, NULL key, duplicate key,
# missing "\." terminator), or the non-zero code of a failing DoCopy.
# The caller is responsible for rolling back.
sub DoUpdate
{
    my ($sconn, $inpf, $tabname) = @_;

    # only update the tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to update rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    my ($reloid, $keycol, $keynum) = @{$Stables{$tabname}};

    # a negative key attribute number means the table is keyed on oid;
    # each row then carries the oid as its first field
    my $withoids = ($keynum < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $sql = "select attnum, attname from pg_attribute" .
        " where attrelid = $reloid AND attnum > 0";

    my $result = $sconn->exec($sql);
    if ($result->resultStatus ne PGRES_TUPLES_OK)
    {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # attribute number --> attribute name
    my @anames = ();
    while (my @row = $result->fetchrow)
    {
        $anames[$row[0]] = $row[1];
    }

    my $ok = 0;
    # a lexical line variable keeps the caller's $_ untouched
    while (my $istring = <$inpf>)
    {
        if ($istring !~ /\n$/)
        {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $istring =~ s/\n//;
        if ($istring eq '\.')
        {
            $ok = 1;
            last;
        }

        my @vals = split(/ /, $istring);
        my $oid;
        if ($withoids)
        {
            if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
            {
                print STDERR "Invalid OID\n" unless ($quiet);
                return(-2);
            }
            $oid = $vals[0];
        }
        else
        {
            # shift the fields so that $vals[$attnum] is the column value
            unshift @vals, '';
        }

        my @assignments = ();
        for my $i (1 .. $#anames)
        {
            if ($vals[$i] eq '\N')
            {
                if ($i == $keynum)
                {
                    print STDERR "NULL key\n" unless ($quiet);
                    return(-2);
                }
                $vals[$i] = 'null';
            }
            else
            {
                $vals[$i] = "'" . $vals[$i] . "'";
                # the key column goes into the WHERE clause, not the SET list
                next if $i == $keynum;
            }
            push @assignments, "\"$anames[$i]\" = $vals[$i]";
        }

        $sql = "update \"$tabname\" set " . join(', ', @assignments) .
            " where \"$keycol\" = " . ($withoids ? $oid : $vals[$keynum]);

        # print, not printf: the statement contains row data and must not
        # be interpreted as a format string
        print "$sql\n" if $debug;

        $result = $sconn->exec($sql);

        if ($result->resultStatus ne PGRES_COMMAND_OK)
        {
            print STDERR $sconn->errorMessage unless ($quiet);
            return(-1);
        }
        next if $result->cmdTuples == 1;    # updated

        if ($result->cmdTuples > 1)
        {
            print STDERR "Duplicate keys\n" unless ($quiet);
            return(-2);
        }

        # no key - copy
        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring) + 1;

        if ($CBufLen >= $CBufMax)
        {
            $result = DoCopy($sconn, $tabname, $withoids, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok)
    {
        print STDERR "No end of input in UPDATE section\n" unless ($quiet);
        return(-2);
    }

    # Flush whatever is buffered.  The test is on the buffer itself so
    # rows consisting of an empty line are not dropped.
    if (@CopyBuf)
    {
        print "@CopyBuf\n" if $debug;
        $result = DoCopy($sconn, $tabname, $withoids, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
818    
# DoInsert($sconn, $inpf, $tabname)
#
# Consume one INSERT section from filehandle $inpf: one row per line up
# to a terminating "\." line.  The rows are buffered and loaded into
# $tabname with DoCopy, in batches of roughly 16 MB.  When $tabname is
# not in %Stables the section is read and ignored.
#
# Returns 0 on success, -2 when the section is malformed (a line without
# a newline, or no "\." terminator), or the non-zero code of a failing
# DoCopy.  The caller is responsible for rolling back.
sub DoInsert
{
    my ($sconn, $inpf, $tabname) = @_;

    # only insert rows into tables that the slave wants
    if (! defined($Stables{$tabname})) {
        print "Not configured to insert rows from table $tabname\n" unless $quiet;
        while (my $skipped = <$inpf>) {
            $skipped =~ s/\n//;
            last if ($skipped eq '\.');
        }
        return(0);
    }

    # a negative key attribute number means the table is keyed on oid
    my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;

    my @CopyBuf = ();
    my $CBufLen = 0;
    my $CBufMax = 16 * 1024 * 1024;    # max size of buf for copy

    my $ok = 0;
    # a lexical line variable keeps the caller's $_ untouched
    while (my $istring = <$inpf>)
    {
        if ($istring !~ /\n$/)
        {
            print STDERR "Invalid format\n" unless ($quiet);
            return(-2);
        }
        $istring =~ s/\n//;
        if ($istring eq '\.')
        {
            $ok = 1;
            last;
        }

        push @CopyBuf, "$istring\n";
        $CBufLen += length($istring) + 1;

        if ($CBufLen >= $CBufMax)
        {
            my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
            return($result) if $result;
            @CopyBuf = ();
            $CBufLen = 0;
        }
    }

    if (! $ok)
    {
        print STDERR "No end of input in INSERT section\n" unless ($quiet);
        return(-2);
    }

    # Flush whatever is buffered.  The test is on the buffer itself so
    # rows consisting of an empty line are not dropped.
    if (@CopyBuf)
    {
        print "@CopyBuf\n" if $debug;
        my $result = DoCopy($sconn, $tabname, $oidkey, \@CopyBuf);
        return($result) if $result;
    }

    return(0);
}
885 dpavlin 1.1
886    
# DoCopy($sconn, $tabname, $withoids, $CBuf)
#
# Load the lines held in the array referenced by $CBuf into $tabname via
# "COPY ... FROM STDIN" on connection $sconn.  A true $withoids adds
# "WITH OIDS" to the statement.
#
# Returns 0 on success and -1 when the COPY cannot be started or ended.
sub DoCopy
{
    my ($sconn, $tabname, $withoids, $CBuf) = @_;

    my $oids_clause = $withoids ? "WITH OIDS " : '';
    my $started = $sconn->exec("COPY \"$tabname\" ${oids_clause}FROM STDIN");

    unless ($started->resultStatus eq PGRES_COPY_IN)
    {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    # the buffered rows, followed by the end-of-data marker
    foreach my $copyline (@{$CBuf})
    {
        $sconn->putline($copyline);
    }
    $sconn->putline("\\.\n");

    # a true value from endcopy is treated as failure
    return(0) unless $sconn->endcopy;

    print STDERR $sconn->errorMessage unless ($quiet);
    return(-1);
}
915    
916    
#
# GetSyncID($sconn)
#
# Returns the largest syncid stored in _RSERV_SLAVE_SYNC_ on the slave,
# undef when that value is NULL, and -1 when the query fails.
#
sub GetSyncID
{
    my ($sconn) = @_;

    my $res = $sconn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
    unless ($res->resultStatus eq PGRES_TUPLES_OK)
    {
        print STDERR $sconn->errorMessage unless ($quiet);
        return(-1);
    }

    my ($last_applied) = $res->fetchrow;

    return(defined($last_applied) ? $last_applied : undef);
}
934    
#
# SyncSyncID($mconn, $sserver, $syncid)
#
# Marks sync $syncid of server $sserver as done in _RSERV_SYNC_ on the
# master (synctime = now(), status = 1) and deletes that server's
# records with a smaller syncid, all in one transaction.
#
# Returns 1 when the record was updated, 0 when no such record exists or
# it already has a positive status, and -1 on a database error.
#
sub SyncSyncID
{
    my ($mconn, $sserver, $syncid) = @_;

    # Report a database error, abandon the transaction and yield -1.
    my $dbfail = sub {
        print STDERR $mconn->errorMessage unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(-1);
    };

    my $which = " where server = $sserver AND syncid";

    my $res = $mconn->exec("BEGIN");
    return $dbfail->() unless ($res->resultStatus eq PGRES_COMMAND_OK);

    # lock the sync record while it is inspected and changed
    $res = $mconn->exec("select synctime, status from _RSERV_SYNC_" .
        "$which = $syncid for update");
    return $dbfail->() unless ($res->resultStatus eq PGRES_TUPLES_OK);

    my ($synctime, $status) = $res->fetchrow;
    unless (defined $synctime)
    {
        printf STDERR "No SyncID $syncid found for server $sserver\n" unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(0);
    }
    if ($status > 0)
    {
        printf STDERR "SyncID $syncid for server $sserver already updated\n" unless ($quiet);
        $mconn->exec("ROLLBACK");
        return(0);
    }

    $res = $mconn->exec("update _RSERV_SYNC_" .
        " set synctime = now(), status = 1" .
        "$which = $syncid");
    return $dbfail->() unless ($res->resultStatus eq PGRES_COMMAND_OK);

    $res = $mconn->exec("delete from _RSERV_SYNC_" .
        "$which < $syncid");
    return $dbfail->() unless ($res->resultStatus eq PGRES_COMMAND_OK);

    $res = $mconn->exec("COMMIT");
    return $dbfail->() unless ($res->resultStatus eq PGRES_COMMAND_OK);

    return(1);
}
1001    
1002 dpavlin 1.11 # stuff moved from perl scripts for better re-use
1003    
# Rollback($conn)
#
# Print the connection's current error message on STDERR, then issue a
# ROLLBACK on it.  The value of the ROLLBACK call is returned.
sub Rollback {
    my ($conn) = @_;

    print STDERR $conn->errorMessage;
    return $conn->exec("ROLLBACK");
}
1010    
# RollbackAndQuit($conn)
#
# Report the connection's error and roll back (see Rollback), then end
# the program with exit status -1.  Does not return.
sub RollbackAndQuit {
    my ($conn) = @_;

    Rollback($conn);
    exit(-1);
}
1017    
# Connect($info)
#
# Open a database connection with Pg::connectdb using the connection
# string $info and return it.  When the connection status is not
# PGRES_CONNECTION_OK a message is printed on STDERR and the program
# exits with status 1.
sub Connect {
    my ($info) = @_;

    if ($debug || $verbose) {
        print("Connecting to $info\n");
    }

    my $conn = Pg::connectdb($info);
    return $conn if ($conn->status == PGRES_CONNECTION_OK);

    print STDERR "Failed opening $info\n";
    exit 1;
}
1029    
# Exec($conn, $sql)
#
# Run $sql on $conn (echoing it on STDERR when $debug is set).  Unless
# the result status is PGRES_COMMAND_OK, the transaction is rolled back
# and the program exits via RollbackAndQuit.
sub Exec {
    my ($conn, $sql) = @_;

    my $result = $conn->exec($sql);
    print STDERR "$sql\n" if ($debug);

    my $status = $result->resultStatus;
    RollbackAndQuit($conn) if ($status ne PGRES_COMMAND_OK);
}
1038    
# Exec2($mconn, $sconn, $sql)
#
# Run the same statement first on $mconn and then on $sconn.  As soon as
# one of them does not report PGRES_COMMAND_OK, that connection is rolled
# back and the program exits via RollbackAndQuit.
sub Exec2 {
    my ($mconn, $sconn, $sql) = @_;

    RollbackAndQuit($mconn)
        if ($mconn->exec($sql)->resultStatus ne PGRES_COMMAND_OK);
    RollbackAndQuit($sconn)
        if ($sconn->exec($sql)->resultStatus ne PGRES_COMMAND_OK);
}
1049    
1050 dpavlin 1.1 1;

  ViewVC Help
Powered by ViewVC 1.1.26