/[rserv]/lib/rserv.c
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /lib/rserv.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.4 - (hide annotations)
Tue Oct 28 18:43:15 2003 UTC (20 years, 5 months ago) by dpavlin
Branch: MAIN
Changes since 1.3: +13 -11 lines
File MIME type: text/plain
add server number in _rserv_log_

1 dpavlin 1.1 /* rserv.c
2     * Support functions for erServer replication.
3     * (c) 2000 Vadim Mikheev, PostgreSQL Inc.
4     */
5    
6 dpavlin 1.4 #include "executor/spi.h" /* this is what you need to work with SPI */
7 dpavlin 1.1 #include "commands/trigger.h" /* -"- and triggers */
8 dpavlin 1.4 #include "utils/tqual.h" /* -"- and SnapshotData */
9     #include <ctype.h> /* tolower () */
10 dpavlin 1.1
11     #ifdef PG_FUNCTION_INFO_V1
12     #define CurrentTriggerData ((TriggerData *) fcinfo->context)
13     #endif
14    
15     #ifdef PG_FUNCTION_INFO_V1
16     PG_FUNCTION_INFO_V1(_rserv_log_);
17     PG_FUNCTION_INFO_V1(_rserv_sync_);
18     PG_FUNCTION_INFO_V1(_rserv_debug_);
19 dpavlin 1.2 Datum _rserv_log_(PG_FUNCTION_ARGS);
20     Datum _rserv_sync_(PG_FUNCTION_ARGS);
21     Datum _rserv_debug_(PG_FUNCTION_ARGS);
22    
23 dpavlin 1.1 #else
24     HeapTuple _rserv_log_(void);
25     int32 _rserv_sync_(int32);
26     int32 _rserv_debug_(int32);
27     #endif
28    
29     static int debug = 0;
30    
31 dpavlin 1.2 static char *OutputValue(char *key, char *buf, int size);
32 dpavlin 1.1
33     #ifdef PG_FUNCTION_INFO_V1
34 dpavlin 1.3 Datum
35     _rserv_log_(PG_FUNCTION_ARGS)
36 dpavlin 1.1 #else
37 dpavlin 1.3 HeapTuple
38     _rserv_log_()
39 dpavlin 1.1 #endif
40     {
41 dpavlin 1.3 Trigger *trigger; /* to get trigger name */
42     int nargs; /* # of args specified in CREATE TRIGGER */
43     char **args; /* argument: argnum */
44     Relation rel; /* triggered relation */
45     HeapTuple tuple; /* tuple to return */
46     HeapTuple newtuple = NULL; /* tuple to return */
47     TupleDesc tupdesc; /* tuple description */
48     int keynum;
49     char *key;
50     char *okey;
51     char *newkey = NULL;
52     int deleted, inserted, updated;
53     char sql[8192];
54     char outbuf[8192];
55     char oidbuf[64];
56     int ret;
57 dpavlin 1.4 int server;
58 dpavlin 1.3
59     /* Called by trigger manager ? */
60     if (!CurrentTriggerData)
61     elog(ERROR, "_rserv_log_: triggers are not initialized");
62    
63     /* Should be called for ROW trigger */
64     if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event))
65     elog(ERROR, "_rserv_log_: can't process STATEMENT events");
66    
67     tuple = CurrentTriggerData->tg_trigtuple;
68    
69     trigger = CurrentTriggerData->tg_trigger;
70     nargs = trigger->tgnargs;
71     args = trigger->tgargs;
72    
73 dpavlin 1.4 if (nargs != 2) /* odd number of arguments! */
74     elog(ERROR, "_rserv_log_: need in *two* arguments, key number and server number");
75 dpavlin 1.3
76     keynum = atoi(args[0]);
77 dpavlin 1.4 server = atoi(args[1]);
78 dpavlin 1.3
79     if (keynum < 0 && keynum != ObjectIdAttributeNumber)
80     elog(ERROR, "_rserv_log_: invalid keynum %d", keynum);
81    
82     rel = CurrentTriggerData->tg_relation;
83     tupdesc = rel->rd_att;
84    
85     deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ?
86     1 : 0;
87    
88     inserted = (TRIGGER_FIRED_BY_INSERT(CurrentTriggerData->tg_event)) ? 1 : 0;
89    
90     updated = 0;
91     if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event)) {
92     updated = 1;
93     newtuple = CurrentTriggerData->tg_newtuple;
94     }
95 dpavlin 1.2
96     #ifndef PG_FUNCTION_INFO_V1
97 dpavlin 1.3
98     /*
99     * Setting CurrentTriggerData to NULL prevents direct calls to trigger
100     * functions in queries. Normally, trigger functions have to be called
101     * by trigger manager code only.
102     */
103     CurrentTriggerData = NULL;
104 dpavlin 1.2 #endif
105 dpavlin 1.1
106 dpavlin 1.3 /* Connect to SPI manager */
107     if ((ret = SPI_connect()) < 0)
108     elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret);
109    
110     if (keynum == ObjectIdAttributeNumber)
111     {
112     snprintf(oidbuf, sizeof(oidbuf), "%u", HeapTupleGetOid(tuple));
113     key = oidbuf;
114     }
115     else
116     key = SPI_getvalue(tuple, tupdesc, keynum);
117    
118     if (key == NULL)
119     elog(ERROR, "_rserv_log_: key must be not null");
120    
121     if (newtuple && keynum != ObjectIdAttributeNumber)
122     {
123     newkey = SPI_getvalue(newtuple, tupdesc, keynum);
124     if (newkey == NULL)
125     elog(ERROR, "_rserv_log_: key must be not null");
126     if (strcmp(newkey, key) == 0)
127     newkey = NULL;
128     else
129     deleted = 1; /* old key was deleted */
130     }
131    
132     if (strpbrk(key, "\\ \n'"))
133     okey = OutputValue(key, outbuf, sizeof(outbuf));
134     else
135     okey = key;
136    
137     snprintf(sql, 8192, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
138     "insert = %d, update = %d, delete = %d where reloid = %u and key = '%s'",
139     GetCurrentTransactionId(), inserted, updated, deleted, rel->rd_id, okey);
140    
141     if (debug)
142     elog(DEBUG3, "sql: %s", sql);
143    
144     ret = SPI_exec(sql, 0);
145    
146     if (ret < 0)
147     elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
148    
149     /*
150     * If no tuple was UPDATEd then do INSERT...
151     */
152     if (SPI_processed > 1)
153     elog(ERROR, "_rserv_log_: duplicate tuples");
154     else if (SPI_processed == 0)
155     {
156     snprintf(sql, 8192, "insert into _RSERV_LOG_ "
157 dpavlin 1.4 "(reloid, logid, logtime, insert, update, delete, key, server) "
158     "values (%u, %d, now(), %d, %d, %d, '%s', %d)",
159 dpavlin 1.3 rel->rd_id, GetCurrentTransactionId(),
160 dpavlin 1.4 inserted, updated, deleted, okey, server);
161 dpavlin 1.3
162     if (debug)
163     elog(DEBUG3, "sql: %s", sql);
164    
165     ret = SPI_exec(sql, 0);
166    
167     if (ret < 0)
168     elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
169     }
170    
171     if (okey != key && okey != outbuf)
172     pfree(okey);
173    
174     if (newkey)
175     {
176     if (strpbrk(newkey, "\\ \n'"))
177     okey = OutputValue(newkey, outbuf, sizeof(outbuf));
178     else
179     okey = newkey;
180    
181     snprintf(sql, 8192, "insert into _RSERV_LOG_ "
182 dpavlin 1.4 "(reloid, logid, logtime, insert, update, deleted, key, server) "
183     "values (%u, %d, now(), %d, %d, 0, '%s', %d)",
184     rel->rd_id, GetCurrentTransactionId(), inserted, updated, okey, server);
185 dpavlin 1.3
186     if (debug)
187     elog(DEBUG3, "sql: %s", sql);
188    
189     ret = SPI_exec(sql, 0);
190    
191     if (ret < 0)
192     elog(ERROR, "_rserv_log_: SPI_exec returned %d", ret);
193    
194     if (okey != newkey && okey != outbuf)
195     pfree(okey);
196     }
197    
198     SPI_finish();
199    
200 dpavlin 1.1 #ifdef PG_FUNCTION_INFO_V1
201 dpavlin 1.3 return (PointerGetDatum(tuple));
202 dpavlin 1.1 #else
203 dpavlin 1.3 return (tuple);
204 dpavlin 1.1 #endif
205     }
206    
207     #ifdef PG_FUNCTION_INFO_V1
208     Datum
209     _rserv_sync_(PG_FUNCTION_ARGS)
210     #else
211     int32
212     _rserv_sync_(int32 server)
213     #endif
214     {
215     #ifdef PG_FUNCTION_INFO_V1
216 dpavlin 1.2 int32 server = PG_GETARG_INT32(0);
217 dpavlin 1.1 #endif
218 dpavlin 1.2 char sql[8192];
219     char buf[8192];
220     char *active = buf;
221     uint32 xcnt;
222     int ret;
223 dpavlin 1.1
224     if (SerializableSnapshot == NULL)
225     elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL");
226    
227     buf[0] = 0;
228     for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++)
229     {
230 dpavlin 1.3 snprintf(buf + strlen(buf), 8192 - strlen(buf),
231     "%s%u", (xcnt) ? ", " : "",
232     SerializableSnapshot->xip[xcnt]);
233 dpavlin 1.1 }
234    
235     if ((ret = SPI_connect()) < 0)
236     elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret);
237    
238 dpavlin 1.3 snprintf(sql, 8192, "insert into _RSERV_SYNC_ "
239     "(server, syncid, synctime, status, minid, maxid, active) "
240 dpavlin 1.2 "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')",
241 dpavlin 1.3 server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active);
242 dpavlin 1.1
243     ret = SPI_exec(sql, 0);
244    
245     if (ret < 0)
246     elog(ERROR, "_rserv_sync_: SPI_exec returned %d", ret);
247    
248     SPI_finish();
249    
250     return (0);
251     }
252    
253     #ifdef PG_FUNCTION_INFO_V1
254     Datum
255     _rserv_debug_(PG_FUNCTION_ARGS)
256     #else
257     int32
258     _rserv_debug_(int32 newval)
259     #endif
260     {
261     #ifdef PG_FUNCTION_INFO_V1
262 dpavlin 1.2 int32 newval = PG_GETARG_INT32(0);
263 dpavlin 1.1 #endif
264 dpavlin 1.2 int32 oldval = debug;
265 dpavlin 1.1
266     debug = newval;
267    
268     return (oldval);
269     }
270    
271 dpavlin 1.2 #define ExtendBy 1024
272 dpavlin 1.1
273 dpavlin 1.2 static char *
274 dpavlin 1.1 OutputValue(char *key, char *buf, int size)
275     {
276     int i = 0;
277     char *out = buf;
278     char *subst = NULL;
279     int slen = 0;
280    
281     size--;
282 dpavlin 1.2 for (;;)
283 dpavlin 1.1 {
284     switch (*key)
285     {
286 dpavlin 1.2 case '\\':
287     subst = "\\\\";
288     slen = 2;
289     break;
290     case ' ':
291     subst = "\\011";
292     slen = 4;
293     break;
294     case '\n':
295     subst = "\\012";
296     slen = 4;
297     break;
298     case '\'':
299     subst = "\\047";
300     slen = 4;
301     break;
302     case '\0':
303     out[i] = 0;
304     return (out);
305     default:
306     slen = 1;
307     break;
308 dpavlin 1.1 }
309    
310     if (i + slen >= size)
311     {
312     if (out == buf)
313     {
314 dpavlin 1.2 out = (char *) palloc(size + ExtendBy);
315 dpavlin 1.1 strncpy(out, buf, i);
316     size += ExtendBy;
317     }
318     else
319     {
320 dpavlin 1.2 out = (char *) repalloc(out, size + ExtendBy);
321 dpavlin 1.1 size += ExtendBy;
322     }
323     }
324    
325     if (slen == 1)
326     out[i++] = *key;
327     else
328     {
329     memcpy(out + i, subst, slen);
330     i += slen;
331     }
332     key++;
333     }
334    
335 dpavlin 1.2 return (out);
336 dpavlin 1.1
337     }

  ViewVC Help
Powered by ViewVC 1.1.26