/[rserv]/lib/rserv.c
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /lib/rserv.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.5 - (show annotations)
Mon Nov 3 21:30:37 2003 UTC (20 years, 4 months ago) by dpavlin
Branch: MAIN
CVS Tags: HEAD
Changes since 1.4: +17 -0 lines
File MIME type: text/plain
added _rserv_xid_ function to return current transaction xid and removed
all perl cludgery about keys (as well as KEYS directive from snapshots)

1 /* rserv.c
2 * Support functions for erServer replication.
3 * (c) 2000 Vadim Mikheev, PostgreSQL Inc.
4 */
5
6 #include "executor/spi.h" /* this is what you need to work with SPI */
7 #include "commands/trigger.h" /* -"- and triggers */
8 #include "utils/tqual.h" /* -"- and SnapshotData */
9 #include <ctype.h> /* tolower () */
10
11 #ifdef PG_FUNCTION_INFO_V1
12 #define CurrentTriggerData ((TriggerData *) fcinfo->context)
13 #endif
14
15 #ifdef PG_FUNCTION_INFO_V1
16 PG_FUNCTION_INFO_V1(_rserv_log_);
17 PG_FUNCTION_INFO_V1(_rserv_sync_);
18 PG_FUNCTION_INFO_V1(_rserv_debug_);
19 PG_FUNCTION_INFO_V1(_rserv_xid_);
20 Datum _rserv_log_(PG_FUNCTION_ARGS);
21 Datum _rserv_sync_(PG_FUNCTION_ARGS);
22 Datum _rserv_debug_(PG_FUNCTION_ARGS);
23 Datum _rserv_xid_(PG_FUNCTION_ARGS);
24
25 #else
26 HeapTuple _rserv_log_(void);
27 int32 _rserv_sync_(int32);
28 int32 _rserv_debug_(int32);
29 int32 _rserv_xid_(void);
30 #endif
31
32 static int debug = 0;
33
34 static char *OutputValue(char *key, char *buf, int size);
35
36 #ifdef PG_FUNCTION_INFO_V1
37 Datum
38 _rserv_log_(PG_FUNCTION_ARGS)
39 #else
40 HeapTuple
41 _rserv_log_()
42 #endif
43 {
44 Trigger *trigger; /* to get trigger name */
45 int nargs; /* # of args specified in CREATE TRIGGER */
46 char **args; /* argument: argnum */
47 Relation rel; /* triggered relation */
48 HeapTuple tuple; /* tuple to return */
49 HeapTuple newtuple = NULL; /* tuple to return */
50 TupleDesc tupdesc; /* tuple description */
51 int keynum;
52 char *key;
53 char *okey;
54 char *newkey = NULL;
55 int deleted, inserted, updated;
56 char sql[8192];
57 char outbuf[8192];
58 char oidbuf[64];
59 int ret;
60 int server;
61
62 /* Called by trigger manager ? */
63 if (!CurrentTriggerData)
64 elog(ERROR, "_rserv_log_: triggers are not initialized");
65
66 /* Should be called for ROW trigger */
67 if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event))
68 elog(ERROR, "_rserv_log_: can't process STATEMENT events");
69
70 tuple = CurrentTriggerData->tg_trigtuple;
71
72 trigger = CurrentTriggerData->tg_trigger;
73 nargs = trigger->tgnargs;
74 args = trigger->tgargs;
75
76 if (nargs != 2) /* odd number of arguments! */
77 elog(ERROR, "_rserv_log_: need in *two* arguments, key number and server number");
78
79 keynum = atoi(args[0]);
80 server = atoi(args[1]);
81
82 if (keynum < 0 && keynum != ObjectIdAttributeNumber)
83 elog(ERROR, "_rserv_log_: invalid keynum %d", keynum);
84
85 rel = CurrentTriggerData->tg_relation;
86 tupdesc = rel->rd_att;
87
88 deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ?
89 1 : 0;
90
91 inserted = (TRIGGER_FIRED_BY_INSERT(CurrentTriggerData->tg_event)) ? 1 : 0;
92
93 updated = 0;
94 if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event)) {
95 updated = 1;
96 newtuple = CurrentTriggerData->tg_newtuple;
97 }
98
99 #ifndef PG_FUNCTION_INFO_V1
100
101 /*
102 * Setting CurrentTriggerData to NULL prevents direct calls to trigger
103 * functions in queries. Normally, trigger functions have to be called
104 * by trigger manager code only.
105 */
106 CurrentTriggerData = NULL;
107 #endif
108
109 /* Connect to SPI manager */
110 if ((ret = SPI_connect()) < 0)
111 elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret);
112
113 if (keynum == ObjectIdAttributeNumber)
114 {
115 snprintf(oidbuf, sizeof(oidbuf), "%u", HeapTupleGetOid(tuple));
116 key = oidbuf;
117 }
118 else
119 key = SPI_getvalue(tuple, tupdesc, keynum);
120
121 if (key == NULL)
122 elog(ERROR, "_rserv_log_: key must be not null");
123
124 if (newtuple && keynum != ObjectIdAttributeNumber)
125 {
126 newkey = SPI_getvalue(newtuple, tupdesc, keynum);
127 if (newkey == NULL)
128 elog(ERROR, "_rserv_log_: key must be not null");
129 if (strcmp(newkey, key) == 0)
130 newkey = NULL;
131 else
132 deleted = 1; /* old key was deleted */
133 }
134
135 if (strpbrk(key, "\\ \n'"))
136 okey = OutputValue(key, outbuf, sizeof(outbuf));
137 else
138 okey = key;
139
140 snprintf(sql, 8192, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
141 "insert = %d, update = %d, delete = %d where reloid = %u and key = '%s'",
142 GetCurrentTransactionId(), inserted, updated, deleted, rel->rd_id, okey);
143
144 if (debug)
145 elog(DEBUG3, "sql: %s", sql);
146
147 ret = SPI_exec(sql, 0);
148
149 if (ret < 0)
150 elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
151
152 /*
153 * If no tuple was UPDATEd then do INSERT...
154 */
155 if (SPI_processed > 1)
156 elog(ERROR, "_rserv_log_: duplicate tuples");
157 else if (SPI_processed == 0)
158 {
159 snprintf(sql, 8192, "insert into _RSERV_LOG_ "
160 "(reloid, logid, logtime, insert, update, delete, key, server) "
161 "values (%u, %d, now(), %d, %d, %d, '%s', %d)",
162 rel->rd_id, GetCurrentTransactionId(),
163 inserted, updated, deleted, okey, server);
164
165 if (debug)
166 elog(DEBUG3, "sql: %s", sql);
167
168 ret = SPI_exec(sql, 0);
169
170 if (ret < 0)
171 elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
172 }
173
174 if (okey != key && okey != outbuf)
175 pfree(okey);
176
177 if (newkey)
178 {
179 if (strpbrk(newkey, "\\ \n'"))
180 okey = OutputValue(newkey, outbuf, sizeof(outbuf));
181 else
182 okey = newkey;
183
184 snprintf(sql, 8192, "insert into _RSERV_LOG_ "
185 "(reloid, logid, logtime, insert, update, deleted, key, server) "
186 "values (%u, %d, now(), %d, %d, 0, '%s', %d)",
187 rel->rd_id, GetCurrentTransactionId(), inserted, updated, okey, server);
188
189 if (debug)
190 elog(DEBUG3, "sql: %s", sql);
191
192 ret = SPI_exec(sql, 0);
193
194 if (ret < 0)
195 elog(ERROR, "_rserv_log_: SPI_exec returned %d", ret);
196
197 if (okey != newkey && okey != outbuf)
198 pfree(okey);
199 }
200
201 SPI_finish();
202
203 #ifdef PG_FUNCTION_INFO_V1
204 return (PointerGetDatum(tuple));
205 #else
206 return (tuple);
207 #endif
208 }
209
210 #ifdef PG_FUNCTION_INFO_V1
211 Datum
212 _rserv_sync_(PG_FUNCTION_ARGS)
213 #else
214 int32
215 _rserv_sync_(int32 server)
216 #endif
217 {
218 #ifdef PG_FUNCTION_INFO_V1
219 int32 server = PG_GETARG_INT32(0);
220 #endif
221 char sql[8192];
222 char buf[8192];
223 char *active = buf;
224 uint32 xcnt;
225 int ret;
226
227 if (SerializableSnapshot == NULL)
228 elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL");
229
230 buf[0] = 0;
231 for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++)
232 {
233 snprintf(buf + strlen(buf), 8192 - strlen(buf),
234 "%s%u", (xcnt) ? ", " : "",
235 SerializableSnapshot->xip[xcnt]);
236 }
237
238 if ((ret = SPI_connect()) < 0)
239 elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret);
240
241 snprintf(sql, 8192, "insert into _RSERV_SYNC_ "
242 "(server, syncid, synctime, status, minid, maxid, active) "
243 "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')",
244 server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active);
245
246 ret = SPI_exec(sql, 0);
247
248 if (ret < 0)
249 elog(ERROR, "_rserv_sync_: SPI_exec returned %d", ret);
250
251 SPI_finish();
252
253 return (0);
254 }
255
256 #ifdef PG_FUNCTION_INFO_V1
257 Datum
258 _rserv_debug_(PG_FUNCTION_ARGS)
259 #else
260 int32
261 _rserv_debug_(int32 newval)
262 #endif
263 {
264 #ifdef PG_FUNCTION_INFO_V1
265 int32 newval = PG_GETARG_INT32(0);
266 #endif
267 int32 oldval = debug;
268
269 debug = newval;
270
271 return (oldval);
272 }
273
274 #define ExtendBy 1024
275
276 static char *
277 OutputValue(char *key, char *buf, int size)
278 {
279 int i = 0;
280 char *out = buf;
281 char *subst = NULL;
282 int slen = 0;
283
284 size--;
285 for (;;)
286 {
287 switch (*key)
288 {
289 case '\\':
290 subst = "\\\\";
291 slen = 2;
292 break;
293 case ' ':
294 subst = "\\011";
295 slen = 4;
296 break;
297 case '\n':
298 subst = "\\012";
299 slen = 4;
300 break;
301 case '\'':
302 subst = "\\047";
303 slen = 4;
304 break;
305 case '\0':
306 out[i] = 0;
307 return (out);
308 default:
309 slen = 1;
310 break;
311 }
312
313 if (i + slen >= size)
314 {
315 if (out == buf)
316 {
317 out = (char *) palloc(size + ExtendBy);
318 strncpy(out, buf, i);
319 size += ExtendBy;
320 }
321 else
322 {
323 out = (char *) repalloc(out, size + ExtendBy);
324 size += ExtendBy;
325 }
326 }
327
328 if (slen == 1)
329 out[i++] = *key;
330 else
331 {
332 memcpy(out + i, subst, slen);
333 i += slen;
334 }
335 key++;
336 }
337
338 return (out);
339
340 }
341
342 #ifdef PG_FUNCTION_INFO_V1
343 Datum
344 _rserv_xid_(PG_FUNCTION_ARGS)
345 #else
346 int32
347 _rserv_xid_(void)
348 #endif
349 {
350 int32 curr_xid = GetCurrentTransactionId();
351
352 return (curr_xid);
353 }
354

  ViewVC Help
Powered by ViewVC 1.1.26