/[rserv]/lib/rserv.c
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /lib/rserv.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.2 - (hide annotations)
Tue Aug 5 09:52:39 2003 UTC (20 years, 7 months ago) by dpavlin
Branch: MAIN
Changes since 1.1: +258 -190 lines
File MIME type: text/plain
rserv 0.2 changes by Nélio Alves Pereira Filho

1 dpavlin 1.1 /* rserv.c
2     * Support functions for erServer replication.
3     * (c) 2000 Vadim Mikheev, PostgreSQL Inc.
4     */
5    
6     #include "executor/spi.h" /* this is what you need to work with SPI */
7     #include "commands/trigger.h" /* -"- and triggers */
8     #include "utils/tqual.h" /* -"- and SnapshotData */
9     #include <ctype.h> /* tolower () */
10    
11     #ifdef PG_FUNCTION_INFO_V1
12     #define CurrentTriggerData ((TriggerData *) fcinfo->context)
13     #endif
14    
15     #ifdef PG_FUNCTION_INFO_V1
16     PG_FUNCTION_INFO_V1(_rserv_log_);
17     PG_FUNCTION_INFO_V1(_rserv_sync_);
18     PG_FUNCTION_INFO_V1(_rserv_debug_);
19 dpavlin 1.2 Datum _rserv_log_(PG_FUNCTION_ARGS);
20     Datum _rserv_sync_(PG_FUNCTION_ARGS);
21     Datum _rserv_debug_(PG_FUNCTION_ARGS);
22    
23 dpavlin 1.1 #else
24     HeapTuple _rserv_log_(void);
25     int32 _rserv_sync_(int32);
26     int32 _rserv_debug_(int32);
27     #endif
28    
29     static int debug = 0;
30    
31 dpavlin 1.2 static char *OutputValue(char *key, char *buf, int size);
32 dpavlin 1.1
33     #ifdef PG_FUNCTION_INFO_V1
34 dpavlin 1.2 Datum _rserv_log_(PG_FUNCTION_ARGS)
35 dpavlin 1.1 #else
36 dpavlin 1.2 HeapTuple _rserv_log_()
37 dpavlin 1.1 #endif
38     {
39 dpavlin 1.2 Trigger *trigger; /* to get trigger name */
40     int nargs; /* # of args specified in CREATE TRIGGER */
41     char **args; /* argument: argnum */
42     Relation rel; /* triggered relation */
43     HeapTuple tuple; /* tuple to return */
44     HeapTuple newtuple = NULL; /* tuple to return */
45     TupleDesc tupdesc; /* tuple description */
46     int keynum;
47     char *key;
48     char *okey;
49     char *newkey = NULL;
50     int deleted, inserted, updated;
51     char sql[8192];
52     char outbuf[8192];
53     char oidbuf[64];
54     int ret;
55    
56     /* Called by trigger manager ? */
57     if (!CurrentTriggerData)
58     elog(ERROR, "_rserv_log_: triggers are not initialized");
59    
60     /* Should be called for ROW trigger */
61     if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event))
62     elog(ERROR, "_rserv_log_: can't process STATEMENT events");
63    
64     tuple = CurrentTriggerData->tg_trigtuple;
65    
66     trigger = CurrentTriggerData->tg_trigger;
67     nargs = trigger->tgnargs;
68     args = trigger->tgargs;
69    
70     if (nargs != 1) /* odd number of arguments! */
71     elog(ERROR, "_rserv_log_: need in *one* argument");
72    
73     keynum = atoi(args[0]);
74    
75     if (keynum < 0 && keynum != ObjectIdAttributeNumber)
76     elog(ERROR, "_rserv_log_: invalid keynum %d", keynum);
77    
78     rel = CurrentTriggerData->tg_relation;
79     tupdesc = rel->rd_att;
80    
81     deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ? 1 : 0;
82     inserted = (TRIGGER_FIRED_BY_INSERT(CurrentTriggerData->tg_event)) ? 1 : 0;
83     // updated = (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event)) ? 1 : 0;
84    
85     updated = 0;
86     if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event)) {
87     updated = 1;
88     newtuple = CurrentTriggerData->tg_newtuple;
89     }
90    
91     #ifndef PG_FUNCTION_INFO_V1
92    
93     /*
94     * Setting CurrentTriggerData to NULL prevents direct calls to trigger
95     * functions in queries. Normally, trigger functions have to be called
96     * by trigger manager code only.
97     */
98     CurrentTriggerData = NULL;
99     #endif
100    
101     /* Connect to SPI manager */
102     if ((ret = SPI_connect()) < 0)
103     elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret);
104    
105     if (keynum == ObjectIdAttributeNumber) {
106     sprintf(oidbuf, "%u", tuple->t_data->t_oid);
107     key = oidbuf;
108     } else {
109     key = SPI_getvalue(tuple, tupdesc, keynum);
110     }
111    
112     if (key == NULL)
113     elog(ERROR, "_rserv_log_: key must be not null");
114    
115     if (newtuple && keynum != ObjectIdAttributeNumber) {
116     newkey = SPI_getvalue(newtuple, tupdesc, keynum);
117     if (newkey == NULL)
118     elog(ERROR, "_rserv_log_: key must be not null");
119     if (strcmp(newkey, key) == 0)
120     newkey = NULL;
121     else
122     deleted = 1; /* old key was deleted */
123     }
124    
125     if (strpbrk(key, "\\ \n'"))
126     okey = OutputValue(key, outbuf, sizeof(outbuf));
127     else
128     okey = key;
129    
130    
131     /**
132     versao onde verifica-se primeiro se o registro existe
133     para decidir se faz insert ou update em _RSERV_LOG_
134     **********************/
135    
136     #ifdef ASKFORUPDATE
137     sprintf(sql, "SELECT true WHERE EXISTS (SELECT logid from _RSERV_LOG_ "
138     "WHERE reloid = %u AND key = '%s')",
139     rel->rd_id, okey);
140    
141     if (debug)
142     elog(NOTICE, sql);
143    
144     if ((ret = SPI_exec(sql, 1)) < 0) {
145     elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
146     }
147    
148     if (SPI_processed > 0) {
149     sprintf(sql, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
150     "insert = %d, update = %d, delete = %d where reloid = %u "
151     "and key = '%s'",
152     GetCurrentTransactionId(),
153     inserted, updated, deleted, rel->rd_id, okey);
154    
155     if (debug)
156     elog(NOTICE, sql);
157    
158     ret = SPI_exec(sql, 0);
159    
160     if (ret < 0)
161     elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
162    
163     if (SPI_processed > 1)
164     elog(ERROR, "_rserv_log_: duplicate tuples");
165     } else {
166     sprintf(sql, "insert into _RSERV_LOG_ "
167     "(reloid, logid, logtime, insert, update, delete, key) "
168     "values (%u, %d, now(), %d, %d, %d, '%s')",
169     rel->rd_id, GetCurrentTransactionId(),
170     inserted, updated, deleted, okey);
171    
172     if (debug)
173     elog(NOTICE, sql);
174    
175     ret = SPI_exec(sql, 0);
176    
177     if (ret < 0)
178     elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
179     }
180 dpavlin 1.1
181 dpavlin 1.2 #else
182    
183     sprintf(sql, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
184     "insert = %d, update = %d, delete = %d where reloid = %u "
185     "and key = '%s'",
186     GetCurrentTransactionId(),
187     inserted, updated, deleted, rel->rd_id, okey);
188    
189     if (debug)
190     elog(NOTICE, sql);
191    
192     ret = SPI_exec(sql, 0);
193    
194     if (ret < 0)
195     elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
196    
197    
198     /*
199     * If no tuple was UPDATEd then do INSERT...
200     */
201    
202    
203     if (SPI_processed > 1)
204     elog(ERROR, "_rserv_log_: duplicate tuples");
205     else if (SPI_processed == 0) {
206     sprintf(sql, "insert into _RSERV_LOG_ "
207     "(reloid, logid, logtime, insert, update, delete, key) "
208     "values (%u, %d, now(), %d, %d, %d, '%s')",
209     rel->rd_id, GetCurrentTransactionId(),
210     inserted, updated, deleted, okey);
211    
212     if (debug)
213     elog(NOTICE, sql);
214    
215     ret = SPI_exec(sql, 0);
216    
217     if (ret < 0)
218     elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
219     }
220    
221     #endif
222    
223     if (okey != key && okey != outbuf)
224     pfree(okey);
225    
226     if (newkey) {
227     if (strpbrk(newkey, "\\ \n'"))
228     okey = OutputValue(newkey, outbuf, sizeof(outbuf));
229     else
230     okey = newkey;
231    
232     sprintf(sql, "insert into _RSERV_LOG_ "
233     "(reloid, logid, logtime, insert, update, delete, key) "
234     "values (%u, %d, now(), %d, %d, 0, '%s')",
235     rel->rd_id, GetCurrentTransactionId(), inserted, updated, okey);
236    
237     if (debug)
238     elog(NOTICE, sql);
239    
240     ret = SPI_exec(sql, 0);
241    
242     if (ret < 0)
243     elog(ERROR, "_rserv_log_: SPI_exec returned %d", ret);
244    
245     if (okey != newkey && okey != outbuf)
246     pfree(okey);
247     }
248    
249     SPI_finish();
250    
251 dpavlin 1.1 #ifdef PG_FUNCTION_INFO_V1
252 dpavlin 1.2 return (PointerGetDatum(tuple));
253 dpavlin 1.1 #else
254 dpavlin 1.2 return (tuple);
255 dpavlin 1.1 #endif
256     }
257    
258     #ifdef PG_FUNCTION_INFO_V1
259     Datum
260     _rserv_sync_(PG_FUNCTION_ARGS)
261     #else
262     int32
263     _rserv_sync_(int32 server)
264     #endif
265     {
266     #ifdef PG_FUNCTION_INFO_V1
267 dpavlin 1.2 int32 server = PG_GETARG_INT32(0);
268 dpavlin 1.1 #endif
269 dpavlin 1.2 char sql[8192];
270     char buf[8192];
271     char *active = buf;
272     uint32 xcnt;
273     int ret;
274 dpavlin 1.1
275     if (SerializableSnapshot == NULL)
276     elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL");
277    
278     buf[0] = 0;
279     for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++)
280     {
281     sprintf(buf + strlen(buf), "%s%u", (xcnt) ? ", " : "",
282 dpavlin 1.2 SerializableSnapshot->xip[xcnt]);
283 dpavlin 1.1 }
284    
285     if ((ret = SPI_connect()) < 0)
286     elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret);
287    
288     sprintf(sql, "insert into _RSERV_SYNC_ "
289     "(server, syncid, synctime, status, minid, maxid, active) "
290 dpavlin 1.2 "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')",
291 dpavlin 1.1 server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active);
292    
293     ret = SPI_exec(sql, 0);
294    
295     if (ret < 0)
296     elog(ERROR, "_rserv_sync_: SPI_exec returned %d", ret);
297    
298     SPI_finish();
299    
300     return (0);
301     }
302    
303     #ifdef PG_FUNCTION_INFO_V1
304     Datum
305     _rserv_debug_(PG_FUNCTION_ARGS)
306     #else
307     int32
308     _rserv_debug_(int32 newval)
309     #endif
310     {
311     #ifdef PG_FUNCTION_INFO_V1
312 dpavlin 1.2 int32 newval = PG_GETARG_INT32(0);
313 dpavlin 1.1 #endif
314 dpavlin 1.2 int32 oldval = debug;
315 dpavlin 1.1
316     debug = newval;
317    
318     return (oldval);
319     }
320    
321 dpavlin 1.2 #define ExtendBy 1024
322 dpavlin 1.1
323 dpavlin 1.2 static char *
324 dpavlin 1.1 OutputValue(char *key, char *buf, int size)
325     {
326     int i = 0;
327     char *out = buf;
328     char *subst = NULL;
329     int slen = 0;
330    
331     size--;
332 dpavlin 1.2 for (;;)
333 dpavlin 1.1 {
334     switch (*key)
335     {
336 dpavlin 1.2 case '\\':
337     subst = "\\\\";
338     slen = 2;
339     break;
340     case ' ':
341     subst = "\\011";
342     slen = 4;
343     break;
344     case '\n':
345     subst = "\\012";
346     slen = 4;
347     break;
348     case '\'':
349     subst = "\\047";
350     slen = 4;
351     break;
352     case '\0':
353     out[i] = 0;
354     return (out);
355     default:
356     slen = 1;
357     break;
358 dpavlin 1.1 }
359    
360     if (i + slen >= size)
361     {
362     if (out == buf)
363     {
364 dpavlin 1.2 out = (char *) palloc(size + ExtendBy);
365 dpavlin 1.1 strncpy(out, buf, i);
366     size += ExtendBy;
367     }
368     else
369     {
370 dpavlin 1.2 out = (char *) repalloc(out, size + ExtendBy);
371 dpavlin 1.1 size += ExtendBy;
372     }
373     }
374    
375     if (slen == 1)
376     out[i++] = *key;
377     else
378     {
379     memcpy(out + i, subst, slen);
380     i += slen;
381     }
382     key++;
383     }
384    
385 dpavlin 1.2 return (out);
386 dpavlin 1.1
387     }

  ViewVC Help
Powered by ViewVC 1.1.26