Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 0bead9a

Browse filesBrowse files
author
Amit Kapila
committed
Immediately WAL-log subtransaction and top-level XID association.
The logical decoding infrastructure needs to know which top-level transaction the subxact belongs to, in order to decode all the changes. Until now that might be delayed until commit, due to the caching (GPROC_MAX_CACHED_SUBXIDS), preventing features requiring incremental decoding. So we also write the assignment info into WAL immediately, as part of the next WAL record (to minimize overhead) only when wal_level=logical. We can not remove the existing XLOG_XACT_ASSIGNMENT WAL as that is required for avoiding overflow in the hot standby snapshot. Bump XLOG_PAGE_MAGIC, since this introduces XLR_BLOCK_ID_TOPLEVEL_XID. Author: Tomas Vondra, Dilip Kumar, Amit Kapila Reviewed-by: Amit Kapila Tested-by: Neha Sharma and Mahendra Singh Thalor Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parent d05b172 commit 0bead9a
Copy full SHA for 0bead9a

File tree

Expand file treeCollapse file tree

9 files changed

+108
-24
lines changed
Filter options
Expand file treeCollapse file tree

9 files changed

+108
-24
lines changed

‎src/backend/access/transam/xact.c

Copy file name to clipboardExpand all lines: src/backend/access/transam/xact.c
+50Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ typedef struct TransactionStateData
191191
bool didLogXid; /* has xid been included in WAL record? */
192192
int parallelModeLevel; /* Enter/ExitParallelMode counter */
193193
bool chain; /* start a new block after this one */
194+
bool assigned; /* assigned to top-level XID */
194195
struct TransactionStateData *parent; /* back link to parent */
195196
} TransactionStateData;
196197

@@ -223,6 +224,7 @@ typedef struct SerializedTransactionState
223224
static TransactionStateData TopTransactionStateData = {
224225
.state = TRANS_DEFAULT,
225226
.blockState = TBLOCK_DEFAULT,
227+
.assigned = false,
226228
};
227229

228230
/*
@@ -5120,6 +5122,7 @@ PushTransaction(void)
51205122
GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
51215123
s->prevXactReadOnly = XactReadOnly;
51225124
s->parallelModeLevel = 0;
5125+
s->assigned = false;
51235126

51245127
CurrentTransactionState = s;
51255128

@@ -6022,3 +6025,50 @@ xact_redo(XLogReaderState *record)
60226025
else
60236026
elog(PANIC, "xact_redo: unknown op code %u", info);
60246027
}
6028+
6029+
/*
6030+
* IsSubTransactionAssignmentPending
6031+
*
6032+
* This is used to decide whether we need to WAL log the top-level XID for
6033+
* operation in a subtransaction. We require that for logical decoding, see
6034+
* LogicalDecodingProcessRecord.
6035+
*
6036+
* This returns true if wal_level >= logical and we are inside a valid
6037+
* subtransaction, for which the assignment was not yet written to any WAL
6038+
* record.
6039+
*/
6040+
bool
6041+
IsSubTransactionAssignmentPending(void)
6042+
{
6043+
/* wal_level has to be logical */
6044+
if (!XLogLogicalInfoActive())
6045+
return false;
6046+
6047+
/* we need to be in a transaction state */
6048+
if (!IsTransactionState())
6049+
return false;
6050+
6051+
/* it has to be a subtransaction */
6052+
if (!IsSubTransaction())
6053+
return false;
6054+
6055+
/* the subtransaction has to have a XID assigned */
6056+
if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
6057+
return false;
6058+
6059+
/* and it should not be already 'assigned' */
6060+
return !CurrentTransactionState->assigned;
6061+
}
6062+
6063+
/*
6064+
* MarkSubTransactionAssigned
6065+
*
6066+
* Mark the subtransaction assignment as completed.
6067+
*/
6068+
void
6069+
MarkSubTransactionAssigned(void)
6070+
{
6071+
Assert(IsSubTransactionAssignmentPending());
6072+
6073+
CurrentTransactionState->assigned = true;
6074+
}

‎src/backend/access/transam/xloginsert.c

Copy file name to clipboardExpand all lines: src/backend/access/transam/xloginsert.c
+21-2Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,13 @@ static XLogRecData hdr_rdt;
8989
static char *hdr_scratch = NULL;
9090

9191
#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
92+
#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
9293

9394
#define HEADER_SCRATCH_SIZE \
9495
(SizeOfXLogRecord + \
9596
MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
96-
SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
97+
SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
98+
SizeOfXLogTransactionId)
9799

98100
/*
99101
* An array of XLogRecData structs, to hold registered data.
@@ -195,6 +197,10 @@ XLogResetInsertion(void)
195197
{
196198
int i;
197199

200+
/* reset the subxact assignment flag (if needed) */
201+
if (curinsert_flags & XLOG_INCLUDE_XID)
202+
MarkSubTransactionAssigned();
203+
198204
for (i = 0; i < max_registered_block_id; i++)
199205
registered_buffers[i].in_use = false;
200206

@@ -398,7 +404,7 @@ void
398404
XLogSetRecordFlags(uint8 flags)
399405
{
400406
Assert(begininsert_called);
401-
curinsert_flags = flags;
407+
curinsert_flags |= flags;
402408
}
403409

404410
/*
@@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
748754
scratch += sizeof(replorigin_session_origin);
749755
}
750756

757+
/* followed by toplevel XID, if not already included in previous record */
758+
if (IsSubTransactionAssignmentPending())
759+
{
760+
TransactionId xid = GetTopTransactionIdIfAny();
761+
762+
/* update the flag (later used by XLogResetInsertion) */
763+
XLogSetRecordFlags(XLOG_INCLUDE_XID);
764+
765+
*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
766+
memcpy(scratch, &xid, sizeof(TransactionId));
767+
scratch += sizeof(TransactionId);
768+
}
769+
751770
/* followed by main data, if any */
752771
if (mainrdata_len > 0)
753772
{

‎src/backend/access/transam/xlogreader.c

Copy file name to clipboardExpand all lines: src/backend/access/transam/xlogreader.c
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,6 +1197,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
11971197

11981198
state->decoded_record = record;
11991199
state->record_origin = InvalidRepOriginId;
1200+
state->toplevel_xid = InvalidTransactionId;
12001201

12011202
ptr = (char *) record;
12021203
ptr += SizeOfXLogRecord;
@@ -1235,6 +1236,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
12351236
{
12361237
COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
12371238
}
1239+
else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
1240+
{
1241+
COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
1242+
}
12381243
else if (block_id <= XLR_MAX_BLOCK_ID)
12391244
{
12401245
/* XLogRecordBlockHeader */

‎src/backend/replication/logical/decode.c

Copy file name to clipboardExpand all lines: src/backend/replication/logical/decode.c
+23-21Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,27 @@ void
9494
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
9595
{
9696
XLogRecordBuffer buf;
97+
TransactionId txid;
9798

9899
buf.origptr = ctx->reader->ReadRecPtr;
99100
buf.endptr = ctx->reader->EndRecPtr;
100101
buf.record = record;
101102

103+
txid = XLogRecGetTopXid(record);
104+
105+
/*
106+
* If the top-level xid is valid, we need to assign the subxact to the
107+
* top-level xact. We need to do this for all records, hence we do it
108+
* before the switch.
109+
*/
110+
if (TransactionIdIsValid(txid))
111+
{
112+
ReorderBufferAssignChild(ctx->reorder,
113+
txid,
114+
record->decoded_record->xl_xid,
115+
buf.origptr);
116+
}
117+
102118
/* cast so we get a warning when new rmgrs are added */
103119
switch ((RmgrId) XLogRecGetRmid(record))
104120
{
@@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
216232
/*
217233
* If the snapshot isn't yet fully built, we cannot decode anything, so
218234
* bail out.
219-
*
220-
* However, it's critical to process XLOG_XACT_ASSIGNMENT records even
221-
* when the snapshot is being built: it is possible to get later records
222-
* that require subxids to be properly assigned.
223235
*/
224-
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
225-
info != XLOG_XACT_ASSIGNMENT)
236+
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
226237
return;
227238

228239
switch (info)
@@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
264275
break;
265276
}
266277
case XLOG_XACT_ASSIGNMENT:
267-
{
268-
xl_xact_assignment *xlrec;
269-
int i;
270-
TransactionId *sub_xid;
271278

272-
xlrec = (xl_xact_assignment *) XLogRecGetData(r);
273-
274-
sub_xid = &xlrec->xsub[0];
275-
276-
for (i = 0; i < xlrec->nsubxacts; i++)
277-
{
278-
ReorderBufferAssignChild(reorder, xlrec->xtop,
279-
*(sub_xid++), buf->origptr);
280-
}
281-
break;
282-
}
279+
/*
280+
* We assign subxact to the toplevel xact while processing each
281+
* record if required. So, we don't need to do anything here.
282+
* See LogicalDecodingProcessRecord.
283+
*/
284+
break;
283285
case XLOG_XACT_PREPARE:
284286

285287
/*

‎src/include/access/xact.h

Copy file name to clipboardExpand all lines: src/include/access/xact.h
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg);
428428
extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
429429
extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
430430

431+
extern bool IsSubTransactionAssignmentPending(void);
432+
extern void MarkSubTransactionAssigned(void);
433+
431434
extern int xactGetCommittedChildren(TransactionId **ptr);
432435

433436
extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,

‎src/include/access/xlog.h

Copy file name to clipboardExpand all lines: src/include/access/xlog.h
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ extern bool XLOG_DEBUG;
237237
*/
238238
#define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */
239239
#define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */
240+
#define XLOG_INCLUDE_XID 0x04 /* include XID of top-level xact */
240241

241242

242243
/* Checkpoint statistics */

‎src/include/access/xlog_internal.h

Copy file name to clipboardExpand all lines: src/include/access/xlog_internal.h
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
/*
3232
* Each page of XLOG file has a header like this:
3333
*/
34-
#define XLOG_PAGE_MAGIC 0xD106 /* can be used as WAL version indicator */
34+
#define XLOG_PAGE_MAGIC 0xD107 /* can be used as WAL version indicator */
3535

3636
typedef struct XLogPageHeaderData
3737
{

‎src/include/access/xlogreader.h

Copy file name to clipboardExpand all lines: src/include/access/xlogreader.h
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ struct XLogReaderState
191191

192192
RepOriginId record_origin;
193193

194+
TransactionId toplevel_xid; /* XID of top-level transaction */
195+
194196
/* information about blocks referenced by the record. */
195197
DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
196198

@@ -304,6 +306,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
304306
#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
305307
#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
306308
#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
309+
#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
307310
#define XLogRecGetData(decoder) ((decoder)->main_data)
308311
#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
309312
#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)

‎src/include/access/xlogrecord.h

Copy file name to clipboardExpand all lines: src/include/access/xlogrecord.h
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong
223223
#define XLR_BLOCK_ID_DATA_SHORT 255
224224
#define XLR_BLOCK_ID_DATA_LONG 254
225225
#define XLR_BLOCK_ID_ORIGIN 253
226+
#define XLR_BLOCK_ID_TOPLEVEL_XID 252
226227

227228
#endif /* XLOGRECORD_H */

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.