26
26
#include "catalog/pg_operator.h"
27
27
#include "catalog/pg_type.h"
28
28
#include "commands/defrem.h"
29
+ #include "commands/tablecmds.h"
29
30
#include "utils/array.h"
30
31
#include "utils/builtins.h"
31
32
#include "utils/fmgroids.h"
@@ -377,6 +378,242 @@ CreateConstraintEntry(const char *constraintName,
377
378
return conOid ;
378
379
}
379
380
381
+ /*
382
+ * CloneForeignKeyConstraints
383
+ * Clone foreign keys from a partitioned table to a newly acquired
384
+ * partition.
385
+ *
386
+ * relationId is a partition of parentId, so we can be certain that it has the
387
+ * same columns with the same datatypes. The columns may be in different
388
+ * order, though.
389
+ *
390
+ * The *cloned list is appended ClonedConstraint elements describing what was
391
+ * created.
392
+ */
393
+ void
394
+ CloneForeignKeyConstraints (Oid parentId , Oid relationId , List * * cloned )
395
+ {
396
+ Relation pg_constraint ;
397
+ Relation parentRel ;
398
+ Relation rel ;
399
+ ScanKeyData key ;
400
+ SysScanDesc scan ;
401
+ TupleDesc tupdesc ;
402
+ HeapTuple tuple ;
403
+ AttrNumber * attmap ;
404
+
405
+ parentRel = heap_open (parentId , NoLock ); /* already got lock */
406
+ /* see ATAddForeignKeyConstraint about lock level */
407
+ rel = heap_open (relationId , AccessExclusiveLock );
408
+
409
+ pg_constraint = heap_open (ConstraintRelationId , RowShareLock );
410
+ tupdesc = RelationGetDescr (pg_constraint );
411
+
412
+ /*
413
+ * The constraint key may differ, if the columns in the partition are
414
+ * different. This map is used to convert them.
415
+ */
416
+ attmap = convert_tuples_by_name_map (RelationGetDescr (rel ),
417
+ RelationGetDescr (parentRel ),
418
+ gettext_noop ("could not convert row type" ));
419
+
420
+ ScanKeyInit (& key ,
421
+ Anum_pg_constraint_conrelid , BTEqualStrategyNumber ,
422
+ F_OIDEQ , ObjectIdGetDatum (parentId ));
423
+ scan = systable_beginscan (pg_constraint , ConstraintRelidIndexId , true,
424
+ NULL , 1 , & key );
425
+
426
+ while ((tuple = systable_getnext (scan )) != NULL )
427
+ {
428
+ Form_pg_constraint constrForm = (Form_pg_constraint ) GETSTRUCT (tuple );
429
+ AttrNumber conkey [INDEX_MAX_KEYS ];
430
+ AttrNumber mapped_conkey [INDEX_MAX_KEYS ];
431
+ AttrNumber confkey [INDEX_MAX_KEYS ];
432
+ Oid conpfeqop [INDEX_MAX_KEYS ];
433
+ Oid conppeqop [INDEX_MAX_KEYS ];
434
+ Oid conffeqop [INDEX_MAX_KEYS ];
435
+ Constraint * fkconstraint ;
436
+ ClonedConstraint * newc ;
437
+ Oid constrOid ;
438
+ ObjectAddress parentAddr ,
439
+ childAddr ;
440
+ int nelem ;
441
+ int i ;
442
+ ArrayType * arr ;
443
+ Datum datum ;
444
+ bool isnull ;
445
+
446
+ /* only foreign keys */
447
+ if (constrForm -> contype != CONSTRAINT_FOREIGN )
448
+ continue ;
449
+
450
+ ObjectAddressSet (parentAddr , ConstraintRelationId ,
451
+ HeapTupleGetOid (tuple ));
452
+
453
+ datum = fastgetattr (tuple , Anum_pg_constraint_conkey ,
454
+ tupdesc , & isnull );
455
+ if (isnull )
456
+ elog (ERROR , "null conkey" );
457
+ arr = DatumGetArrayTypeP (datum );
458
+ nelem = ARR_DIMS (arr )[0 ];
459
+ if (ARR_NDIM (arr ) != 1 ||
460
+ nelem < 1 ||
461
+ nelem > INDEX_MAX_KEYS ||
462
+ ARR_HASNULL (arr ) ||
463
+ ARR_ELEMTYPE (arr ) != INT2OID )
464
+ elog (ERROR , "conkey is not a 1-D smallint array" );
465
+ memcpy (conkey , ARR_DATA_PTR (arr ), nelem * sizeof (AttrNumber ));
466
+
467
+ for (i = 0 ; i < nelem ; i ++ )
468
+ mapped_conkey [i ] = attmap [conkey [i ] - 1 ];
469
+
470
+ datum = fastgetattr (tuple , Anum_pg_constraint_confkey ,
471
+ tupdesc , & isnull );
472
+ if (isnull )
473
+ elog (ERROR , "null confkey" );
474
+ arr = DatumGetArrayTypeP (datum );
475
+ nelem = ARR_DIMS (arr )[0 ];
476
+ if (ARR_NDIM (arr ) != 1 ||
477
+ nelem < 1 ||
478
+ nelem > INDEX_MAX_KEYS ||
479
+ ARR_HASNULL (arr ) ||
480
+ ARR_ELEMTYPE (arr ) != INT2OID )
481
+ elog (ERROR , "confkey is not a 1-D smallint array" );
482
+ memcpy (confkey , ARR_DATA_PTR (arr ), nelem * sizeof (AttrNumber ));
483
+
484
+ datum = fastgetattr (tuple , Anum_pg_constraint_conpfeqop ,
485
+ tupdesc , & isnull );
486
+ if (isnull )
487
+ elog (ERROR , "null conpfeqop" );
488
+ arr = DatumGetArrayTypeP (datum );
489
+ nelem = ARR_DIMS (arr )[0 ];
490
+ if (ARR_NDIM (arr ) != 1 ||
491
+ nelem < 1 ||
492
+ nelem > INDEX_MAX_KEYS ||
493
+ ARR_HASNULL (arr ) ||
494
+ ARR_ELEMTYPE (arr ) != OIDOID )
495
+ elog (ERROR , "conpfeqop is not a 1-D OID array" );
496
+ memcpy (conpfeqop , ARR_DATA_PTR (arr ), nelem * sizeof (Oid ));
497
+
498
+ datum = fastgetattr (tuple , Anum_pg_constraint_conpfeqop ,
499
+ tupdesc , & isnull );
500
+ if (isnull )
501
+ elog (ERROR , "null conpfeqop" );
502
+ arr = DatumGetArrayTypeP (datum );
503
+ nelem = ARR_DIMS (arr )[0 ];
504
+ if (ARR_NDIM (arr ) != 1 ||
505
+ nelem < 1 ||
506
+ nelem > INDEX_MAX_KEYS ||
507
+ ARR_HASNULL (arr ) ||
508
+ ARR_ELEMTYPE (arr ) != OIDOID )
509
+ elog (ERROR , "conpfeqop is not a 1-D OID array" );
510
+ memcpy (conpfeqop , ARR_DATA_PTR (arr ), nelem * sizeof (Oid ));
511
+
512
+ datum = fastgetattr (tuple , Anum_pg_constraint_conppeqop ,
513
+ tupdesc , & isnull );
514
+ if (isnull )
515
+ elog (ERROR , "null conppeqop" );
516
+ arr = DatumGetArrayTypeP (datum );
517
+ nelem = ARR_DIMS (arr )[0 ];
518
+ if (ARR_NDIM (arr ) != 1 ||
519
+ nelem < 1 ||
520
+ nelem > INDEX_MAX_KEYS ||
521
+ ARR_HASNULL (arr ) ||
522
+ ARR_ELEMTYPE (arr ) != OIDOID )
523
+ elog (ERROR , "conppeqop is not a 1-D OID array" );
524
+ memcpy (conppeqop , ARR_DATA_PTR (arr ), nelem * sizeof (Oid ));
525
+
526
+ datum = fastgetattr (tuple , Anum_pg_constraint_conffeqop ,
527
+ tupdesc , & isnull );
528
+ if (isnull )
529
+ elog (ERROR , "null conffeqop" );
530
+ arr = DatumGetArrayTypeP (datum );
531
+ nelem = ARR_DIMS (arr )[0 ];
532
+ if (ARR_NDIM (arr ) != 1 ||
533
+ nelem < 1 ||
534
+ nelem > INDEX_MAX_KEYS ||
535
+ ARR_HASNULL (arr ) ||
536
+ ARR_ELEMTYPE (arr ) != OIDOID )
537
+ elog (ERROR , "conffeqop is not a 1-D OID array" );
538
+ memcpy (conffeqop , ARR_DATA_PTR (arr ), nelem * sizeof (Oid ));
539
+
540
+ constrOid =
541
+ CreateConstraintEntry (NameStr (constrForm -> conname ),
542
+ constrForm -> connamespace ,
543
+ CONSTRAINT_FOREIGN ,
544
+ constrForm -> condeferrable ,
545
+ constrForm -> condeferred ,
546
+ constrForm -> convalidated ,
547
+ HeapTupleGetOid (tuple ),
548
+ relationId ,
549
+ mapped_conkey ,
550
+ nelem ,
551
+ InvalidOid , /* not a domain constraint */
552
+ constrForm -> conindid , /* same index */
553
+ constrForm -> confrelid , /* same foreign rel */
554
+ confkey ,
555
+ conpfeqop ,
556
+ conppeqop ,
557
+ conffeqop ,
558
+ nelem ,
559
+ constrForm -> confupdtype ,
560
+ constrForm -> confdeltype ,
561
+ constrForm -> confmatchtype ,
562
+ NULL ,
563
+ NULL ,
564
+ NULL ,
565
+ NULL ,
566
+ false,
567
+ 1 , false, true);
568
+
569
+ ObjectAddressSet (childAddr , ConstraintRelationId , constrOid );
570
+ recordDependencyOn (& childAddr , & parentAddr , DEPENDENCY_INTERNAL_AUTO );
571
+
572
+ fkconstraint = makeNode (Constraint );
573
+ /* for now this is all we need */
574
+ fkconstraint -> fk_upd_action = constrForm -> confupdtype ;
575
+ fkconstraint -> fk_del_action = constrForm -> confdeltype ;
576
+ fkconstraint -> deferrable = constrForm -> condeferrable ;
577
+ fkconstraint -> initdeferred = constrForm -> condeferred ;
578
+
579
+ createForeignKeyTriggers (rel , constrForm -> confrelid , fkconstraint ,
580
+ constrOid , constrForm -> conindid , false);
581
+
582
+ if (cloned )
583
+ {
584
+ /*
585
+ * Feed back caller about the constraints we created, so that they can
586
+ * set up constraint verification.
587
+ */
588
+ newc = palloc (sizeof (ClonedConstraint ));
589
+ newc -> relid = relationId ;
590
+ newc -> refrelid = constrForm -> confrelid ;
591
+ newc -> conindid = constrForm -> conindid ;
592
+ newc -> conid = constrOid ;
593
+ newc -> constraint = fkconstraint ;
594
+
595
+ * cloned = lappend (* cloned , newc );
596
+ }
597
+ }
598
+ systable_endscan (scan );
599
+
600
+ pfree (attmap );
601
+
602
+ if (rel -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE )
603
+ {
604
+ PartitionDesc partdesc = RelationGetPartitionDesc (rel );
605
+ int i ;
606
+
607
+ for (i = 0 ; i < partdesc -> nparts ; i ++ )
608
+ CloneForeignKeyConstraints (RelationGetRelid (rel ),
609
+ partdesc -> oids [i ],
610
+ cloned );
611
+ }
612
+
613
+ heap_close (rel , NoLock ); /* keep lock till commit */
614
+ heap_close (parentRel , NoLock );
615
+ heap_close (pg_constraint , RowShareLock );
616
+ }
380
617
381
618
/*
382
619
* Test whether given name is currently used as a constraint name
0 commit comments