/[rserv]/lib/rserv.c
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /lib/rserv.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.3 - (hide annotations)
Wed Aug 6 16:14:53 2003 UTC (20 years, 8 months ago) by dpavlin
Branch: MAIN
CVS Tags: before_onlytables, before_multmaster, r_0_3
Changes since 1.2: +165 -217 lines
File MIME type: text/plain
code synced with rserv.c from 7.3.2 (so, hopefully, it will compile on
PostgreSQL 7.3.x version. It *WON'T* compile on PostgreSQL 7.4 (as of this
writing in development stage), but you can checkout lastest version from
PostgreSQL CVS, make diff and apply changes (as I have done, just to find
out that it doesn't compile on 7.3 -- oh, wonders of major version
relese...)

This code has also removed ASKFORUPDATE parts of code from Nélio Alves
Pereira Filho until I have chance to review what is that code doing.
However, it does include updates to logs (which are needed by other
utilities anyway)

1 dpavlin 1.1 /* rserv.c
2     * Support functions for erServer replication.
3     * (c) 2000 Vadim Mikheev, PostgreSQL Inc.
4     */
5    
6     #include "executor/spi.h" /* this is what you need to work with SPI */
7     #include "commands/trigger.h" /* -"- and triggers */
8     #include "utils/tqual.h" /* -"- and SnapshotData */
9     #include <ctype.h> /* tolower () */
10    
11     #ifdef PG_FUNCTION_INFO_V1
12     #define CurrentTriggerData ((TriggerData *) fcinfo->context)
13     #endif
14    
15     #ifdef PG_FUNCTION_INFO_V1
16     PG_FUNCTION_INFO_V1(_rserv_log_);
17     PG_FUNCTION_INFO_V1(_rserv_sync_);
18     PG_FUNCTION_INFO_V1(_rserv_debug_);
19 dpavlin 1.2 Datum _rserv_log_(PG_FUNCTION_ARGS);
20     Datum _rserv_sync_(PG_FUNCTION_ARGS);
21     Datum _rserv_debug_(PG_FUNCTION_ARGS);
22    
23 dpavlin 1.1 #else
24     HeapTuple _rserv_log_(void);
25     int32 _rserv_sync_(int32);
26     int32 _rserv_debug_(int32);
27     #endif
28    
29     static int debug = 0;
30    
31 dpavlin 1.2 static char *OutputValue(char *key, char *buf, int size);
32 dpavlin 1.1
33     #ifdef PG_FUNCTION_INFO_V1
34 dpavlin 1.3 Datum
35     _rserv_log_(PG_FUNCTION_ARGS)
36 dpavlin 1.1 #else
37 dpavlin 1.3 HeapTuple
38     _rserv_log_()
39 dpavlin 1.1 #endif
40     {
41 dpavlin 1.3 Trigger *trigger; /* to get trigger name */
42     int nargs; /* # of args specified in CREATE TRIGGER */
43     char **args; /* argument: argnum */
44     Relation rel; /* triggered relation */
45     HeapTuple tuple; /* tuple to return */
46     HeapTuple newtuple = NULL; /* tuple to return */
47     TupleDesc tupdesc; /* tuple description */
48     int keynum;
49     char *key;
50     char *okey;
51     char *newkey = NULL;
52     int deleted, inserted, updated;
53     char sql[8192];
54     char outbuf[8192];
55     char oidbuf[64];
56     int ret;
57    
58     /* Called by trigger manager ? */
59     if (!CurrentTriggerData)
60     elog(ERROR, "_rserv_log_: triggers are not initialized");
61    
62     /* Should be called for ROW trigger */
63     if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event))
64     elog(ERROR, "_rserv_log_: can't process STATEMENT events");
65    
66     tuple = CurrentTriggerData->tg_trigtuple;
67    
68     trigger = CurrentTriggerData->tg_trigger;
69     nargs = trigger->tgnargs;
70     args = trigger->tgargs;
71    
72     if (nargs != 1) /* odd number of arguments! */
73     elog(ERROR, "_rserv_log_: need in *one* argument");
74    
75     keynum = atoi(args[0]);
76    
77     if (keynum < 0 && keynum != ObjectIdAttributeNumber)
78     elog(ERROR, "_rserv_log_: invalid keynum %d", keynum);
79    
80     rel = CurrentTriggerData->tg_relation;
81     tupdesc = rel->rd_att;
82    
83     deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ?
84     1 : 0;
85    
86     inserted = (TRIGGER_FIRED_BY_INSERT(CurrentTriggerData->tg_event)) ? 1 : 0;
87    
88     updated = 0;
89     if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event)) {
90     updated = 1;
91     newtuple = CurrentTriggerData->tg_newtuple;
92     }
93 dpavlin 1.2
94     #ifndef PG_FUNCTION_INFO_V1
95 dpavlin 1.3
96     /*
97     * Setting CurrentTriggerData to NULL prevents direct calls to trigger
98     * functions in queries. Normally, trigger functions have to be called
99     * by trigger manager code only.
100     */
101     CurrentTriggerData = NULL;
102 dpavlin 1.2 #endif
103 dpavlin 1.1
104 dpavlin 1.3 /* Connect to SPI manager */
105     if ((ret = SPI_connect()) < 0)
106     elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret);
107    
108     if (keynum == ObjectIdAttributeNumber)
109     {
110     snprintf(oidbuf, sizeof(oidbuf), "%u", HeapTupleGetOid(tuple));
111     key = oidbuf;
112     }
113     else
114     key = SPI_getvalue(tuple, tupdesc, keynum);
115    
116     if (key == NULL)
117     elog(ERROR, "_rserv_log_: key must be not null");
118    
119     if (newtuple && keynum != ObjectIdAttributeNumber)
120     {
121     newkey = SPI_getvalue(newtuple, tupdesc, keynum);
122     if (newkey == NULL)
123     elog(ERROR, "_rserv_log_: key must be not null");
124     if (strcmp(newkey, key) == 0)
125     newkey = NULL;
126     else
127     deleted = 1; /* old key was deleted */
128     }
129    
130     if (strpbrk(key, "\\ \n'"))
131     okey = OutputValue(key, outbuf, sizeof(outbuf));
132     else
133     okey = key;
134    
135     snprintf(sql, 8192, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
136     "insert = %d, update = %d, delete = %d where reloid = %u and key = '%s'",
137     GetCurrentTransactionId(), inserted, updated, deleted, rel->rd_id, okey);
138    
139     if (debug)
140     elog(DEBUG3, "sql: %s", sql);
141    
142     ret = SPI_exec(sql, 0);
143    
144     if (ret < 0)
145     elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
146    
147     /*
148     * If no tuple was UPDATEd then do INSERT...
149     */
150     if (SPI_processed > 1)
151     elog(ERROR, "_rserv_log_: duplicate tuples");
152     else if (SPI_processed == 0)
153     {
154     snprintf(sql, 8192, "insert into _RSERV_LOG_ "
155     "(reloid, logid, logtime, insert, update, delete, key) "
156     "values (%u, %d, now(), %d, %d, %d, '%s')",
157     rel->rd_id, GetCurrentTransactionId(),
158     inserted, updated, deleted, okey);
159    
160     if (debug)
161     elog(DEBUG3, "sql: %s", sql);
162    
163     ret = SPI_exec(sql, 0);
164    
165     if (ret < 0)
166     elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
167     }
168    
169     if (okey != key && okey != outbuf)
170     pfree(okey);
171    
172     if (newkey)
173     {
174     if (strpbrk(newkey, "\\ \n'"))
175     okey = OutputValue(newkey, outbuf, sizeof(outbuf));
176     else
177     okey = newkey;
178    
179     snprintf(sql, 8192, "insert into _RSERV_LOG_ "
180     "(reloid, logid, logtime, insert, update, deleted, key) "
181     "values (%u, %d, now(), %d, %d, 0, '%s')",
182     rel->rd_id, GetCurrentTransactionId(), inserted, updated, okey);
183    
184     if (debug)
185     elog(DEBUG3, "sql: %s", sql);
186    
187     ret = SPI_exec(sql, 0);
188    
189     if (ret < 0)
190     elog(ERROR, "_rserv_log_: SPI_exec returned %d", ret);
191    
192     if (okey != newkey && okey != outbuf)
193     pfree(okey);
194     }
195    
196     SPI_finish();
197    
198 dpavlin 1.1 #ifdef PG_FUNCTION_INFO_V1
199 dpavlin 1.3 return (PointerGetDatum(tuple));
200 dpavlin 1.1 #else
201 dpavlin 1.3 return (tuple);
202 dpavlin 1.1 #endif
203     }
204    
205     #ifdef PG_FUNCTION_INFO_V1
206     Datum
207     _rserv_sync_(PG_FUNCTION_ARGS)
208     #else
209     int32
210     _rserv_sync_(int32 server)
211     #endif
212     {
213     #ifdef PG_FUNCTION_INFO_V1
214 dpavlin 1.2 int32 server = PG_GETARG_INT32(0);
215 dpavlin 1.1 #endif
216 dpavlin 1.2 char sql[8192];
217     char buf[8192];
218     char *active = buf;
219     uint32 xcnt;
220     int ret;
221 dpavlin 1.1
222     if (SerializableSnapshot == NULL)
223     elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL");
224    
225     buf[0] = 0;
226     for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++)
227     {
228 dpavlin 1.3 snprintf(buf + strlen(buf), 8192 - strlen(buf),
229     "%s%u", (xcnt) ? ", " : "",
230     SerializableSnapshot->xip[xcnt]);
231 dpavlin 1.1 }
232    
233     if ((ret = SPI_connect()) < 0)
234     elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret);
235    
236 dpavlin 1.3 snprintf(sql, 8192, "insert into _RSERV_SYNC_ "
237     "(server, syncid, synctime, status, minid, maxid, active) "
238 dpavlin 1.2 "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')",
239 dpavlin 1.3 server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active);
240 dpavlin 1.1
241     ret = SPI_exec(sql, 0);
242    
243     if (ret < 0)
244     elog(ERROR, "_rserv_sync_: SPI_exec returned %d", ret);
245    
246     SPI_finish();
247    
248     return (0);
249     }
250    
251     #ifdef PG_FUNCTION_INFO_V1
252     Datum
253     _rserv_debug_(PG_FUNCTION_ARGS)
254     #else
255     int32
256     _rserv_debug_(int32 newval)
257     #endif
258     {
259     #ifdef PG_FUNCTION_INFO_V1
260 dpavlin 1.2 int32 newval = PG_GETARG_INT32(0);
261 dpavlin 1.1 #endif
262 dpavlin 1.2 int32 oldval = debug;
263 dpavlin 1.1
264     debug = newval;
265    
266     return (oldval);
267     }
268    
269 dpavlin 1.2 #define ExtendBy 1024
270 dpavlin 1.1
271 dpavlin 1.2 static char *
272 dpavlin 1.1 OutputValue(char *key, char *buf, int size)
273     {
274     int i = 0;
275     char *out = buf;
276     char *subst = NULL;
277     int slen = 0;
278    
279     size--;
280 dpavlin 1.2 for (;;)
281 dpavlin 1.1 {
282     switch (*key)
283     {
284 dpavlin 1.2 case '\\':
285     subst = "\\\\";
286     slen = 2;
287     break;
288     case ' ':
289     subst = "\\011";
290     slen = 4;
291     break;
292     case '\n':
293     subst = "\\012";
294     slen = 4;
295     break;
296     case '\'':
297     subst = "\\047";
298     slen = 4;
299     break;
300     case '\0':
301     out[i] = 0;
302     return (out);
303     default:
304     slen = 1;
305     break;
306 dpavlin 1.1 }
307    
308     if (i + slen >= size)
309     {
310     if (out == buf)
311     {
312 dpavlin 1.2 out = (char *) palloc(size + ExtendBy);
313 dpavlin 1.1 strncpy(out, buf, i);
314     size += ExtendBy;
315     }
316     else
317     {
318 dpavlin 1.2 out = (char *) repalloc(out, size + ExtendBy);
319 dpavlin 1.1 size += ExtendBy;
320     }
321     }
322    
323     if (slen == 1)
324     out[i++] = *key;
325     else
326     {
327     memcpy(out + i, subst, slen);
328     i += slen;
329     }
330     key++;
331     }
332    
333 dpavlin 1.2 return (out);
334 dpavlin 1.1
335     }

  ViewVC Help
Powered by ViewVC 1.1.26