/[rserv]/lib/rserv.c
This is a repository of my old source code, which is no longer updated. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /lib/rserv.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.2 - (show annotations)
Tue Aug 5 09:52:39 2003 UTC (20 years, 9 months ago) by dpavlin
Branch: MAIN
Changes since 1.1: +258 -190 lines
File MIME type: text/plain
rserv 0.2 changes by Nélio Alves Pereira Filho

1 /* rserv.c
2 * Support functions for erServer replication.
3 * (c) 2000 Vadim Mikheev, PostgreSQL Inc.
4 */
5
6 #include "executor/spi.h" /* this is what you need to work with SPI */
7 #include "commands/trigger.h" /* -"- and triggers */
8 #include "utils/tqual.h" /* -"- and SnapshotData */
9 #include <ctype.h> /* tolower () */
10
/*
 * With the version-1 calling convention the trigger data is reached
 * through the call's fcinfo->context, so CurrentTriggerData is mapped
 * onto it here.
 */
11 #ifdef PG_FUNCTION_INFO_V1
12 #define CurrentTriggerData ((TriggerData *) fcinfo->context)
13 #endif
14
/*
 * Entry points.  When PG_FUNCTION_INFO_V1 is available they are declared
 * as version-1 functions returning Datum; otherwise the plain C
 * signatures are used.
 */
15 #ifdef PG_FUNCTION_INFO_V1
16 PG_FUNCTION_INFO_V1(_rserv_log_);
17 PG_FUNCTION_INFO_V1(_rserv_sync_);
18 PG_FUNCTION_INFO_V1(_rserv_debug_);
19 Datum _rserv_log_(PG_FUNCTION_ARGS);
20 Datum _rserv_sync_(PG_FUNCTION_ARGS);
21 Datum _rserv_debug_(PG_FUNCTION_ARGS);
22
23 #else
24 HeapTuple _rserv_log_(void);
25 int32 _rserv_sync_(int32);
26 int32 _rserv_debug_(int32);
27 #endif
28
/*
 * Debug flag: when non-zero, _rserv_log_ reports each SQL statement it
 * generates as a NOTICE.  Changed through _rserv_debug_.
 */
29 static int debug = 0;
30
/*
 * Escapes a key for embedding in a quoted SQL literal.  The result is
 * written to buf, or to a palloc'd buffer when buf is too small.
 */
31 static char *OutputValue(char *key, char *buf, int size);
32
33 #ifdef PG_FUNCTION_INFO_V1
34 Datum _rserv_log_(PG_FUNCTION_ARGS)
35 #else
36 HeapTuple _rserv_log_()
37 #endif
38 {
39 Trigger *trigger; /* to get trigger name */
40 int nargs; /* # of args specified in CREATE TRIGGER */
41 char **args; /* argument: argnum */
42 Relation rel; /* triggered relation */
43 HeapTuple tuple; /* tuple to return */
44 HeapTuple newtuple = NULL; /* tuple to return */
45 TupleDesc tupdesc; /* tuple description */
46 int keynum;
47 char *key;
48 char *okey;
49 char *newkey = NULL;
50 int deleted, inserted, updated;
51 char sql[8192];
52 char outbuf[8192];
53 char oidbuf[64];
54 int ret;
55
56 /* Called by trigger manager ? */
57 if (!CurrentTriggerData)
58 elog(ERROR, "_rserv_log_: triggers are not initialized");
59
60 /* Should be called for ROW trigger */
61 if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event))
62 elog(ERROR, "_rserv_log_: can't process STATEMENT events");
63
64 tuple = CurrentTriggerData->tg_trigtuple;
65
66 trigger = CurrentTriggerData->tg_trigger;
67 nargs = trigger->tgnargs;
68 args = trigger->tgargs;
69
70 if (nargs != 1) /* odd number of arguments! */
71 elog(ERROR, "_rserv_log_: need in *one* argument");
72
73 keynum = atoi(args[0]);
74
75 if (keynum < 0 && keynum != ObjectIdAttributeNumber)
76 elog(ERROR, "_rserv_log_: invalid keynum %d", keynum);
77
78 rel = CurrentTriggerData->tg_relation;
79 tupdesc = rel->rd_att;
80
81 deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ? 1 : 0;
82 inserted = (TRIGGER_FIRED_BY_INSERT(CurrentTriggerData->tg_event)) ? 1 : 0;
83 // updated = (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event)) ? 1 : 0;
84
85 updated = 0;
86 if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event)) {
87 updated = 1;
88 newtuple = CurrentTriggerData->tg_newtuple;
89 }
90
91 #ifndef PG_FUNCTION_INFO_V1
92
93 /*
94 * Setting CurrentTriggerData to NULL prevents direct calls to trigger
95 * functions in queries. Normally, trigger functions have to be called
96 * by trigger manager code only.
97 */
98 CurrentTriggerData = NULL;
99 #endif
100
101 /* Connect to SPI manager */
102 if ((ret = SPI_connect()) < 0)
103 elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret);
104
105 if (keynum == ObjectIdAttributeNumber) {
106 sprintf(oidbuf, "%u", tuple->t_data->t_oid);
107 key = oidbuf;
108 } else {
109 key = SPI_getvalue(tuple, tupdesc, keynum);
110 }
111
112 if (key == NULL)
113 elog(ERROR, "_rserv_log_: key must be not null");
114
115 if (newtuple && keynum != ObjectIdAttributeNumber) {
116 newkey = SPI_getvalue(newtuple, tupdesc, keynum);
117 if (newkey == NULL)
118 elog(ERROR, "_rserv_log_: key must be not null");
119 if (strcmp(newkey, key) == 0)
120 newkey = NULL;
121 else
122 deleted = 1; /* old key was deleted */
123 }
124
125 if (strpbrk(key, "\\ \n'"))
126 okey = OutputValue(key, outbuf, sizeof(outbuf));
127 else
128 okey = key;
129
130
131 /**
132 versao onde verifica-se primeiro se o registro existe
133 para decidir se faz insert ou update em _RSERV_LOG_
134 **********************/
135
136 #ifdef ASKFORUPDATE
137 sprintf(sql, "SELECT true WHERE EXISTS (SELECT logid from _RSERV_LOG_ "
138 "WHERE reloid = %u AND key = '%s')",
139 rel->rd_id, okey);
140
141 if (debug)
142 elog(NOTICE, sql);
143
144 if ((ret = SPI_exec(sql, 1)) < 0) {
145 elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
146 }
147
148 if (SPI_processed > 0) {
149 sprintf(sql, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
150 "insert = %d, update = %d, delete = %d where reloid = %u "
151 "and key = '%s'",
152 GetCurrentTransactionId(),
153 inserted, updated, deleted, rel->rd_id, okey);
154
155 if (debug)
156 elog(NOTICE, sql);
157
158 ret = SPI_exec(sql, 0);
159
160 if (ret < 0)
161 elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
162
163 if (SPI_processed > 1)
164 elog(ERROR, "_rserv_log_: duplicate tuples");
165 } else {
166 sprintf(sql, "insert into _RSERV_LOG_ "
167 "(reloid, logid, logtime, insert, update, delete, key) "
168 "values (%u, %d, now(), %d, %d, %d, '%s')",
169 rel->rd_id, GetCurrentTransactionId(),
170 inserted, updated, deleted, okey);
171
172 if (debug)
173 elog(NOTICE, sql);
174
175 ret = SPI_exec(sql, 0);
176
177 if (ret < 0)
178 elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
179 }
180
181 #else
182
183 sprintf(sql, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
184 "insert = %d, update = %d, delete = %d where reloid = %u "
185 "and key = '%s'",
186 GetCurrentTransactionId(),
187 inserted, updated, deleted, rel->rd_id, okey);
188
189 if (debug)
190 elog(NOTICE, sql);
191
192 ret = SPI_exec(sql, 0);
193
194 if (ret < 0)
195 elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
196
197
198 /*
199 * If no tuple was UPDATEd then do INSERT...
200 */
201
202
203 if (SPI_processed > 1)
204 elog(ERROR, "_rserv_log_: duplicate tuples");
205 else if (SPI_processed == 0) {
206 sprintf(sql, "insert into _RSERV_LOG_ "
207 "(reloid, logid, logtime, insert, update, delete, key) "
208 "values (%u, %d, now(), %d, %d, %d, '%s')",
209 rel->rd_id, GetCurrentTransactionId(),
210 inserted, updated, deleted, okey);
211
212 if (debug)
213 elog(NOTICE, sql);
214
215 ret = SPI_exec(sql, 0);
216
217 if (ret < 0)
218 elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
219 }
220
221 #endif
222
223 if (okey != key && okey != outbuf)
224 pfree(okey);
225
226 if (newkey) {
227 if (strpbrk(newkey, "\\ \n'"))
228 okey = OutputValue(newkey, outbuf, sizeof(outbuf));
229 else
230 okey = newkey;
231
232 sprintf(sql, "insert into _RSERV_LOG_ "
233 "(reloid, logid, logtime, insert, update, delete, key) "
234 "values (%u, %d, now(), %d, %d, 0, '%s')",
235 rel->rd_id, GetCurrentTransactionId(), inserted, updated, okey);
236
237 if (debug)
238 elog(NOTICE, sql);
239
240 ret = SPI_exec(sql, 0);
241
242 if (ret < 0)
243 elog(ERROR, "_rserv_log_: SPI_exec returned %d", ret);
244
245 if (okey != newkey && okey != outbuf)
246 pfree(okey);
247 }
248
249 SPI_finish();
250
251 #ifdef PG_FUNCTION_INFO_V1
252 return (PointerGetDatum(tuple));
253 #else
254 return (tuple);
255 #endif
256 }
257
258 #ifdef PG_FUNCTION_INFO_V1
259 Datum
260 _rserv_sync_(PG_FUNCTION_ARGS)
261 #else
262 int32
263 _rserv_sync_(int32 server)
264 #endif
265 {
266 #ifdef PG_FUNCTION_INFO_V1
267 int32 server = PG_GETARG_INT32(0);
268 #endif
269 char sql[8192];
270 char buf[8192];
271 char *active = buf;
272 uint32 xcnt;
273 int ret;
274
275 if (SerializableSnapshot == NULL)
276 elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL");
277
278 buf[0] = 0;
279 for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++)
280 {
281 sprintf(buf + strlen(buf), "%s%u", (xcnt) ? ", " : "",
282 SerializableSnapshot->xip[xcnt]);
283 }
284
285 if ((ret = SPI_connect()) < 0)
286 elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret);
287
288 sprintf(sql, "insert into _RSERV_SYNC_ "
289 "(server, syncid, synctime, status, minid, maxid, active) "
290 "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')",
291 server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active);
292
293 ret = SPI_exec(sql, 0);
294
295 if (ret < 0)
296 elog(ERROR, "_rserv_sync_: SPI_exec returned %d", ret);
297
298 SPI_finish();
299
300 return (0);
301 }
302
303 #ifdef PG_FUNCTION_INFO_V1
304 Datum
305 _rserv_debug_(PG_FUNCTION_ARGS)
306 #else
307 int32
308 _rserv_debug_(int32 newval)
309 #endif
310 {
311 #ifdef PG_FUNCTION_INFO_V1
312 int32 newval = PG_GETARG_INT32(0);
313 #endif
314 int32 oldval = debug;
315
316 debug = newval;
317
318 return (oldval);
319 }
320
#define ExtendBy 1024

/*
 * OutputValue
 *    Escape key so that it can be embedded in a single-quoted SQL literal.
 *
 * Backslash becomes "\\\\" (two backslashes); tab, newline and single
 * quote become the octal escapes \011, \012 and \047.  Every other
 * character, including a plain space, is copied unchanged.
 *
 * key  - NUL-terminated input string
 * buf  - caller-supplied output buffer
 * size - size of buf in bytes
 *
 * Returns buf when the escaped string fits.  Otherwise (or when no usable
 * buffer was supplied) returns a palloc'd buffer that the caller must
 * pfree; callers tell the two apart by comparing the result with buf.
 */
static char *
OutputValue(char *key, char *buf, int size)
{
    char       *out = buf;
    int         cap = size - 1; /* keep one byte for the terminating NUL */
    int         i = 0;

    /* Without a usable caller buffer, start out on the heap right away */
    if (buf == NULL || size < 1)
    {
        out = (char *) palloc(ExtendBy);
        cap = ExtendBy - 1;
    }

    for (; *key != '\0'; key++)
    {
        const char *subst = NULL;
        int         slen = 1;

        switch (*key)
        {
            case '\\':
                subst = "\\\\";
                slen = 2;
                break;
            case '\t':
                subst = "\\011";
                slen = 4;
                break;
            case '\n':
                subst = "\\012";
                slen = 4;
                break;
            case '\'':
                subst = "\\047";
                slen = 4;
                break;
            default:
                break;
        }

        /*
         * Grow before writing.  One step is always enough because no
         * replacement is longer than ExtendBy.
         */
        if (i + slen >= cap)
        {
            if (out == buf)
            {
                /* first overflow: move from the caller's buffer to the heap */
                out = (char *) palloc(cap + ExtendBy);
                memcpy(out, buf, i);
            }
            else
                out = (char *) repalloc(out, cap + ExtendBy);
            cap += ExtendBy;
        }

        if (subst == NULL)
            out[i++] = *key;
        else
        {
            memcpy(out + i, subst, slen);
            i += slen;
        }
    }

    out[i] = '\0';
    return out;
}

  ViewVC Help
Powered by ViewVC 1.1.26