Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit c7d43c4

Browse filesBrowse files
committed
Correct attach/detach logic for FKs in partitions
There was no code to handle foreign key constraints on partitioned tables in the case of ALTER TABLE DETACH; and if you happened to ATTACH a partition that already had an equivalent constraint, that one was ignored and a new constraint was created. Adding this to the fact that foreign key cloning reuses the constraint name on the partition instead of generating a new name (as it probably should, to cater to SQL standard rules about constraint naming within schemas), the result was a pretty poor user experience -- the most visible failure was that just detaching a partition and re-attaching it failed with an error such as ERROR: duplicate key value violates unique constraint "pg_constraint_conrelid_contypid_conname_index" DETAIL: Key (conrelid, contypid, conname)=(26702, 0, test_result_asset_id_fkey) already exists. because it would try to create an identically-named constraint in the partition. To make matters worse, if you tried to drop the constraint in the now-independent partition, that would fail because the constraint was still seen as dependent on the constraint in its former parent partitioned table: ERROR: cannot drop inherited constraint "test_result_asset_id_fkey" of relation "test_result_cbsystem_0001_0050_monthly_2018_09" This fix attacks the problem from two angles: first, when the partition is detached, the constraint is also marked as independent, so the drop now works. Second, when the partition is re-attached, we scan existing constraints searching for one matching the FK in the parent, and if one exists, we link that one to the parent constraint. So we don't end up with a duplicate -- and better yet, we don't need to scan the referenced table to verify that the constraint holds. To implement this I made a small change to previously planner-only struct ForeignKeyCacheInfo to contain the constraint OID; also relcache now maintains the list of FKs for partitioned tables too. Backpatch to 11. Reported-by: Michael Vitale (bug #15425) Discussion: https://postgr.es/m/15425-2dbc9d2aa999f816@postgresql.org
1 parent f188538 commit c7d43c4
Copy full SHA for c7d43c4

File tree

Expand file treeCollapse file tree

8 files changed

+404
-39
lines changed
Filter options
Expand file treeCollapse file tree

8 files changed

+404
-39
lines changed

‎src/backend/catalog/pg_constraint.c

Copy file name to clipboardExpand all lines: src/backend/catalog/pg_constraint.c
+203-35Lines changed: 203 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "access/htup_details.h"
2020
#include "access/sysattr.h"
2121
#include "access/tupconvert.h"
22+
#include "access/xact.h"
2223
#include "catalog/dependency.h"
2324
#include "catalog/indexing.h"
2425
#include "catalog/objectaccess.h"
@@ -37,6 +38,10 @@
3738
#include "utils/tqual.h"
3839

3940

41+
static void clone_fk_constraints(Relation pg_constraint, Relation parentRel,
42+
Relation partRel, List *clone, List **cloned);
43+
44+
4045
/*
4146
* CreateConstraintEntry
4247
* Create a constraint table entry.
@@ -400,57 +405,106 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
400405
Relation rel;
401406
ScanKeyData key;
402407
SysScanDesc scan;
403-
TupleDesc tupdesc;
404408
HeapTuple tuple;
405-
AttrNumber *attmap;
409+
List *clone = NIL;
406410

407411
parentRel = heap_open(parentId, NoLock); /* already got lock */
408412
/* see ATAddForeignKeyConstraint about lock level */
409413
rel = heap_open(relationId, AccessExclusiveLock);
410-
411414
pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
415+
416+
/* Obtain the list of constraints to clone or attach */
417+
ScanKeyInit(&key,
418+
Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
419+
F_OIDEQ, ObjectIdGetDatum(parentId));
420+
scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
421+
NULL, 1, &key);
422+
while ((tuple = systable_getnext(scan)) != NULL)
423+
clone = lappend_oid(clone, HeapTupleGetOid(tuple));
424+
systable_endscan(scan);
425+
426+
/* Do the actual work, recursing to partitions as needed */
427+
clone_fk_constraints(pg_constraint, parentRel, rel, clone, cloned);
428+
429+
/* We're done. Clean up */
430+
heap_close(parentRel, NoLock);
431+
heap_close(rel, NoLock); /* keep lock till commit */
432+
heap_close(pg_constraint, RowShareLock);
433+
}
434+
435+
/*
436+
* clone_fk_constraints
437+
* Recursive subroutine for CloneForeignKeyConstraints
438+
*
439+
* Clone the given list of FK constraints when a partition is attached.
440+
*
441+
* When cloning foreign keys to a partition, it may happen that equivalent
442+
* constraints already exist in the partition for some of them. We can skip
443+
* creating a clone in that case, and instead just attach the existing
444+
* constraint to the one in the parent.
445+
*
446+
* This function recurses to partitions, if the new partition is partitioned;
447+
* of course, only do this for FKs that were actually cloned.
448+
*/
449+
static void
450+
clone_fk_constraints(Relation pg_constraint, Relation parentRel,
451+
Relation partRel, List *clone, List **cloned)
452+
{
453+
TupleDesc tupdesc;
454+
AttrNumber *attmap;
455+
List *partFKs;
456+
List *subclone = NIL;
457+
ListCell *cell;
458+
412459
tupdesc = RelationGetDescr(pg_constraint);
413460

414461
/*
415462
* The constraint key may differ, if the columns in the partition are
416463
* different. This map is used to convert them.
417464
*/
418-
attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
465+
attmap = convert_tuples_by_name_map(RelationGetDescr(partRel),
419466
RelationGetDescr(parentRel),
420467
gettext_noop("could not convert row type"));
421468

422-
ScanKeyInit(&key,
423-
Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
424-
F_OIDEQ, ObjectIdGetDatum(parentId));
425-
scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
426-
NULL, 1, &key);
469+
partFKs = copyObject(RelationGetFKeyList(partRel));
427470

428-
while ((tuple = systable_getnext(scan)) != NULL)
471+
foreach(cell, clone)
429472
{
430-
Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
473+
Oid parentConstrOid = lfirst_oid(cell);
474+
Form_pg_constraint constrForm;
475+
HeapTuple tuple;
431476
AttrNumber conkey[INDEX_MAX_KEYS];
432477
AttrNumber mapped_conkey[INDEX_MAX_KEYS];
433478
AttrNumber confkey[INDEX_MAX_KEYS];
434479
Oid conpfeqop[INDEX_MAX_KEYS];
435480
Oid conppeqop[INDEX_MAX_KEYS];
436481
Oid conffeqop[INDEX_MAX_KEYS];
437482
Constraint *fkconstraint;
438-
ClonedConstraint *newc;
483+
bool attach_it;
439484
Oid constrOid;
440485
ObjectAddress parentAddr,
441486
childAddr;
442487
int nelem;
488+
ListCell *cell;
443489
int i;
444490
ArrayType *arr;
445491
Datum datum;
446492
bool isnull;
447493

494+
tuple = SearchSysCache1(CONSTROID, parentConstrOid);
495+
if (!tuple)
496+
elog(ERROR, "cache lookup failed for constraint %u",
497+
parentConstrOid);
498+
constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
499+
448500
/* only foreign keys */
449501
if (constrForm->contype != CONSTRAINT_FOREIGN)
502+
{
503+
ReleaseSysCache(tuple);
450504
continue;
505+
}
451506

452-
ObjectAddressSet(parentAddr, ConstraintRelationId,
453-
HeapTupleGetOid(tuple));
507+
ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
454508

455509
datum = fastgetattr(tuple, Anum_pg_constraint_conkey,
456510
tupdesc, &isnull);
@@ -539,6 +593,90 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
539593
elog(ERROR, "conffeqop is not a 1-D OID array");
540594
memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
541595

596+
/*
597+
* Before creating a new constraint, see whether any existing FKs are
598+
* fit for the purpose. If one is, attach the parent constraint to it,
599+
* and don't clone anything. This way we avoid the expensive
600+
* verification step and don't end up with a duplicate FK. This also
601+
* means we don't consider this constraint when recursing to
602+
* partitions.
603+
*/
604+
attach_it = false;
605+
foreach(cell, partFKs)
606+
{
607+
ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
608+
Form_pg_constraint partConstr;
609+
HeapTuple partcontup;
610+
611+
attach_it = true;
612+
613+
/*
614+
* Do some quick & easy initial checks. If any of these fail, we
615+
* cannot use this constraint, but keep looking.
616+
*/
617+
if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem)
618+
{
619+
attach_it = false;
620+
continue;
621+
}
622+
for (i = 0; i < nelem; i++)
623+
{
624+
if (fk->conkey[i] != mapped_conkey[i] ||
625+
fk->confkey[i] != confkey[i] ||
626+
fk->conpfeqop[i] != conpfeqop[i])
627+
{
628+
attach_it = false;
629+
break;
630+
}
631+
}
632+
if (!attach_it)
633+
continue;
634+
635+
/*
636+
* Looks good so far; do some more extensive checks. Presumably
637+
* the check for 'convalidated' could be dropped, since we don't
638+
* really care about that, but let's be careful for now.
639+
*/
640+
partcontup = SearchSysCache1(CONSTROID,
641+
ObjectIdGetDatum(fk->conoid));
642+
if (!partcontup)
643+
elog(ERROR, "cache lookup failed for constraint %u",
644+
fk->conoid);
645+
partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
646+
if (OidIsValid(partConstr->conparentid) ||
647+
!partConstr->convalidated ||
648+
partConstr->condeferrable != constrForm->condeferrable ||
649+
partConstr->condeferred != constrForm->condeferred ||
650+
partConstr->confupdtype != constrForm->confupdtype ||
651+
partConstr->confdeltype != constrForm->confdeltype ||
652+
partConstr->confmatchtype != constrForm->confmatchtype)
653+
{
654+
ReleaseSysCache(partcontup);
655+
attach_it = false;
656+
continue;
657+
}
658+
659+
ReleaseSysCache(partcontup);
660+
661+
/* looks good! Attach this constraint */
662+
ConstraintSetParentConstraint(fk->conoid,
663+
HeapTupleGetOid(tuple));
664+
CommandCounterIncrement();
665+
attach_it = true;
666+
break;
667+
}
668+
669+
/*
670+
* If we attached to an existing constraint, there is no need to
671+
* create a new one. In fact, there's no need to recurse for this
672+
* constraint to partitions, either.
673+
*/
674+
if (attach_it)
675+
{
676+
ReleaseSysCache(tuple);
677+
continue;
678+
}
679+
542680
constrOid =
543681
CreateConstraintEntry(NameStr(constrForm->conname),
544682
constrForm->connamespace,
@@ -547,7 +685,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
547685
constrForm->condeferred,
548686
constrForm->convalidated,
549687
HeapTupleGetOid(tuple),
550-
relationId,
688+
RelationGetRelid(partRel),
551689
mapped_conkey,
552690
nelem,
553691
nelem,
@@ -568,6 +706,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
568706
NULL,
569707
false,
570708
1, false, true);
709+
subclone = lappend_oid(subclone, constrOid);
571710

572711
ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
573712
recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
@@ -580,43 +719,56 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
580719
fkconstraint->deferrable = constrForm->condeferrable;
581720
fkconstraint->initdeferred = constrForm->condeferred;
582721

583-
createForeignKeyTriggers(rel, constrForm->confrelid, fkconstraint,
722+
createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
584723
constrOid, constrForm->conindid, false);
585724

586725
if (cloned)
587726
{
727+
ClonedConstraint *newc;
728+
588729
/*
589730
* Feed back caller about the constraints we created, so that they
590731
* can set up constraint verification.
591732
*/
592733
newc = palloc(sizeof(ClonedConstraint));
593-
newc->relid = relationId;
734+
newc->relid = RelationGetRelid(partRel);
594735
newc->refrelid = constrForm->confrelid;
595736
newc->conindid = constrForm->conindid;
596737
newc->conid = constrOid;
597738
newc->constraint = fkconstraint;
598739

599740
*cloned = lappend(*cloned, newc);
600741
}
742+
743+
ReleaseSysCache(tuple);
601744
}
602-
systable_endscan(scan);
603745

604746
pfree(attmap);
747+
list_free_deep(partFKs);
605748

606-
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
749+
/*
750+
* If the partition is partitioned, recurse to handle any constraints that
751+
* were cloned.
752+
*/
753+
if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
754+
subclone != NIL)
607755
{
608-
PartitionDesc partdesc = RelationGetPartitionDesc(rel);
756+
PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
609757
int i;
610758

611759
for (i = 0; i < partdesc->nparts; i++)
612-
CloneForeignKeyConstraints(RelationGetRelid(rel),
613-
partdesc->oids[i],
614-
cloned);
760+
{
761+
Relation childRel;
762+
763+
childRel = heap_open(partdesc->oids[i], AccessExclusiveLock);
764+
clone_fk_constraints(pg_constraint,
765+
partRel,
766+
childRel,
767+
subclone,
768+
cloned);
769+
heap_close(childRel, NoLock); /* keep lock till commit */
770+
}
615771
}
616-
617-
heap_close(rel, NoLock); /* keep lock till commit */
618-
heap_close(parentRel, NoLock);
619-
heap_close(pg_constraint, RowShareLock);
620772
}
621773

622774
/*
@@ -1028,17 +1180,33 @@ ConstraintSetParentConstraint(Oid childConstrId, Oid parentConstrId)
10281180
elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
10291181
newtup = heap_copytuple(tuple);
10301182
constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
1031-
constrForm->conislocal = false;
1032-
constrForm->coninhcount++;
1033-
constrForm->conparentid = parentConstrId;
1034-
CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
1035-
ReleaseSysCache(tuple);
1183+
if (OidIsValid(parentConstrId))
1184+
{
1185+
constrForm->conislocal = false;
1186+
constrForm->coninhcount++;
1187+
constrForm->conparentid = parentConstrId;
1188+
1189+
CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
10361190

1037-
ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
1038-
ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
1191+
ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
1192+
ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
10391193

1040-
recordDependencyOn(&depender, &referenced, DEPENDENCY_INTERNAL_AUTO);
1194+
recordDependencyOn(&depender, &referenced, DEPENDENCY_INTERNAL_AUTO);
1195+
}
1196+
else
1197+
{
1198+
constrForm->coninhcount--;
1199+
if (constrForm->coninhcount <= 0)
1200+
constrForm->conislocal = true;
1201+
constrForm->conparentid = InvalidOid;
1202+
1203+
deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
1204+
ConstraintRelationId,
1205+
DEPENDENCY_INTERNAL_AUTO);
1206+
CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
1207+
}
10411208

1209+
ReleaseSysCache(tuple);
10421210
heap_close(constrRel, RowExclusiveLock);
10431211
}
10441212

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.