@@ -290,8 +290,9 @@ static void AlterSeqNamespaces(Relation classRel, Relation rel,
290
290
LOCKMODE lockmode );
291
291
static ObjectAddress ATExecAlterConstraint (Relation rel , AlterTableCmd * cmd ,
292
292
bool recurse , bool recursing , LOCKMODE lockmode );
293
- static ObjectAddress ATExecValidateConstraint (Relation rel , char * constrName ,
294
- bool recurse , bool recursing , LOCKMODE lockmode );
293
+ static ObjectAddress ATExecValidateConstraint (List * * wqueue , Relation rel ,
294
+ char * constrName , bool recurse , bool recursing ,
295
+ LOCKMODE lockmode );
295
296
static int transformColumnNameList (Oid relId , List * colList ,
296
297
int16 * attnums , Oid * atttypids );
297
298
static int transformFkeyGetPrimaryKey (Relation pkrel , Oid * indexOid ,
@@ -304,7 +305,6 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
304
305
static void checkFkeyPermissions (Relation rel , int16 * attnums , int natts );
305
306
static CoercionPathType findFkeyCast (Oid targetTypeId , Oid sourceTypeId ,
306
307
Oid * funcid );
307
- static void validateCheckConstraint (Relation rel , HeapTuple constrtup );
308
308
static void validateForeignKeyConstraint (char * conname ,
309
309
Relation rel , Relation pkrel ,
310
310
Oid pkindOid , Oid constraintOid );
@@ -3588,13 +3588,13 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
3588
3588
address = ATExecAlterConstraint (rel , cmd , false, false, lockmode );
3589
3589
break ;
3590
3590
case AT_ValidateConstraint : /* VALIDATE CONSTRAINT */
3591
- address = ATExecValidateConstraint (rel , cmd -> name , false , false,
3592
- lockmode );
3591
+ address = ATExecValidateConstraint (wqueue , rel , cmd -> name , false,
3592
+ false, lockmode );
3593
3593
break ;
3594
3594
case AT_ValidateConstraintRecurse : /* VALIDATE CONSTRAINT with
3595
3595
* recursion */
3596
- address = ATExecValidateConstraint (rel , cmd -> name , true, false ,
3597
- lockmode );
3596
+ address = ATExecValidateConstraint (wqueue , rel , cmd -> name , true,
3597
+ false, lockmode );
3598
3598
break ;
3599
3599
case AT_DropConstraint : /* DROP CONSTRAINT */
3600
3600
ATExecDropConstraint (rel , cmd -> name , cmd -> behavior ,
@@ -6881,8 +6881,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
6881
6881
* was already validated, InvalidObjectAddress is returned.
6882
6882
*/
6883
6883
static ObjectAddress
6884
- ATExecValidateConstraint (Relation rel , char * constrName , bool recurse ,
6885
- bool recursing , LOCKMODE lockmode )
6884
+ ATExecValidateConstraint (List * * wqueue , Relation rel , char * constrName ,
6885
+ bool recurse , bool recursing , LOCKMODE lockmode )
6886
6886
{
6887
6887
Relation conrel ;
6888
6888
SysScanDesc scan ;
@@ -6929,27 +6929,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
6929
6929
6930
6930
if (!con -> convalidated )
6931
6931
{
6932
+ AlteredTableInfo * tab ;
6932
6933
HeapTuple copyTuple ;
6933
6934
Form_pg_constraint copy_con ;
6934
6935
6935
6936
if (con -> contype == CONSTRAINT_FOREIGN )
6936
6937
{
6937
- Relation refrel ;
6938
+ NewConstraint * newcon ;
6939
+ Constraint * fkconstraint ;
6938
6940
6939
- /*
6940
- * Triggers are already in place on both tables, so a concurrent
6941
- * write that alters the result here is not possible. Normally we
6942
- * can run a query here to do the validation, which would only
6943
- * require AccessShareLock. In some cases, it is possible that we
6944
- * might need to fire triggers to perform the check, so we take a
6945
- * lock at RowShareLock level just in case.
6946
- */
6947
- refrel = heap_open (con -> confrelid , RowShareLock );
6941
+ /* Queue validation for phase 3 */
6942
+ fkconstraint = makeNode (Constraint );
6943
+ /* for now this is all we need */
6944
+ fkconstraint -> conname = constrName ;
6948
6945
6949
- validateForeignKeyConstraint (constrName , rel , refrel ,
6950
- con -> conindid ,
6951
- HeapTupleGetOid (tuple ));
6952
- heap_close (refrel , NoLock );
6946
+ newcon = (NewConstraint * ) palloc0 (sizeof (NewConstraint ));
6947
+ newcon -> name = constrName ;
6948
+ newcon -> contype = CONSTR_FOREIGN ;
6949
+ newcon -> refrelid = con -> confrelid ;
6950
+ newcon -> refindid = con -> conindid ;
6951
+ newcon -> conid = HeapTupleGetOid (tuple );
6952
+ newcon -> qual = (Node * ) fkconstraint ;
6953
+
6954
+ /* Find or create work queue entry for this table */
6955
+ tab = ATGetQueueEntry (wqueue , rel );
6956
+ tab -> constraints = lappend (tab -> constraints , newcon );
6953
6957
6954
6958
/*
6955
6959
* Foreign keys do not inherit, so we purposely ignore the
@@ -6960,6 +6964,10 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
6960
6964
{
6961
6965
List * children = NIL ;
6962
6966
ListCell * child ;
6967
+ NewConstraint * newcon ;
6968
+ bool isnull ;
6969
+ Datum val ;
6970
+ char * conbin ;
6963
6971
6964
6972
/*
6965
6973
* If we're recursing, the parent has already done this, so skip
@@ -6998,12 +7006,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
6998
7006
/* find_all_inheritors already got lock */
6999
7007
childrel = heap_open (childoid , NoLock );
7000
7008
7001
- ATExecValidateConstraint (childrel , constrName , false,
7009
+ ATExecValidateConstraint (wqueue , childrel , constrName , false,
7002
7010
true, lockmode );
7003
7011
heap_close (childrel , NoLock );
7004
7012
}
7005
7013
7006
- validateCheckConstraint (rel , tuple );
7014
+ /* Queue validation for phase 3 */
7015
+ newcon = (NewConstraint * ) palloc0 (sizeof (NewConstraint ));
7016
+ newcon -> name = constrName ;
7017
+ newcon -> contype = CONSTR_CHECK ;
7018
+ newcon -> refrelid = InvalidOid ;
7019
+ newcon -> refindid = InvalidOid ;
7020
+ newcon -> conid = HeapTupleGetOid (tuple );
7021
+
7022
+ val = SysCacheGetAttr (CONSTROID , tuple ,
7023
+ Anum_pg_constraint_conbin , & isnull );
7024
+ if (isnull )
7025
+ elog (ERROR , "null conbin for constraint %u" ,
7026
+ HeapTupleGetOid (tuple ));
7027
+
7028
+ conbin = TextDatumGetCString (val );
7029
+ newcon -> qual = (Node * ) make_ands_implicit ((Expr * ) stringToNode (conbin ));
7030
+
7031
+ /* Find or create work queue entry for this table */
7032
+ tab = ATGetQueueEntry (wqueue , rel );
7033
+ tab -> constraints = lappend (tab -> constraints , newcon );
7007
7034
7008
7035
/*
7009
7036
* Invalidate relcache so that others see the new validated
@@ -7375,88 +7402,6 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
7375
7402
}
7376
7403
}
7377
7404
7378
- /*
7379
- * Scan the existing rows in a table to verify they meet a proposed
7380
- * CHECK constraint.
7381
- *
7382
- * The caller must have opened and locked the relation appropriately.
7383
- */
7384
- static void
7385
- validateCheckConstraint (Relation rel , HeapTuple constrtup )
7386
- {
7387
- EState * estate ;
7388
- Datum val ;
7389
- char * conbin ;
7390
- Expr * origexpr ;
7391
- List * exprstate ;
7392
- TupleDesc tupdesc ;
7393
- HeapScanDesc scan ;
7394
- HeapTuple tuple ;
7395
- ExprContext * econtext ;
7396
- MemoryContext oldcxt ;
7397
- TupleTableSlot * slot ;
7398
- Form_pg_constraint constrForm ;
7399
- bool isnull ;
7400
- Snapshot snapshot ;
7401
-
7402
- /* VALIDATE CONSTRAINT is a no-op for foreign tables */
7403
- if (rel -> rd_rel -> relkind == RELKIND_FOREIGN_TABLE )
7404
- return ;
7405
-
7406
- constrForm = (Form_pg_constraint ) GETSTRUCT (constrtup );
7407
-
7408
- estate = CreateExecutorState ();
7409
-
7410
- /*
7411
- * XXX this tuple doesn't really come from a syscache, but this doesn't
7412
- * matter to SysCacheGetAttr, because it only wants to be able to fetch
7413
- * the tupdesc
7414
- */
7415
- val = SysCacheGetAttr (CONSTROID , constrtup , Anum_pg_constraint_conbin ,
7416
- & isnull );
7417
- if (isnull )
7418
- elog (ERROR , "null conbin for constraint %u" ,
7419
- HeapTupleGetOid (constrtup ));
7420
- conbin = TextDatumGetCString (val );
7421
- origexpr = (Expr * ) stringToNode (conbin );
7422
- exprstate = (List * )
7423
- ExecPrepareExpr ((Expr * ) make_ands_implicit (origexpr ), estate );
7424
-
7425
- econtext = GetPerTupleExprContext (estate );
7426
- tupdesc = RelationGetDescr (rel );
7427
- slot = MakeSingleTupleTableSlot (tupdesc );
7428
- econtext -> ecxt_scantuple = slot ;
7429
-
7430
- snapshot = RegisterSnapshot (GetLatestSnapshot ());
7431
- scan = heap_beginscan (rel , snapshot , 0 , NULL );
7432
-
7433
- /*
7434
- * Switch to per-tuple memory context and reset it for each tuple
7435
- * produced, so we don't leak memory.
7436
- */
7437
- oldcxt = MemoryContextSwitchTo (GetPerTupleMemoryContext (estate ));
7438
-
7439
- while ((tuple = heap_getnext (scan , ForwardScanDirection )) != NULL )
7440
- {
7441
- ExecStoreTuple (tuple , slot , InvalidBuffer , false);
7442
-
7443
- if (!ExecQual (exprstate , econtext , true))
7444
- ereport (ERROR ,
7445
- (errcode (ERRCODE_CHECK_VIOLATION ),
7446
- errmsg ("check constraint \"%s\" is violated by some row" ,
7447
- NameStr (constrForm -> conname )),
7448
- errtableconstraint (rel , NameStr (constrForm -> conname ))));
7449
-
7450
- ResetExprContext (econtext );
7451
- }
7452
-
7453
- MemoryContextSwitchTo (oldcxt );
7454
- heap_endscan (scan );
7455
- UnregisterSnapshot (snapshot );
7456
- ExecDropSingleTupleTableSlot (slot );
7457
- FreeExecutorState (estate );
7458
- }
7459
-
7460
7405
/*
7461
7406
* Scan the existing rows in a table to verify they meet a proposed FK
7462
7407
* constraint.
0 commit comments