Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit da324d6

Browse filesBrowse files
author
Amit Kapila
committed
Refactor pgoutput_change().
Instead of mostly-duplicate code for different operation (insert/update/delete) types, write a common code to compute old/new tuples, and check the row filter. Author: Hou Zhijie Reviewed-by: Peter Smith, Amit Kapila Discussion: https://postgr.es/m/OS0PR01MB5716194A47FFA8D91133687D94BF9@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent 902ecd3 commit da324d6
Copy full SHA for da324d6

File tree

1 file changed

+79
-156
lines changed
Filter options

1 file changed

+79
-156
lines changed

‎src/backend/replication/pgoutput/pgoutput.c

Copy file name to clipboardExpand all lines: src/backend/replication/pgoutput/pgoutput.c
+79-156Lines changed: 79 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -1440,6 +1440,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14401440
case REORDER_BUFFER_CHANGE_DELETE:
14411441
if (!relentry->pubactions.pubdelete)
14421442
return;
1443+
1444+
/*
1445+
* This is only possible if deletes are allowed even when replica
1446+
* identity is not defined for a table. Since the DELETE action
1447+
* can't be published, we simply return.
1448+
*/
1449+
if (!change->data.tp.oldtuple)
1450+
{
1451+
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1452+
return;
1453+
}
14431454
break;
14441455
default:
14451456
Assert(false);
@@ -1448,187 +1459,99 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
14481459
/* Avoid leaking memory by using and resetting our own context */
14491460
old = MemoryContextSwitchTo(data->context);
14501461

1451-
/* Send the data */
1452-
switch (action)
1462+
/* Switch relation if publishing via root. */
1463+
if (relentry->publish_as_relid != RelationGetRelid(relation))
14531464
{
1454-
case REORDER_BUFFER_CHANGE_INSERT:
1455-
new_slot = relentry->new_slot;
1456-
ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
1457-
new_slot, false);
1458-
1459-
/* Switch relation if publishing via root. */
1460-
if (relentry->publish_as_relid != RelationGetRelid(relation))
1461-
{
1462-
Assert(relation->rd_rel->relispartition);
1463-
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1464-
targetrel = ancestor;
1465-
/* Convert tuple if needed. */
1466-
if (relentry->attrmap)
1467-
{
1468-
TupleDesc tupdesc = RelationGetDescr(targetrel);
1469-
1470-
new_slot = execute_attr_map_slot(relentry->attrmap,
1471-
new_slot,
1472-
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1473-
}
1474-
}
1475-
1476-
/* Check row filter */
1477-
if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
1478-
&action))
1479-
break;
1480-
1481-
/*
1482-
* Send BEGIN if we haven't yet.
1483-
*
1484-
* We send the BEGIN message after ensuring that we will actually
1485-
* send the change. This avoids sending a pair of BEGIN/COMMIT
1486-
* messages for empty transactions.
1487-
*/
1488-
if (txndata && !txndata->sent_begin_txn)
1489-
pgoutput_send_begin(ctx, txn);
1490-
1491-
/*
1492-
* Schema should be sent using the original relation because it
1493-
* also sends the ancestor's relation.
1494-
*/
1495-
maybe_send_schema(ctx, change, relation, relentry);
1465+
Assert(relation->rd_rel->relispartition);
1466+
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1467+
targetrel = ancestor;
1468+
}
14961469

1497-
OutputPluginPrepareWrite(ctx, true);
1498-
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1499-
data->binary, relentry->columns);
1500-
OutputPluginWrite(ctx, true);
1501-
break;
1502-
case REORDER_BUFFER_CHANGE_UPDATE:
1503-
if (change->data.tp.oldtuple)
1504-
{
1505-
old_slot = relentry->old_slot;
1506-
ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
1507-
old_slot, false);
1508-
}
1470+
if (change->data.tp.oldtuple)
1471+
{
1472+
old_slot = relentry->old_slot;
1473+
ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
15091474

1510-
new_slot = relentry->new_slot;
1511-
ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
1512-
new_slot, false);
1475+
/* Convert tuple if needed. */
1476+
if (relentry->attrmap)
1477+
{
1478+
TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1479+
&TTSOpsVirtual);
15131480

1514-
/* Switch relation if publishing via root. */
1515-
if (relentry->publish_as_relid != RelationGetRelid(relation))
1516-
{
1517-
Assert(relation->rd_rel->relispartition);
1518-
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1519-
targetrel = ancestor;
1520-
/* Convert tuples if needed. */
1521-
if (relentry->attrmap)
1522-
{
1523-
TupleDesc tupdesc = RelationGetDescr(targetrel);
1481+
old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1482+
}
1483+
}
15241484

1525-
if (old_slot)
1526-
old_slot = execute_attr_map_slot(relentry->attrmap,
1527-
old_slot,
1528-
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1485+
if (change->data.tp.newtuple)
1486+
{
1487+
new_slot = relentry->new_slot;
1488+
ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
15291489

1530-
new_slot = execute_attr_map_slot(relentry->attrmap,
1531-
new_slot,
1532-
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1533-
}
1534-
}
1490+
/* Convert tuple if needed. */
1491+
if (relentry->attrmap)
1492+
{
1493+
TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1494+
&TTSOpsVirtual);
15351495

1536-
/* Check row filter */
1537-
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
1538-
relentry, &action))
1539-
break;
1496+
new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1497+
}
1498+
}
15401499

1541-
/* Send BEGIN if we haven't yet */
1542-
if (txndata && !txndata->sent_begin_txn)
1543-
pgoutput_send_begin(ctx, txn);
1500+
/*
1501+
* Check row filter.
1502+
*
1503+
* Updates could be transformed to inserts or deletes based on the results
1504+
* of the row filter for old and new tuple.
1505+
*/
1506+
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1507+
goto cleanup;
15441508

1545-
maybe_send_schema(ctx, change, relation, relentry);
1509+
/*
1510+
* Send BEGIN if we haven't yet.
1511+
*
1512+
* We send the BEGIN message after ensuring that we will actually send the
1513+
* change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1514+
* transactions.
1515+
*/
1516+
if (txndata && !txndata->sent_begin_txn)
1517+
pgoutput_send_begin(ctx, txn);
15461518

1547-
OutputPluginPrepareWrite(ctx, true);
1519+
/*
1520+
* Schema should be sent using the original relation because it also sends
1521+
* the ancestor's relation.
1522+
*/
1523+
maybe_send_schema(ctx, change, relation, relentry);
15481524

1549-
/*
1550-
* Updates could be transformed to inserts or deletes based on the
1551-
* results of the row filter for old and new tuple.
1552-
*/
1553-
switch (action)
1554-
{
1555-
case REORDER_BUFFER_CHANGE_INSERT:
1556-
logicalrep_write_insert(ctx->out, xid, targetrel,
1557-
new_slot, data->binary,
1558-
relentry->columns);
1559-
break;
1560-
case REORDER_BUFFER_CHANGE_UPDATE:
1561-
logicalrep_write_update(ctx->out, xid, targetrel,
1562-
old_slot, new_slot, data->binary,
1563-
relentry->columns);
1564-
break;
1565-
case REORDER_BUFFER_CHANGE_DELETE:
1566-
logicalrep_write_delete(ctx->out, xid, targetrel,
1567-
old_slot, data->binary,
1568-
relentry->columns);
1569-
break;
1570-
default:
1571-
Assert(false);
1572-
}
1525+
OutputPluginPrepareWrite(ctx, true);
15731526

1574-
OutputPluginWrite(ctx, true);
1527+
/* Send the data */
1528+
switch (action)
1529+
{
1530+
case REORDER_BUFFER_CHANGE_INSERT:
1531+
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1532+
data->binary, relentry->columns);
1533+
break;
1534+
case REORDER_BUFFER_CHANGE_UPDATE:
1535+
logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1536+
new_slot, data->binary, relentry->columns);
15751537
break;
15761538
case REORDER_BUFFER_CHANGE_DELETE:
1577-
if (change->data.tp.oldtuple)
1578-
{
1579-
old_slot = relentry->old_slot;
1580-
1581-
ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
1582-
old_slot, false);
1583-
1584-
/* Switch relation if publishing via root. */
1585-
if (relentry->publish_as_relid != RelationGetRelid(relation))
1586-
{
1587-
Assert(relation->rd_rel->relispartition);
1588-
ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1589-
targetrel = ancestor;
1590-
/* Convert tuple if needed. */
1591-
if (relentry->attrmap)
1592-
{
1593-
TupleDesc tupdesc = RelationGetDescr(targetrel);
1594-
1595-
old_slot = execute_attr_map_slot(relentry->attrmap,
1596-
old_slot,
1597-
MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1598-
}
1599-
}
1600-
1601-
/* Check row filter */
1602-
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
1603-
relentry, &action))
1604-
break;
1605-
1606-
/* Send BEGIN if we haven't yet */
1607-
if (txndata && !txndata->sent_begin_txn)
1608-
pgoutput_send_begin(ctx, txn);
1609-
1610-
maybe_send_schema(ctx, change, relation, relentry);
1611-
1612-
OutputPluginPrepareWrite(ctx, true);
1613-
logicalrep_write_delete(ctx->out, xid, targetrel,
1614-
old_slot, data->binary,
1615-
relentry->columns);
1616-
OutputPluginWrite(ctx, true);
1617-
}
1618-
else
1619-
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1539+
logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1540+
data->binary, relentry->columns);
16201541
break;
16211542
default:
16221543
Assert(false);
16231544
}
16241545

1546+
OutputPluginWrite(ctx, true);
1547+
1548+
cleanup:
16251549
if (RelationIsValid(ancestor))
16261550
{
16271551
RelationClose(ancestor);
16281552
ancestor = NULL;
16291553
}
16301554

1631-
/* Cleanup */
16321555
MemoryContextSwitchTo(old);
16331556
MemoryContextReset(data->context);
16341557
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.