/[rserv]/lib/rserv.c
This is a repository of my old source code, which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /lib/rserv.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.3 - (show annotations)
Wed Aug 6 16:14:53 2003 UTC (20 years, 7 months ago) by dpavlin
Branch: MAIN
CVS Tags: before_onlytables, before_multmaster, r_0_3
Changes since 1.2: +165 -217 lines
File MIME type: text/plain
code synced with rserv.c from 7.3.2 (so, hopefully, it will compile on
PostgreSQL 7.3.x versions). It *WON'T* compile on PostgreSQL 7.4 (which is
in the development stage as of this writing), but you can check out the
latest version from PostgreSQL CVS, make a diff and apply the changes (as I
have done, just to find out that it doesn't compile on 7.3 -- oh, the
wonders of a major version release...)

The ASKFORUPDATE parts of the code from Nélio Alves Pereira Filho have also
been removed from this code until I have a chance to review what that code
is doing. However, it does include the updates to the logs (which are
needed by other utilities anyway).

1 /* rserv.c
2 * Support functions for erServer replication.
3 * (c) 2000 Vadim Mikheev, PostgreSQL Inc.
4 */
5
6 #include "executor/spi.h" /* this is what you need to work with SPI */
7 #include "commands/trigger.h" /* -"- and triggers */
8 #include "utils/tqual.h" /* -"- and SnapshotData */
9 #include <ctype.h> /* tolower () */
10
11 #ifdef PG_FUNCTION_INFO_V1
12 #define CurrentTriggerData ((TriggerData *) fcinfo->context)
13 #endif
14
15 #ifdef PG_FUNCTION_INFO_V1
16 PG_FUNCTION_INFO_V1(_rserv_log_);
17 PG_FUNCTION_INFO_V1(_rserv_sync_);
18 PG_FUNCTION_INFO_V1(_rserv_debug_);
19 Datum _rserv_log_(PG_FUNCTION_ARGS);
20 Datum _rserv_sync_(PG_FUNCTION_ARGS);
21 Datum _rserv_debug_(PG_FUNCTION_ARGS);
22
23 #else
24 HeapTuple _rserv_log_(void);
25 int32 _rserv_sync_(int32);
26 int32 _rserv_debug_(int32);
27 #endif
28
29 static int debug = 0;
30
31 static char *OutputValue(char *key, char *buf, int size);
32
33 #ifdef PG_FUNCTION_INFO_V1
34 Datum
35 _rserv_log_(PG_FUNCTION_ARGS)
36 #else
37 HeapTuple
38 _rserv_log_()
39 #endif
40 {
41 Trigger *trigger; /* to get trigger name */
42 int nargs; /* # of args specified in CREATE TRIGGER */
43 char **args; /* argument: argnum */
44 Relation rel; /* triggered relation */
45 HeapTuple tuple; /* tuple to return */
46 HeapTuple newtuple = NULL; /* tuple to return */
47 TupleDesc tupdesc; /* tuple description */
48 int keynum;
49 char *key;
50 char *okey;
51 char *newkey = NULL;
52 int deleted, inserted, updated;
53 char sql[8192];
54 char outbuf[8192];
55 char oidbuf[64];
56 int ret;
57
58 /* Called by trigger manager ? */
59 if (!CurrentTriggerData)
60 elog(ERROR, "_rserv_log_: triggers are not initialized");
61
62 /* Should be called for ROW trigger */
63 if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event))
64 elog(ERROR, "_rserv_log_: can't process STATEMENT events");
65
66 tuple = CurrentTriggerData->tg_trigtuple;
67
68 trigger = CurrentTriggerData->tg_trigger;
69 nargs = trigger->tgnargs;
70 args = trigger->tgargs;
71
72 if (nargs != 1) /* odd number of arguments! */
73 elog(ERROR, "_rserv_log_: need in *one* argument");
74
75 keynum = atoi(args[0]);
76
77 if (keynum < 0 && keynum != ObjectIdAttributeNumber)
78 elog(ERROR, "_rserv_log_: invalid keynum %d", keynum);
79
80 rel = CurrentTriggerData->tg_relation;
81 tupdesc = rel->rd_att;
82
83 deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ?
84 1 : 0;
85
86 inserted = (TRIGGER_FIRED_BY_INSERT(CurrentTriggerData->tg_event)) ? 1 : 0;
87
88 updated = 0;
89 if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event)) {
90 updated = 1;
91 newtuple = CurrentTriggerData->tg_newtuple;
92 }
93
94 #ifndef PG_FUNCTION_INFO_V1
95
96 /*
97 * Setting CurrentTriggerData to NULL prevents direct calls to trigger
98 * functions in queries. Normally, trigger functions have to be called
99 * by trigger manager code only.
100 */
101 CurrentTriggerData = NULL;
102 #endif
103
104 /* Connect to SPI manager */
105 if ((ret = SPI_connect()) < 0)
106 elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret);
107
108 if (keynum == ObjectIdAttributeNumber)
109 {
110 snprintf(oidbuf, sizeof(oidbuf), "%u", HeapTupleGetOid(tuple));
111 key = oidbuf;
112 }
113 else
114 key = SPI_getvalue(tuple, tupdesc, keynum);
115
116 if (key == NULL)
117 elog(ERROR, "_rserv_log_: key must be not null");
118
119 if (newtuple && keynum != ObjectIdAttributeNumber)
120 {
121 newkey = SPI_getvalue(newtuple, tupdesc, keynum);
122 if (newkey == NULL)
123 elog(ERROR, "_rserv_log_: key must be not null");
124 if (strcmp(newkey, key) == 0)
125 newkey = NULL;
126 else
127 deleted = 1; /* old key was deleted */
128 }
129
130 if (strpbrk(key, "\\ \n'"))
131 okey = OutputValue(key, outbuf, sizeof(outbuf));
132 else
133 okey = key;
134
135 snprintf(sql, 8192, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
136 "insert = %d, update = %d, delete = %d where reloid = %u and key = '%s'",
137 GetCurrentTransactionId(), inserted, updated, deleted, rel->rd_id, okey);
138
139 if (debug)
140 elog(DEBUG3, "sql: %s", sql);
141
142 ret = SPI_exec(sql, 0);
143
144 if (ret < 0)
145 elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
146
147 /*
148 * If no tuple was UPDATEd then do INSERT...
149 */
150 if (SPI_processed > 1)
151 elog(ERROR, "_rserv_log_: duplicate tuples");
152 else if (SPI_processed == 0)
153 {
154 snprintf(sql, 8192, "insert into _RSERV_LOG_ "
155 "(reloid, logid, logtime, insert, update, delete, key) "
156 "values (%u, %d, now(), %d, %d, %d, '%s')",
157 rel->rd_id, GetCurrentTransactionId(),
158 inserted, updated, deleted, okey);
159
160 if (debug)
161 elog(DEBUG3, "sql: %s", sql);
162
163 ret = SPI_exec(sql, 0);
164
165 if (ret < 0)
166 elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
167 }
168
169 if (okey != key && okey != outbuf)
170 pfree(okey);
171
172 if (newkey)
173 {
174 if (strpbrk(newkey, "\\ \n'"))
175 okey = OutputValue(newkey, outbuf, sizeof(outbuf));
176 else
177 okey = newkey;
178
179 snprintf(sql, 8192, "insert into _RSERV_LOG_ "
180 "(reloid, logid, logtime, insert, update, deleted, key) "
181 "values (%u, %d, now(), %d, %d, 0, '%s')",
182 rel->rd_id, GetCurrentTransactionId(), inserted, updated, okey);
183
184 if (debug)
185 elog(DEBUG3, "sql: %s", sql);
186
187 ret = SPI_exec(sql, 0);
188
189 if (ret < 0)
190 elog(ERROR, "_rserv_log_: SPI_exec returned %d", ret);
191
192 if (okey != newkey && okey != outbuf)
193 pfree(okey);
194 }
195
196 SPI_finish();
197
198 #ifdef PG_FUNCTION_INFO_V1
199 return (PointerGetDatum(tuple));
200 #else
201 return (tuple);
202 #endif
203 }
204
205 #ifdef PG_FUNCTION_INFO_V1
206 Datum
207 _rserv_sync_(PG_FUNCTION_ARGS)
208 #else
209 int32
210 _rserv_sync_(int32 server)
211 #endif
212 {
213 #ifdef PG_FUNCTION_INFO_V1
214 int32 server = PG_GETARG_INT32(0);
215 #endif
216 char sql[8192];
217 char buf[8192];
218 char *active = buf;
219 uint32 xcnt;
220 int ret;
221
222 if (SerializableSnapshot == NULL)
223 elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL");
224
225 buf[0] = 0;
226 for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++)
227 {
228 snprintf(buf + strlen(buf), 8192 - strlen(buf),
229 "%s%u", (xcnt) ? ", " : "",
230 SerializableSnapshot->xip[xcnt]);
231 }
232
233 if ((ret = SPI_connect()) < 0)
234 elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret);
235
236 snprintf(sql, 8192, "insert into _RSERV_SYNC_ "
237 "(server, syncid, synctime, status, minid, maxid, active) "
238 "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')",
239 server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active);
240
241 ret = SPI_exec(sql, 0);
242
243 if (ret < 0)
244 elog(ERROR, "_rserv_sync_: SPI_exec returned %d", ret);
245
246 SPI_finish();
247
248 return (0);
249 }
250
251 #ifdef PG_FUNCTION_INFO_V1
252 Datum
253 _rserv_debug_(PG_FUNCTION_ARGS)
254 #else
255 int32
256 _rserv_debug_(int32 newval)
257 #endif
258 {
259 #ifdef PG_FUNCTION_INFO_V1
260 int32 newval = PG_GETARG_INT32(0);
261 #endif
262 int32 oldval = debug;
263
264 debug = newval;
265
266 return (oldval);
267 }
268
/* Number of bytes the output buffer grows by each time it fills up */
#define ExtendBy 1024

/*
 * OutputValue
 *		Copies "key" into an output buffer, replacing characters that must
 *		not appear raw inside a quoted SQL literal:
 *
 *			backslash	-> \\
 *			TAB			-> \011
 *			newline		-> \012
 *			single quote -> \047
 *
 * key:  NUL-terminated input string.
 * buf:  caller-supplied buffer used as long as the result fits.
 * size: size of buf in bytes.
 *
 * Returns buf when the escaped string fits into it; otherwise returns a
 * palloc'd buffer holding the whole result, which the caller must pfree.
 * The result is always NUL-terminated.
 */
static char *
OutputValue(char *key, char *buf, int size)
{
	int			i = 0;			/* number of bytes written so far */
	char	   *out = buf;
	char	   *subst = NULL;
	int			slen = 0;

	size--;						/* keep one byte for the terminating NUL */
	for (;;)
	{
		switch (*key)
		{
			case '\\':
				subst = "\\\\";
				slen = 2;
				break;
			case '\t':			/* octal 011 is TAB, not space */
				subst = "\\011";
				slen = 4;
				break;
			case '\n':
				subst = "\\012";
				slen = 4;
				break;
			case '\'':
				subst = "\\047";
				slen = 4;
				break;
			case '\0':
				out[i] = 0;
				return (out);
			default:
				slen = 1;
				break;
		}

		/* Grow before writing so that i always stays below size */
		if (i + slen >= size)
		{
			if (out == buf)
			{
				/* First overflow: move from the caller's buffer to the heap */
				out = (char *) palloc(size + ExtendBy);
				memcpy(out, buf, i);
			}
			else
				out = (char *) repalloc(out, size + ExtendBy);

			size += ExtendBy;
		}

		if (slen == 1)
			out[i++] = *key;
		else
		{
			memcpy(out + i, subst, slen);
			i += slen;
		}
		key++;
	}
}

  ViewVC Help
Powered by ViewVC 1.1.26