Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 3de241d

Browse filesBrowse files
committed
Foreign keys on partitioned tables
Author: Álvaro Herrera Discussion: https://postgr.es/m/20171231194359.cvojcour423ulha4@alvherre.pgsql Reviewed-by: Peter Eisentraut
1 parent 857f9c3 commit 3de241d
Copy full SHA for 3de241d

File tree

Expand file treeCollapse file tree

17 files changed

+895
-109
lines changed
Filter options
Expand file treeCollapse file tree

17 files changed

+895
-109
lines changed

‎doc/src/sgml/ref/alter_table.sgml

Copy file name to clipboardExpand all lines: doc/src/sgml/ref/alter_table.sgml
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,8 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
368368
specified check constraints). But the
369369
database will not assume that the constraint holds for all rows in
370370
the table, until it is validated by using the <literal>VALIDATE
371-
CONSTRAINT</literal> option.
371+
CONSTRAINT</literal> option. Foreign key constraints on partitioned
372+
tables may not be declared <literal>NOT VALID</literal> at present.
372373
</para>
373374

374375
<para>

‎doc/src/sgml/ref/create_table.sgml

Copy file name to clipboardExpand all lines: doc/src/sgml/ref/create_table.sgml
+9-4Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -546,9 +546,12 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
546546
</para>
547547

548548
<para>
549-
Partitioned tables do not support <literal>EXCLUDE</literal> or
550-
<literal>FOREIGN KEY</literal> constraints; however, you can define
551-
these constraints on individual partitions.
549+
Partitioned tables do not support <literal>EXCLUDE</literal> constraints;
550+
however, you can define these constraints on individual partitions.
551+
Also, while it's possible to define <literal>PRIMARY KEY</literal>
552+
constraints on partitioned tables, it is not supported to create foreign
553+
keys cannot that reference them. This restriction will be lifted in a
554+
future release.
552555
</para>
553556

554557
</listitem>
@@ -907,7 +910,9 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
907910
must have <literal>REFERENCES</literal> permission on the referenced table
908911
(either the whole table, or the specific referenced columns).
909912
Note that foreign key constraints cannot be defined between temporary
910-
tables and permanent tables.
913+
tables and permanent tables. Also note that while it is possible to
914+
define a foreign key on a partitioned table, it is not possible to
915+
declare a foreign key that references a partitioned table.
911916
</para>
912917

913918
<para>

‎src/backend/catalog/pg_constraint.c

Copy file name to clipboardExpand all lines: src/backend/catalog/pg_constraint.c
+237Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "catalog/pg_operator.h"
2727
#include "catalog/pg_type.h"
2828
#include "commands/defrem.h"
29+
#include "commands/tablecmds.h"
2930
#include "utils/array.h"
3031
#include "utils/builtins.h"
3132
#include "utils/fmgroids.h"
@@ -377,6 +378,242 @@ CreateConstraintEntry(const char *constraintName,
377378
return conOid;
378379
}
379380

381+
/*
382+
* CloneForeignKeyConstraints
383+
* Clone foreign keys from a partitioned table to a newly acquired
384+
* partition.
385+
*
386+
* relationId is a partition of parentId, so we can be certain that it has the
387+
* same columns with the same datatypes. The columns may be in different
388+
* order, though.
389+
*
390+
* The *cloned list is appended ClonedConstraint elements describing what was
391+
* created.
392+
*/
393+
void
394+
CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
395+
{
396+
Relation pg_constraint;
397+
Relation parentRel;
398+
Relation rel;
399+
ScanKeyData key;
400+
SysScanDesc scan;
401+
TupleDesc tupdesc;
402+
HeapTuple tuple;
403+
AttrNumber *attmap;
404+
405+
parentRel = heap_open(parentId, NoLock); /* already got lock */
406+
/* see ATAddForeignKeyConstraint about lock level */
407+
rel = heap_open(relationId, AccessExclusiveLock);
408+
409+
pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
410+
tupdesc = RelationGetDescr(pg_constraint);
411+
412+
/*
413+
* The constraint key may differ, if the columns in the partition are
414+
* different. This map is used to convert them.
415+
*/
416+
attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
417+
RelationGetDescr(parentRel),
418+
gettext_noop("could not convert row type"));
419+
420+
ScanKeyInit(&key,
421+
Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
422+
F_OIDEQ, ObjectIdGetDatum(parentId));
423+
scan = systable_beginscan(pg_constraint, ConstraintRelidIndexId, true,
424+
NULL, 1, &key);
425+
426+
while ((tuple = systable_getnext(scan)) != NULL)
427+
{
428+
Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
429+
AttrNumber conkey[INDEX_MAX_KEYS];
430+
AttrNumber mapped_conkey[INDEX_MAX_KEYS];
431+
AttrNumber confkey[INDEX_MAX_KEYS];
432+
Oid conpfeqop[INDEX_MAX_KEYS];
433+
Oid conppeqop[INDEX_MAX_KEYS];
434+
Oid conffeqop[INDEX_MAX_KEYS];
435+
Constraint *fkconstraint;
436+
ClonedConstraint *newc;
437+
Oid constrOid;
438+
ObjectAddress parentAddr,
439+
childAddr;
440+
int nelem;
441+
int i;
442+
ArrayType *arr;
443+
Datum datum;
444+
bool isnull;
445+
446+
/* only foreign keys */
447+
if (constrForm->contype != CONSTRAINT_FOREIGN)
448+
continue;
449+
450+
ObjectAddressSet(parentAddr, ConstraintRelationId,
451+
HeapTupleGetOid(tuple));
452+
453+
datum = fastgetattr(tuple, Anum_pg_constraint_conkey,
454+
tupdesc, &isnull);
455+
if (isnull)
456+
elog(ERROR, "null conkey");
457+
arr = DatumGetArrayTypeP(datum);
458+
nelem = ARR_DIMS(arr)[0];
459+
if (ARR_NDIM(arr) != 1 ||
460+
nelem < 1 ||
461+
nelem > INDEX_MAX_KEYS ||
462+
ARR_HASNULL(arr) ||
463+
ARR_ELEMTYPE(arr) != INT2OID)
464+
elog(ERROR, "conkey is not a 1-D smallint array");
465+
memcpy(conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
466+
467+
for (i = 0; i < nelem; i++)
468+
mapped_conkey[i] = attmap[conkey[i] - 1];
469+
470+
datum = fastgetattr(tuple, Anum_pg_constraint_confkey,
471+
tupdesc, &isnull);
472+
if (isnull)
473+
elog(ERROR, "null confkey");
474+
arr = DatumGetArrayTypeP(datum);
475+
nelem = ARR_DIMS(arr)[0];
476+
if (ARR_NDIM(arr) != 1 ||
477+
nelem < 1 ||
478+
nelem > INDEX_MAX_KEYS ||
479+
ARR_HASNULL(arr) ||
480+
ARR_ELEMTYPE(arr) != INT2OID)
481+
elog(ERROR, "confkey is not a 1-D smallint array");
482+
memcpy(confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
483+
484+
datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
485+
tupdesc, &isnull);
486+
if (isnull)
487+
elog(ERROR, "null conpfeqop");
488+
arr = DatumGetArrayTypeP(datum);
489+
nelem = ARR_DIMS(arr)[0];
490+
if (ARR_NDIM(arr) != 1 ||
491+
nelem < 1 ||
492+
nelem > INDEX_MAX_KEYS ||
493+
ARR_HASNULL(arr) ||
494+
ARR_ELEMTYPE(arr) != OIDOID)
495+
elog(ERROR, "conpfeqop is not a 1-D OID array");
496+
memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
497+
498+
datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
499+
tupdesc, &isnull);
500+
if (isnull)
501+
elog(ERROR, "null conpfeqop");
502+
arr = DatumGetArrayTypeP(datum);
503+
nelem = ARR_DIMS(arr)[0];
504+
if (ARR_NDIM(arr) != 1 ||
505+
nelem < 1 ||
506+
nelem > INDEX_MAX_KEYS ||
507+
ARR_HASNULL(arr) ||
508+
ARR_ELEMTYPE(arr) != OIDOID)
509+
elog(ERROR, "conpfeqop is not a 1-D OID array");
510+
memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
511+
512+
datum = fastgetattr(tuple, Anum_pg_constraint_conppeqop,
513+
tupdesc, &isnull);
514+
if (isnull)
515+
elog(ERROR, "null conppeqop");
516+
arr = DatumGetArrayTypeP(datum);
517+
nelem = ARR_DIMS(arr)[0];
518+
if (ARR_NDIM(arr) != 1 ||
519+
nelem < 1 ||
520+
nelem > INDEX_MAX_KEYS ||
521+
ARR_HASNULL(arr) ||
522+
ARR_ELEMTYPE(arr) != OIDOID)
523+
elog(ERROR, "conppeqop is not a 1-D OID array");
524+
memcpy(conppeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
525+
526+
datum = fastgetattr(tuple, Anum_pg_constraint_conffeqop,
527+
tupdesc, &isnull);
528+
if (isnull)
529+
elog(ERROR, "null conffeqop");
530+
arr = DatumGetArrayTypeP(datum);
531+
nelem = ARR_DIMS(arr)[0];
532+
if (ARR_NDIM(arr) != 1 ||
533+
nelem < 1 ||
534+
nelem > INDEX_MAX_KEYS ||
535+
ARR_HASNULL(arr) ||
536+
ARR_ELEMTYPE(arr) != OIDOID)
537+
elog(ERROR, "conffeqop is not a 1-D OID array");
538+
memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
539+
540+
constrOid =
541+
CreateConstraintEntry(NameStr(constrForm->conname),
542+
constrForm->connamespace,
543+
CONSTRAINT_FOREIGN,
544+
constrForm->condeferrable,
545+
constrForm->condeferred,
546+
constrForm->convalidated,
547+
HeapTupleGetOid(tuple),
548+
relationId,
549+
mapped_conkey,
550+
nelem,
551+
InvalidOid, /* not a domain constraint */
552+
constrForm->conindid, /* same index */
553+
constrForm->confrelid, /* same foreign rel */
554+
confkey,
555+
conpfeqop,
556+
conppeqop,
557+
conffeqop,
558+
nelem,
559+
constrForm->confupdtype,
560+
constrForm->confdeltype,
561+
constrForm->confmatchtype,
562+
NULL,
563+
NULL,
564+
NULL,
565+
NULL,
566+
false,
567+
1, false, true);
568+
569+
ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
570+
recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
571+
572+
fkconstraint = makeNode(Constraint);
573+
/* for now this is all we need */
574+
fkconstraint->fk_upd_action = constrForm->confupdtype;
575+
fkconstraint->fk_del_action = constrForm->confdeltype;
576+
fkconstraint->deferrable = constrForm->condeferrable;
577+
fkconstraint->initdeferred = constrForm->condeferred;
578+
579+
createForeignKeyTriggers(rel, constrForm->confrelid, fkconstraint,
580+
constrOid, constrForm->conindid, false);
581+
582+
if (cloned)
583+
{
584+
/*
585+
* Feed back caller about the constraints we created, so that they can
586+
* set up constraint verification.
587+
*/
588+
newc = palloc(sizeof(ClonedConstraint));
589+
newc->relid = relationId;
590+
newc->refrelid = constrForm->confrelid;
591+
newc->conindid = constrForm->conindid;
592+
newc->conid = constrOid;
593+
newc->constraint = fkconstraint;
594+
595+
*cloned = lappend(*cloned, newc);
596+
}
597+
}
598+
systable_endscan(scan);
599+
600+
pfree(attmap);
601+
602+
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
603+
{
604+
PartitionDesc partdesc = RelationGetPartitionDesc(rel);
605+
int i;
606+
607+
for (i = 0; i < partdesc->nparts; i++)
608+
CloneForeignKeyConstraints(RelationGetRelid(rel),
609+
partdesc->oids[i],
610+
cloned);
611+
}
612+
613+
heap_close(rel, NoLock); /* keep lock till commit */
614+
heap_close(parentRel, NoLock);
615+
heap_close(pg_constraint, RowShareLock);
616+
}
380617

381618
/*
382619
* Test whether given name is currently used as a constraint name

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.