/[rserv]/lib/rserv.c
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /lib/rserv.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.4 - (show annotations)
Tue Oct 28 18:43:15 2003 UTC (20 years, 5 months ago) by dpavlin
Branch: MAIN
Changes since 1.3: +13 -11 lines
File MIME type: text/plain
add server number in _rserv_log_

1 /* rserv.c
2 * Support functions for erServer replication.
3 * (c) 2000 Vadim Mikheev, PostgreSQL Inc.
4 */
5
6 #include "executor/spi.h" /* this is what you need to work with SPI */
7 #include "commands/trigger.h" /* -"- and triggers */
8 #include "utils/tqual.h" /* -"- and SnapshotData */
9 #include <ctype.h> /* tolower () */
10
11 #ifdef PG_FUNCTION_INFO_V1
12 #define CurrentTriggerData ((TriggerData *) fcinfo->context)
13 #endif
14
15 #ifdef PG_FUNCTION_INFO_V1
16 PG_FUNCTION_INFO_V1(_rserv_log_);
17 PG_FUNCTION_INFO_V1(_rserv_sync_);
18 PG_FUNCTION_INFO_V1(_rserv_debug_);
19 Datum _rserv_log_(PG_FUNCTION_ARGS);
20 Datum _rserv_sync_(PG_FUNCTION_ARGS);
21 Datum _rserv_debug_(PG_FUNCTION_ARGS);
22
23 #else
24 HeapTuple _rserv_log_(void);
25 int32 _rserv_sync_(int32);
26 int32 _rserv_debug_(int32);
27 #endif
28
29 static int debug = 0;
30
31 static char *OutputValue(char *key, char *buf, int size);
32
33 #ifdef PG_FUNCTION_INFO_V1
34 Datum
35 _rserv_log_(PG_FUNCTION_ARGS)
36 #else
37 HeapTuple
38 _rserv_log_()
39 #endif
40 {
41 Trigger *trigger; /* to get trigger name */
42 int nargs; /* # of args specified in CREATE TRIGGER */
43 char **args; /* argument: argnum */
44 Relation rel; /* triggered relation */
45 HeapTuple tuple; /* tuple to return */
46 HeapTuple newtuple = NULL; /* tuple to return */
47 TupleDesc tupdesc; /* tuple description */
48 int keynum;
49 char *key;
50 char *okey;
51 char *newkey = NULL;
52 int deleted, inserted, updated;
53 char sql[8192];
54 char outbuf[8192];
55 char oidbuf[64];
56 int ret;
57 int server;
58
59 /* Called by trigger manager ? */
60 if (!CurrentTriggerData)
61 elog(ERROR, "_rserv_log_: triggers are not initialized");
62
63 /* Should be called for ROW trigger */
64 if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event))
65 elog(ERROR, "_rserv_log_: can't process STATEMENT events");
66
67 tuple = CurrentTriggerData->tg_trigtuple;
68
69 trigger = CurrentTriggerData->tg_trigger;
70 nargs = trigger->tgnargs;
71 args = trigger->tgargs;
72
73 if (nargs != 2) /* odd number of arguments! */
74 elog(ERROR, "_rserv_log_: need in *two* arguments, key number and server number");
75
76 keynum = atoi(args[0]);
77 server = atoi(args[1]);
78
79 if (keynum < 0 && keynum != ObjectIdAttributeNumber)
80 elog(ERROR, "_rserv_log_: invalid keynum %d", keynum);
81
82 rel = CurrentTriggerData->tg_relation;
83 tupdesc = rel->rd_att;
84
85 deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ?
86 1 : 0;
87
88 inserted = (TRIGGER_FIRED_BY_INSERT(CurrentTriggerData->tg_event)) ? 1 : 0;
89
90 updated = 0;
91 if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event)) {
92 updated = 1;
93 newtuple = CurrentTriggerData->tg_newtuple;
94 }
95
96 #ifndef PG_FUNCTION_INFO_V1
97
98 /*
99 * Setting CurrentTriggerData to NULL prevents direct calls to trigger
100 * functions in queries. Normally, trigger functions have to be called
101 * by trigger manager code only.
102 */
103 CurrentTriggerData = NULL;
104 #endif
105
106 /* Connect to SPI manager */
107 if ((ret = SPI_connect()) < 0)
108 elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret);
109
110 if (keynum == ObjectIdAttributeNumber)
111 {
112 snprintf(oidbuf, sizeof(oidbuf), "%u", HeapTupleGetOid(tuple));
113 key = oidbuf;
114 }
115 else
116 key = SPI_getvalue(tuple, tupdesc, keynum);
117
118 if (key == NULL)
119 elog(ERROR, "_rserv_log_: key must be not null");
120
121 if (newtuple && keynum != ObjectIdAttributeNumber)
122 {
123 newkey = SPI_getvalue(newtuple, tupdesc, keynum);
124 if (newkey == NULL)
125 elog(ERROR, "_rserv_log_: key must be not null");
126 if (strcmp(newkey, key) == 0)
127 newkey = NULL;
128 else
129 deleted = 1; /* old key was deleted */
130 }
131
132 if (strpbrk(key, "\\ \n'"))
133 okey = OutputValue(key, outbuf, sizeof(outbuf));
134 else
135 okey = key;
136
137 snprintf(sql, 8192, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
138 "insert = %d, update = %d, delete = %d where reloid = %u and key = '%s'",
139 GetCurrentTransactionId(), inserted, updated, deleted, rel->rd_id, okey);
140
141 if (debug)
142 elog(DEBUG3, "sql: %s", sql);
143
144 ret = SPI_exec(sql, 0);
145
146 if (ret < 0)
147 elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
148
149 /*
150 * If no tuple was UPDATEd then do INSERT...
151 */
152 if (SPI_processed > 1)
153 elog(ERROR, "_rserv_log_: duplicate tuples");
154 else if (SPI_processed == 0)
155 {
156 snprintf(sql, 8192, "insert into _RSERV_LOG_ "
157 "(reloid, logid, logtime, insert, update, delete, key, server) "
158 "values (%u, %d, now(), %d, %d, %d, '%s', %d)",
159 rel->rd_id, GetCurrentTransactionId(),
160 inserted, updated, deleted, okey, server);
161
162 if (debug)
163 elog(DEBUG3, "sql: %s", sql);
164
165 ret = SPI_exec(sql, 0);
166
167 if (ret < 0)
168 elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
169 }
170
171 if (okey != key && okey != outbuf)
172 pfree(okey);
173
174 if (newkey)
175 {
176 if (strpbrk(newkey, "\\ \n'"))
177 okey = OutputValue(newkey, outbuf, sizeof(outbuf));
178 else
179 okey = newkey;
180
181 snprintf(sql, 8192, "insert into _RSERV_LOG_ "
182 "(reloid, logid, logtime, insert, update, deleted, key, server) "
183 "values (%u, %d, now(), %d, %d, 0, '%s', %d)",
184 rel->rd_id, GetCurrentTransactionId(), inserted, updated, okey, server);
185
186 if (debug)
187 elog(DEBUG3, "sql: %s", sql);
188
189 ret = SPI_exec(sql, 0);
190
191 if (ret < 0)
192 elog(ERROR, "_rserv_log_: SPI_exec returned %d", ret);
193
194 if (okey != newkey && okey != outbuf)
195 pfree(okey);
196 }
197
198 SPI_finish();
199
200 #ifdef PG_FUNCTION_INFO_V1
201 return (PointerGetDatum(tuple));
202 #else
203 return (tuple);
204 #endif
205 }
206
207 #ifdef PG_FUNCTION_INFO_V1
208 Datum
209 _rserv_sync_(PG_FUNCTION_ARGS)
210 #else
211 int32
212 _rserv_sync_(int32 server)
213 #endif
214 {
215 #ifdef PG_FUNCTION_INFO_V1
216 int32 server = PG_GETARG_INT32(0);
217 #endif
218 char sql[8192];
219 char buf[8192];
220 char *active = buf;
221 uint32 xcnt;
222 int ret;
223
224 if (SerializableSnapshot == NULL)
225 elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL");
226
227 buf[0] = 0;
228 for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++)
229 {
230 snprintf(buf + strlen(buf), 8192 - strlen(buf),
231 "%s%u", (xcnt) ? ", " : "",
232 SerializableSnapshot->xip[xcnt]);
233 }
234
235 if ((ret = SPI_connect()) < 0)
236 elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret);
237
238 snprintf(sql, 8192, "insert into _RSERV_SYNC_ "
239 "(server, syncid, synctime, status, minid, maxid, active) "
240 "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')",
241 server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active);
242
243 ret = SPI_exec(sql, 0);
244
245 if (ret < 0)
246 elog(ERROR, "_rserv_sync_: SPI_exec returned %d", ret);
247
248 SPI_finish();
249
250 return (0);
251 }
252
253 #ifdef PG_FUNCTION_INFO_V1
254 Datum
255 _rserv_debug_(PG_FUNCTION_ARGS)
256 #else
257 int32
258 _rserv_debug_(int32 newval)
259 #endif
260 {
261 #ifdef PG_FUNCTION_INFO_V1
262 int32 newval = PG_GETARG_INT32(0);
263 #endif
264 int32 oldval = debug;
265
266 debug = newval;
267
268 return (oldval);
269 }
270
271 #define ExtendBy 1024
272
273 static char *
274 OutputValue(char *key, char *buf, int size)
275 {
276 int i = 0;
277 char *out = buf;
278 char *subst = NULL;
279 int slen = 0;
280
281 size--;
282 for (;;)
283 {
284 switch (*key)
285 {
286 case '\\':
287 subst = "\\\\";
288 slen = 2;
289 break;
290 case ' ':
291 subst = "\\011";
292 slen = 4;
293 break;
294 case '\n':
295 subst = "\\012";
296 slen = 4;
297 break;
298 case '\'':
299 subst = "\\047";
300 slen = 4;
301 break;
302 case '\0':
303 out[i] = 0;
304 return (out);
305 default:
306 slen = 1;
307 break;
308 }
309
310 if (i + slen >= size)
311 {
312 if (out == buf)
313 {
314 out = (char *) palloc(size + ExtendBy);
315 strncpy(out, buf, i);
316 size += ExtendBy;
317 }
318 else
319 {
320 out = (char *) repalloc(out, size + ExtendBy);
321 size += ExtendBy;
322 }
323 }
324
325 if (slen == 1)
326 out[i++] = *key;
327 else
328 {
329 memcpy(out + i, subst, slen);
330 i += slen;
331 }
332 key++;
333 }
334
335 return (out);
336
337 }

  ViewVC Help
Powered by ViewVC 1.1.26