/[rserv]/lib/rserv.c
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /lib/rserv.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.5 - (hide annotations)
Mon Nov 3 21:30:37 2003 UTC (20 years, 4 months ago) by dpavlin
Branch: MAIN
CVS Tags: HEAD
Changes since 1.4: +17 -0 lines
File MIME type: text/plain
Added the _rserv_xid_ function to return the current transaction xid and removed
all the Perl kludgery about keys (as well as the KEYS directive from snapshots).

1 dpavlin 1.1 /* rserv.c
2     * Support functions for erServer replication.
3     * (c) 2000 Vadim Mikheev, PostgreSQL Inc.
4     */
5    
6 dpavlin 1.4 #include "executor/spi.h" /* this is what you need to work with SPI */
7 dpavlin 1.1 #include "commands/trigger.h" /* -"- and triggers */
8 dpavlin 1.4 #include "utils/tqual.h" /* -"- and SnapshotData */
9     #include <ctype.h> /* tolower () */
10 dpavlin 1.1
/*
 * In a V1 build the trigger data arrives in fcinfo->context; this macro lets
 * the code refer to it as CurrentTriggerData, the same name the non-V1 code
 * path uses.
 */
11     #ifdef PG_FUNCTION_INFO_V1
12     #define CurrentTriggerData ((TriggerData *) fcinfo->context)
13     #endif
14    
/*
 * Entry points exported by this file: V1-style (Datum / PG_FUNCTION_ARGS)
 * declarations when PG_FUNCTION_INFO_V1 is available, plain C prototypes
 * otherwise.
 */
15     #ifdef PG_FUNCTION_INFO_V1
16     PG_FUNCTION_INFO_V1(_rserv_log_);
17     PG_FUNCTION_INFO_V1(_rserv_sync_);
18     PG_FUNCTION_INFO_V1(_rserv_debug_);
19 dpavlin 1.5 PG_FUNCTION_INFO_V1(_rserv_xid_);
20 dpavlin 1.2 Datum _rserv_log_(PG_FUNCTION_ARGS);
21     Datum _rserv_sync_(PG_FUNCTION_ARGS);
22     Datum _rserv_debug_(PG_FUNCTION_ARGS);
23 dpavlin 1.5 Datum _rserv_xid_(PG_FUNCTION_ARGS);
24 dpavlin 1.2
25 dpavlin 1.1 #else
26     HeapTuple _rserv_log_(void);
27     int32 _rserv_sync_(int32);
28     int32 _rserv_debug_(int32);
29 dpavlin 1.5 int32 _rserv_xid_(void);
30 dpavlin 1.1 #endif
31    
/*
 * Debug flag: when nonzero, _rserv_log_ reports every SQL statement it
 * builds via elog(DEBUG3). Changed through _rserv_debug_.
 */
32     static int debug = 0;
33    
/*
 * Escapes special characters in key, writing into buf (size bytes) when the
 * result fits; defined further down in this file.
 */
34 dpavlin 1.2 static char *OutputValue(char *key, char *buf, int size);
35 dpavlin 1.1
36     #ifdef PG_FUNCTION_INFO_V1
37 dpavlin 1.3 Datum
38     _rserv_log_(PG_FUNCTION_ARGS)
39 dpavlin 1.1 #else
40 dpavlin 1.3 HeapTuple
41     _rserv_log_()
42 dpavlin 1.1 #endif
43     {
44 dpavlin 1.3 Trigger *trigger; /* to get trigger name */
45     int nargs; /* # of args specified in CREATE TRIGGER */
46     char **args; /* argument: argnum */
47     Relation rel; /* triggered relation */
48     HeapTuple tuple; /* tuple to return */
49     HeapTuple newtuple = NULL; /* tuple to return */
50     TupleDesc tupdesc; /* tuple description */
51     int keynum;
52     char *key;
53     char *okey;
54     char *newkey = NULL;
55     int deleted, inserted, updated;
56     char sql[8192];
57     char outbuf[8192];
58     char oidbuf[64];
59     int ret;
60 dpavlin 1.4 int server;
61 dpavlin 1.3
62     /* Called by trigger manager ? */
63     if (!CurrentTriggerData)
64     elog(ERROR, "_rserv_log_: triggers are not initialized");
65    
66     /* Should be called for ROW trigger */
67     if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event))
68     elog(ERROR, "_rserv_log_: can't process STATEMENT events");
69    
70     tuple = CurrentTriggerData->tg_trigtuple;
71    
72     trigger = CurrentTriggerData->tg_trigger;
73     nargs = trigger->tgnargs;
74     args = trigger->tgargs;
75    
76 dpavlin 1.4 if (nargs != 2) /* odd number of arguments! */
77     elog(ERROR, "_rserv_log_: need in *two* arguments, key number and server number");
78 dpavlin 1.3
79     keynum = atoi(args[0]);
80 dpavlin 1.4 server = atoi(args[1]);
81 dpavlin 1.3
82     if (keynum < 0 && keynum != ObjectIdAttributeNumber)
83     elog(ERROR, "_rserv_log_: invalid keynum %d", keynum);
84    
85     rel = CurrentTriggerData->tg_relation;
86     tupdesc = rel->rd_att;
87    
88     deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ?
89     1 : 0;
90    
91     inserted = (TRIGGER_FIRED_BY_INSERT(CurrentTriggerData->tg_event)) ? 1 : 0;
92    
93     updated = 0;
94     if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event)) {
95     updated = 1;
96     newtuple = CurrentTriggerData->tg_newtuple;
97     }
98 dpavlin 1.2
99     #ifndef PG_FUNCTION_INFO_V1
100 dpavlin 1.3
101     /*
102     * Setting CurrentTriggerData to NULL prevents direct calls to trigger
103     * functions in queries. Normally, trigger functions have to be called
104     * by trigger manager code only.
105     */
106     CurrentTriggerData = NULL;
107 dpavlin 1.2 #endif
108 dpavlin 1.1
109 dpavlin 1.3 /* Connect to SPI manager */
110     if ((ret = SPI_connect()) < 0)
111     elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret);
112    
113     if (keynum == ObjectIdAttributeNumber)
114     {
115     snprintf(oidbuf, sizeof(oidbuf), "%u", HeapTupleGetOid(tuple));
116     key = oidbuf;
117     }
118     else
119     key = SPI_getvalue(tuple, tupdesc, keynum);
120    
121     if (key == NULL)
122     elog(ERROR, "_rserv_log_: key must be not null");
123    
124     if (newtuple && keynum != ObjectIdAttributeNumber)
125     {
126     newkey = SPI_getvalue(newtuple, tupdesc, keynum);
127     if (newkey == NULL)
128     elog(ERROR, "_rserv_log_: key must be not null");
129     if (strcmp(newkey, key) == 0)
130     newkey = NULL;
131     else
132     deleted = 1; /* old key was deleted */
133     }
134    
135     if (strpbrk(key, "\\ \n'"))
136     okey = OutputValue(key, outbuf, sizeof(outbuf));
137     else
138     okey = key;
139    
140     snprintf(sql, 8192, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
141     "insert = %d, update = %d, delete = %d where reloid = %u and key = '%s'",
142     GetCurrentTransactionId(), inserted, updated, deleted, rel->rd_id, okey);
143    
144     if (debug)
145     elog(DEBUG3, "sql: %s", sql);
146    
147     ret = SPI_exec(sql, 0);
148    
149     if (ret < 0)
150     elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
151    
152     /*
153     * If no tuple was UPDATEd then do INSERT...
154     */
155     if (SPI_processed > 1)
156     elog(ERROR, "_rserv_log_: duplicate tuples");
157     else if (SPI_processed == 0)
158     {
159     snprintf(sql, 8192, "insert into _RSERV_LOG_ "
160 dpavlin 1.4 "(reloid, logid, logtime, insert, update, delete, key, server) "
161     "values (%u, %d, now(), %d, %d, %d, '%s', %d)",
162 dpavlin 1.3 rel->rd_id, GetCurrentTransactionId(),
163 dpavlin 1.4 inserted, updated, deleted, okey, server);
164 dpavlin 1.3
165     if (debug)
166     elog(DEBUG3, "sql: %s", sql);
167    
168     ret = SPI_exec(sql, 0);
169    
170     if (ret < 0)
171     elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
172     }
173    
174     if (okey != key && okey != outbuf)
175     pfree(okey);
176    
177     if (newkey)
178     {
179     if (strpbrk(newkey, "\\ \n'"))
180     okey = OutputValue(newkey, outbuf, sizeof(outbuf));
181     else
182     okey = newkey;
183    
184     snprintf(sql, 8192, "insert into _RSERV_LOG_ "
185 dpavlin 1.4 "(reloid, logid, logtime, insert, update, deleted, key, server) "
186     "values (%u, %d, now(), %d, %d, 0, '%s', %d)",
187     rel->rd_id, GetCurrentTransactionId(), inserted, updated, okey, server);
188 dpavlin 1.3
189     if (debug)
190     elog(DEBUG3, "sql: %s", sql);
191    
192     ret = SPI_exec(sql, 0);
193    
194     if (ret < 0)
195     elog(ERROR, "_rserv_log_: SPI_exec returned %d", ret);
196    
197     if (okey != newkey && okey != outbuf)
198     pfree(okey);
199     }
200    
201     SPI_finish();
202    
203 dpavlin 1.1 #ifdef PG_FUNCTION_INFO_V1
204 dpavlin 1.3 return (PointerGetDatum(tuple));
205 dpavlin 1.1 #else
206 dpavlin 1.3 return (tuple);
207 dpavlin 1.1 #endif
208     }
209    
210     #ifdef PG_FUNCTION_INFO_V1
211     Datum
212     _rserv_sync_(PG_FUNCTION_ARGS)
213     #else
214     int32
215     _rserv_sync_(int32 server)
216     #endif
217     {
218     #ifdef PG_FUNCTION_INFO_V1
219 dpavlin 1.2 int32 server = PG_GETARG_INT32(0);
220 dpavlin 1.1 #endif
221 dpavlin 1.2 char sql[8192];
222     char buf[8192];
223     char *active = buf;
224     uint32 xcnt;
225     int ret;
226 dpavlin 1.1
227     if (SerializableSnapshot == NULL)
228     elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL");
229    
230     buf[0] = 0;
231     for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++)
232     {
233 dpavlin 1.3 snprintf(buf + strlen(buf), 8192 - strlen(buf),
234     "%s%u", (xcnt) ? ", " : "",
235     SerializableSnapshot->xip[xcnt]);
236 dpavlin 1.1 }
237    
238     if ((ret = SPI_connect()) < 0)
239     elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret);
240    
241 dpavlin 1.3 snprintf(sql, 8192, "insert into _RSERV_SYNC_ "
242     "(server, syncid, synctime, status, minid, maxid, active) "
243 dpavlin 1.2 "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')",
244 dpavlin 1.3 server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active);
245 dpavlin 1.1
246     ret = SPI_exec(sql, 0);
247    
248     if (ret < 0)
249     elog(ERROR, "_rserv_sync_: SPI_exec returned %d", ret);
250    
251     SPI_finish();
252    
253     return (0);
254     }
255    
256     #ifdef PG_FUNCTION_INFO_V1
257     Datum
258     _rserv_debug_(PG_FUNCTION_ARGS)
259     #else
260     int32
261     _rserv_debug_(int32 newval)
262     #endif
263     {
264     #ifdef PG_FUNCTION_INFO_V1
265 dpavlin 1.2 int32 newval = PG_GETARG_INT32(0);
266 dpavlin 1.1 #endif
267 dpavlin 1.2 int32 oldval = debug;
268 dpavlin 1.1
269     debug = newval;
270    
271     return (oldval);
272     }
273    
/* Number of bytes the output buffer grows by each time it runs out of room. */
#define ExtendBy 1024

/*
 * OutputValue -- copy key, replacing characters that are special inside a
 * quoted SQL string with backslash escapes:
 *
 *   backslash     -> \\
 *   tab           -> \011
 *   newline       -> \012
 *   single quote  -> \047
 *
 * key   NUL-terminated input string
 * buf   caller-supplied output buffer
 * size  size of buf in bytes
 *
 * The result is written to buf while it fits.  Once it no longer fits, the
 * text is moved to palloc'ed memory that is grown in ExtendBy steps.
 *
 * Returns the NUL-terminated result: either buf itself, or a palloc'ed
 * string that the caller has to pfree (compare the result against buf to
 * tell the two apart).
 *
 * NOTE(review): the octal escapes only work where the server interprets
 * backslash escapes inside ordinary '...' literals -- confirm for the
 * target server version/settings.
 */
static char *
OutputValue(char *key, char *buf, int size)
{
	int			i = 0;			/* number of bytes written so far */
	char	   *out = buf;
	char	   *subst = NULL;	/* replacement text for the current char */
	int			slen = 0;		/* bytes the current char needs in out */

	size--;						/* keep one byte for the terminating NUL */
	for (;;)
	{
		switch (*key)
		{
			case '\\':
				subst = "\\\\";
				slen = 2;
				break;
			case '\t':			/* \011 is a tab; this used to match ' ' */
				subst = "\\011";
				slen = 4;
				break;
			case '\n':
				subst = "\\012";
				slen = 4;
				break;
			case '\'':
				subst = "\\047";
				slen = 4;
				break;
			case '\0':
				out[i] = 0;
				return (out);
			default:
				slen = 1;
				break;
		}

		if (i + slen >= size)
		{
			if (out == buf)
			{
				/*
				 * First overflow: switch from the caller's buffer to
				 * allocated memory.  The i bytes written so far are not
				 * NUL-terminated, so copy them as raw bytes.
				 */
				out = (char *) palloc(size + ExtendBy);
				memcpy(out, buf, i);
			}
			else
				out = (char *) repalloc(out, size + ExtendBy);
			size += ExtendBy;
		}

		if (slen == 1)
			out[i++] = *key;
		else
		{
			memcpy(out + i, subst, slen);
			i += slen;
		}
		key++;
	}
}
341 dpavlin 1.5
342     #ifdef PG_FUNCTION_INFO_V1
343     Datum
344     _rserv_xid_(PG_FUNCTION_ARGS)
345     #else
346     int32
347     _rserv_xid_(void)
348     #endif
349     {
350     int32 curr_xid = GetCurrentTransactionId();
351    
352     return (curr_xid);
353     }
354    

  ViewVC Help
Powered by ViewVC 1.1.26