29
29
#include "access/amapi.h"
30
30
#include "access/htup_details.h"
31
31
#include "access/reloptions.h"
32
+ #include "access/tupconvert.h"
32
33
#include "catalog/dependency.h"
33
34
#include "catalog/heap.h"
34
35
#include "catalog/index.h"
@@ -80,7 +81,6 @@ typedef struct
80
81
List * ckconstraints ; /* CHECK constraints */
81
82
List * fkconstraints ; /* FOREIGN KEY constraints */
82
83
List * ixconstraints ; /* index-creating constraints */
83
- List * inh_indexes ; /* cloned indexes from INCLUDING INDEXES */
84
84
List * blist ; /* "before list" of things to do before
85
85
* creating the table */
86
86
List * alist ; /* "after list" of things to do after creating
@@ -111,7 +111,7 @@ static void transformTableLikeClause(CreateStmtContext *cxt,
111
111
TableLikeClause * table_like_clause );
112
112
static void transformOfType (CreateStmtContext * cxt ,
113
113
TypeName * ofTypename );
114
- static IndexStmt * generateClonedIndexStmt (CreateStmtContext * cxt ,
114
+ static IndexStmt * generateClonedIndexStmt (RangeVar * heapRel ,
115
115
Relation source_idx ,
116
116
const AttrNumber * attmap , int attmap_length );
117
117
static List * get_collation (Oid collation , Oid actual_datatype );
@@ -137,6 +137,9 @@ static void setSchemaName(char *context_schema, char **stmt_schema_name);
137
137
* Returns a List of utility commands to be done in sequence. One of these
138
138
* will be the transformed CreateStmt, but there may be additional actions
139
139
* to be done before and after the actual DefineRelation() call.
140
+ * In addition to normal utility commands such as AlterTableStmt and
141
+ * IndexStmt, the result list may contain TableLikeClause(s), representing
142
+ * the need to perform additional parse analysis after DefineRelation().
140
143
*
141
144
* SQL allows constraints to be scattered all over, so thumb through
142
145
* the columns and collect all constraints into one place.
@@ -225,7 +228,6 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
225
228
cxt .ckconstraints = NIL ;
226
229
cxt .fkconstraints = NIL ;
227
230
cxt .ixconstraints = NIL ;
228
- cxt .inh_indexes = NIL ;
229
231
cxt .blist = NIL ;
230
232
cxt .alist = NIL ;
231
233
cxt .pkey = NULL ;
@@ -718,8 +720,11 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint)
718
720
* transformTableLikeClause
719
721
*
720
722
* Change the LIKE <srctable> portion of a CREATE TABLE statement into
721
- * column definitions which recreate the user defined column portions of
722
- * <srctable>.
723
+ * column definitions that recreate the user defined column portions of
724
+ * <srctable>. Also, if there are any LIKE options that we can't fully
725
+ * process at this point, add the TableLikeClause to cxt->alist, which
726
+ * will cause utility.c to call expandTableLikeClause() after the new
727
+ * table has been created.
723
728
*/
724
729
static void
725
730
transformTableLikeClause (CreateStmtContext * cxt , TableLikeClause * table_like_clause )
@@ -728,7 +733,6 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
728
733
Relation relation ;
729
734
TupleDesc tupleDesc ;
730
735
TupleConstr * constr ;
731
- AttrNumber * attmap ;
732
736
AclResult aclresult ;
733
737
char * comment ;
734
738
ParseCallbackState pcbstate ;
@@ -742,6 +746,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
742
746
(errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
743
747
errmsg ("LIKE is not supported for creating foreign tables" )));
744
748
749
+ /* Open the relation referenced by the LIKE clause */
745
750
relation = relation_openrv (table_like_clause -> relation , AccessShareLock );
746
751
747
752
if (relation -> rd_rel -> relkind != RELKIND_RELATION &&
@@ -779,15 +784,10 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
779
784
tupleDesc = RelationGetDescr (relation );
780
785
constr = tupleDesc -> constr ;
781
786
782
- /*
783
- * Initialize column number map for map_variable_attnos(). We need this
784
- * since dropped columns in the source table aren't copied, so the new
785
- * table can have different column numbers.
786
- */
787
- attmap = (AttrNumber * ) palloc0 (sizeof (AttrNumber ) * tupleDesc -> natts );
788
-
789
787
/*
790
788
* Insert the copied attributes into the cxt for the new table definition.
789
+ * We must do this now so that they appear in the table in the relative
790
+ * position where the LIKE clause is, as required by SQL99.
791
791
*/
792
792
for (parent_attno = 1 ; parent_attno <= tupleDesc -> natts ;
793
793
parent_attno ++ )
@@ -797,7 +797,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
797
797
ColumnDef * def ;
798
798
799
799
/*
800
- * Ignore dropped columns in the parent. attmap entry is left zero.
800
+ * Ignore dropped columns in the parent.
801
801
*/
802
802
if (attribute -> attisdropped )
803
803
continue ;
@@ -829,8 +829,6 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
829
829
*/
830
830
cxt -> columns = lappend (cxt -> columns , def );
831
831
832
- attmap [parent_attno - 1 ] = list_length (cxt -> columns );
833
-
834
832
/*
835
833
* Copy default, if present and the default has been requested
836
834
*/
@@ -890,22 +888,88 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
890
888
/* We use oids if at least one LIKE'ed table has oids. */
891
889
cxt -> hasoids |= relation -> rd_rel -> relhasoids ;
892
890
891
+ /*
892
+ * We cannot yet deal with CHECK constraints or indexes, since we don't
893
+ * yet know what column numbers the copied columns will have in the
894
+ * finished table. If any of those options are specified, add the LIKE
895
+ * clause to cxt->alist so that expandTableLikeClause will be called after
896
+ * we do know that.
897
+ */
898
+ if (table_like_clause -> options &
899
+ (CREATE_TABLE_LIKE_CONSTRAINTS |
900
+ CREATE_TABLE_LIKE_INDEXES ))
901
+ cxt -> alist = lappend (cxt -> alist , table_like_clause );
902
+
903
+ /*
904
+ * Close the parent rel, but keep our AccessShareLock on it until xact
905
+ * commit. That will prevent someone else from deleting or ALTERing the
906
+ * parent before we can run expandTableLikeClause.
907
+ */
908
+ heap_close (relation , NoLock );
909
+ }
910
+
911
+ /*
912
+ * expandTableLikeClause
913
+ *
914
+ * Process LIKE options that require knowing the final column numbers
915
+ * assigned to the new table's columns. This executes after we have
916
+ * run DefineRelation for the new table. It returns a list of utility
917
+ * commands that should be run to generate indexes etc.
918
+ */
919
+ List *
920
+ expandTableLikeClause (RangeVar * heapRel , TableLikeClause * table_like_clause )
921
+ {
922
+ List * result = NIL ;
923
+ List * atsubcmds = NIL ;
924
+ Relation relation ;
925
+ Relation childrel ;
926
+ TupleDesc tupleDesc ;
927
+ TupleConstr * constr ;
928
+ AttrNumber * attmap ;
929
+ char * comment ;
930
+
931
+ /*
932
+ * Open the relation referenced by the LIKE clause. We should still have
933
+ * the table lock obtained by transformTableLikeClause (and this'll throw
934
+ * an assertion failure if not). Hence, no need to recheck privileges
935
+ * etc.
936
+ */
937
+ relation = relation_openrv (table_like_clause -> relation , NoLock );
938
+
939
+ tupleDesc = RelationGetDescr (relation );
940
+ constr = tupleDesc -> constr ;
941
+
942
+ /*
943
+ * Open the newly-created child relation; we have lock on that too.
944
+ */
945
+ childrel = relation_openrv (heapRel , NoLock );
946
+
947
+ /*
948
+ * Construct a map from the LIKE relation's attnos to the child rel's.
949
+ * This re-checks type match etc, although it shouldn't be possible to
950
+ * have a failure since both tables are locked.
951
+ */
952
+ attmap = convert_tuples_by_name_map (RelationGetDescr (childrel ),
953
+ tupleDesc ,
954
+ gettext_noop ("could not convert row type" ));
955
+
893
956
/*
894
957
* Copy CHECK constraints if requested, being careful to adjust attribute
895
958
* numbers so they match the child.
896
959
*/
897
960
if ((table_like_clause -> options & CREATE_TABLE_LIKE_CONSTRAINTS ) &&
898
- tupleDesc -> constr )
961
+ constr != NULL )
899
962
{
900
963
int ccnum ;
901
964
902
- for (ccnum = 0 ; ccnum < tupleDesc -> constr -> num_check ; ccnum ++ )
965
+ for (ccnum = 0 ; ccnum < constr -> num_check ; ccnum ++ )
903
966
{
904
- char * ccname = tupleDesc -> constr -> check [ccnum ].ccname ;
905
- char * ccbin = tupleDesc -> constr -> check [ccnum ].ccbin ;
906
- Constraint * n = makeNode (Constraint );
967
+ char * ccname = constr -> check [ccnum ].ccname ;
968
+ char * ccbin = constr -> check [ccnum ].ccbin ;
907
969
Node * ccbin_node ;
908
970
bool found_whole_row ;
971
+ Constraint * n ;
972
+ AlterTableCmd * atsubcmd ;
909
973
910
974
ccbin_node = map_variable_attnos (stringToNode (ccbin ),
911
975
1 , 0 ,
@@ -926,12 +990,21 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
926
990
ccname ,
927
991
RelationGetRelationName (relation ))));
928
992
993
+ n = makeNode (Constraint );
929
994
n -> contype = CONSTR_CHECK ;
930
995
n -> location = -1 ;
931
996
n -> conname = pstrdup (ccname );
932
997
n -> raw_expr = NULL ;
933
998
n -> cooked_expr = nodeToString (ccbin_node );
934
- cxt -> ckconstraints = lappend (cxt -> ckconstraints , n );
999
+
1000
+ /* We can skip validation, since the new table should be empty. */
1001
+ n -> skip_validation = true;
1002
+ n -> initially_valid = true;
1003
+
1004
+ atsubcmd = makeNode (AlterTableCmd );
1005
+ atsubcmd -> subtype = AT_AddConstraint ;
1006
+ atsubcmd -> def = (Node * ) n ;
1007
+ atsubcmds = lappend (atsubcmds , atsubcmd );
935
1008
936
1009
/* Copy comment on constraint */
937
1010
if ((table_like_clause -> options & CREATE_TABLE_LIKE_COMMENTS ) &&
@@ -943,19 +1016,35 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
943
1016
CommentStmt * stmt = makeNode (CommentStmt );
944
1017
945
1018
stmt -> objtype = OBJECT_TABCONSTRAINT ;
946
- stmt -> objname = list_make3 (makeString (cxt -> relation -> schemaname ),
947
- makeString (cxt -> relation -> relname ),
1019
+ stmt -> objname = list_make3 (makeString (heapRel -> schemaname ),
1020
+ makeString (heapRel -> relname ),
948
1021
makeString (n -> conname ));
949
1022
stmt -> objargs = NIL ;
950
1023
stmt -> comment = comment ;
951
1024
952
- cxt -> alist = lappend (cxt -> alist , stmt );
1025
+ result = lappend (result , stmt );
953
1026
}
954
1027
}
955
1028
}
956
1029
957
1030
/*
958
- * Likewise, copy indexes if requested
1031
+ * If we generated any ALTER TABLE actions above, wrap them into a single
1032
+ * ALTER TABLE command. Stick it at the front of the result, so it runs
1033
+ * before any CommentStmts we made above.
1034
+ */
1035
+ if (atsubcmds )
1036
+ {
1037
+ AlterTableStmt * atcmd = makeNode (AlterTableStmt );
1038
+
1039
+ atcmd -> relation = copyObject (heapRel );
1040
+ atcmd -> cmds = atsubcmds ;
1041
+ atcmd -> relkind = OBJECT_TABLE ;
1042
+ atcmd -> missing_ok = false;
1043
+ result = lcons (atcmd , result );
1044
+ }
1045
+
1046
+ /*
1047
+ * Process indexes if required.
959
1048
*/
960
1049
if ((table_like_clause -> options & CREATE_TABLE_LIKE_INDEXES ) &&
961
1050
relation -> rd_rel -> relhasindex )
@@ -974,7 +1063,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
974
1063
parent_index = index_open (parent_index_oid , AccessShareLock );
975
1064
976
1065
/* Build CREATE INDEX statement to recreate the parent_index */
977
- index_stmt = generateClonedIndexStmt (cxt , parent_index ,
1066
+ index_stmt = generateClonedIndexStmt (heapRel , parent_index ,
978
1067
attmap , tupleDesc -> natts );
979
1068
980
1069
/* Copy comment on index, if requested */
@@ -989,19 +1078,23 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
989
1078
index_stmt -> idxcomment = comment ;
990
1079
}
991
1080
992
- /* Save it in the inh_indexes list for the time being */
993
- cxt -> inh_indexes = lappend (cxt -> inh_indexes , index_stmt );
1081
+ result = lappend (result , index_stmt );
994
1082
995
1083
index_close (parent_index , AccessShareLock );
996
1084
}
997
1085
}
998
1086
1087
+ /* Done with child rel */
1088
+ heap_close (childrel , NoLock );
1089
+
999
1090
/*
1000
1091
* Close the parent rel, but keep our AccessShareLock on it until xact
1001
1092
* commit. That will prevent someone else from deleting or ALTERing the
1002
1093
* parent before the child is committed.
1003
1094
*/
1004
1095
heap_close (relation , NoLock );
1096
+
1097
+ return result ;
1005
1098
}
1006
1099
1007
1100
static void
@@ -1054,7 +1147,7 @@ transformOfType(CreateStmtContext *cxt, TypeName *ofTypename)
1054
1147
* "source_idx". Attribute numbers should be adjusted according to attmap.
1055
1148
*/
1056
1149
static IndexStmt *
1057
- generateClonedIndexStmt (CreateStmtContext * cxt , Relation source_idx ,
1150
+ generateClonedIndexStmt (RangeVar * heapRel , Relation source_idx ,
1058
1151
const AttrNumber * attmap , int attmap_length )
1059
1152
{
1060
1153
Oid source_relid = RelationGetRelid (source_idx );
@@ -1111,7 +1204,7 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
1111
1204
1112
1205
/* Begin building the IndexStmt */
1113
1206
index = makeNode (IndexStmt );
1114
- index -> relation = cxt -> relation ;
1207
+ index -> relation = heapRel ;
1115
1208
index -> accessMethod = pstrdup (NameStr (amrec -> amname ));
1116
1209
if (OidIsValid (idxrelrec -> reltablespace ))
1117
1210
index -> tableSpace = get_tablespace_name (idxrelrec -> reltablespace );
@@ -1461,24 +1554,6 @@ transformIndexConstraints(CreateStmtContext *cxt)
1461
1554
indexlist = lappend (indexlist , index );
1462
1555
}
1463
1556
1464
- /* Add in any indexes defined by LIKE ... INCLUDING INDEXES */
1465
- foreach (lc , cxt -> inh_indexes )
1466
- {
1467
- index = (IndexStmt * ) lfirst (lc );
1468
-
1469
- if (index -> primary )
1470
- {
1471
- if (cxt -> pkey != NULL )
1472
- ereport (ERROR ,
1473
- (errcode (ERRCODE_INVALID_TABLE_DEFINITION ),
1474
- errmsg ("multiple primary keys for table \"%s\" are not allowed" ,
1475
- cxt -> relation -> relname )));
1476
- cxt -> pkey = index ;
1477
- }
1478
-
1479
- indexlist = lappend (indexlist , index );
1480
- }
1481
-
1482
1557
/*
1483
1558
* Scan the index list and remove any redundant index specifications. This
1484
1559
* can happen if, for instance, the user writes UNIQUE PRIMARY KEY. A
@@ -2499,7 +2574,6 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
2499
2574
cxt .ckconstraints = NIL ;
2500
2575
cxt .fkconstraints = NIL ;
2501
2576
cxt .ixconstraints = NIL ;
2502
- cxt .inh_indexes = NIL ;
2503
2577
cxt .blist = NIL ;
2504
2578
cxt .alist = NIL ;
2505
2579
cxt .pkey = NULL ;
0 commit comments