27
27
import org .apache .doris .catalog .RangePartitionItem ;
28
28
import org .apache .doris .common .AnalysisException ;
29
29
import org .apache .doris .common .DdlException ;
30
+ import org .apache .doris .datasource .CacheException ;
31
+ import org .apache .doris .datasource .ExternalSchemaCache ;
32
+ import org .apache .doris .datasource .ExternalSchemaCache .SchemaCacheKey ;
30
33
import org .apache .doris .datasource .ExternalTable ;
31
34
import org .apache .doris .datasource .SchemaCacheValue ;
32
35
import org .apache .doris .datasource .mvcc .MvccSnapshot ;
36
+ import org .apache .doris .datasource .mvcc .MvccTable ;
37
+ import org .apache .doris .datasource .mvcc .MvccUtil ;
33
38
import org .apache .doris .mtmv .MTMVBaseTableIf ;
34
39
import org .apache .doris .mtmv .MTMVRefreshContext ;
35
40
import org .apache .doris .mtmv .MTMVRelatedTableIf ;
77
82
import java .util .Set ;
78
83
import java .util .stream .Collectors ;
79
84
80
- public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf , MTMVBaseTableIf {
85
+ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf , MTMVBaseTableIf , MvccTable {
81
86
82
87
public static final String YEAR = "year" ;
83
88
public static final String MONTH = "month" ;
@@ -117,39 +122,23 @@ public void setPartitionColumns(List<Column> partitionColumns) {
117
122
}
118
123
119
124
@ Override
120
- public Optional <SchemaCacheValue > initSchema () {
121
- table = IcebergUtils .getIcebergTable (catalog , dbName , name );
122
- List <Column > schema = IcebergUtils .getSchema (catalog , dbName , name );
123
- Snapshot snapshot = table .currentSnapshot ();
124
- if (snapshot == null ) {
125
- LOG .debug ("Table {} is empty" , name );
126
- return Optional .of (new IcebergSchemaCacheValue (schema , null , -1 , null ));
127
- }
128
- long snapshotId = snapshot .snapshotId ();
129
- partitionColumns = null ;
130
- IcebergPartitionInfo partitionInfo = null ;
131
- if (isValidRelatedTable ()) {
132
- PartitionSpec spec = table .spec ();
133
- partitionColumns = Lists .newArrayList ();
134
-
135
- // For iceberg table, we only support table with 1 partition column as RelatedTable.
136
- // So we use spec.fields().get(0) to get the partition column.
137
- Types .NestedField col = table .schema ().findField (spec .fields ().get (0 ).sourceId ());
125
+ public Optional <SchemaCacheValue > initSchema (SchemaCacheKey key ) {
126
+ table = getIcebergTable ();
127
+ List <Column > schema = IcebergUtils .getSchema (catalog , dbName , name ,
128
+ ((IcebergSchemaCacheKey ) key ).getSchemaId ());
129
+ List <Column > tmpColumns = Lists .newArrayList ();
130
+ PartitionSpec spec = table .spec ();
131
+ for (PartitionField field : spec .fields ()) {
132
+ Types .NestedField col = table .schema ().findField (field .sourceId ());
138
133
for (Column c : schema ) {
139
134
if (c .getName ().equalsIgnoreCase (col .name ())) {
140
- partitionColumns .add (c );
135
+ tmpColumns .add (c );
141
136
break ;
142
137
}
143
138
}
144
- Preconditions .checkState (partitionColumns .size () == 1 ,
145
- "Support 1 partition column for iceberg table, but found " + partitionColumns .size ());
146
- try {
147
- partitionInfo = loadPartitionInfo ();
148
- } catch (AnalysisException e ) {
149
- LOG .warn ("Failed to load iceberg table {} partition info." , name , e );
150
- }
151
139
}
152
- return Optional .of (new IcebergSchemaCacheValue (schema , partitionColumns , snapshotId , partitionInfo ));
140
+ partitionColumns = tmpColumns ;
141
+ return Optional .of (new IcebergSchemaCacheValue (schema , partitionColumns ));
153
142
}
154
143
155
144
@ Override
@@ -187,6 +176,11 @@ public Table getIcebergTable() {
187
176
return IcebergUtils .getIcebergTable (getCatalog (), getDbName (), getName ());
188
177
}
189
178
179
+ private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue () {
180
+ return Env .getCurrentEnv ().getExtMetaCacheMgr ().getIcebergMetadataCache ()
181
+ .getSnapshotCache (catalog , dbName , name );
182
+ }
183
+
190
184
@ Override
191
185
public void beforeMTMVRefresh (MTMV mtmv ) throws DdlException {
192
186
Env .getCurrentEnv ().getRefreshManager ()
@@ -195,46 +189,36 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
195
189
196
190
@ Override
197
191
public Map <String , PartitionItem > getAndCopyPartitionItems (Optional <MvccSnapshot > snapshot ) {
198
- return Maps .newHashMap (getPartitionInfoFromCache ().getNameToPartitionItem ());
192
+ return Maps .newHashMap (getOrFetchSnapshotCacheValue ( snapshot ). getPartitionInfo ().getNameToPartitionItem ());
199
193
}
200
194
201
- private IcebergPartitionInfo getPartitionInfoFromCache () {
202
- makeSureInitialized ();
203
- Optional <SchemaCacheValue > schemaCacheValue = getSchemaCacheValue ();
204
- if (!schemaCacheValue .isPresent ()) {
205
- return new IcebergPartitionInfo ();
206
- }
207
- return ((IcebergSchemaCacheValue ) schemaCacheValue .get ()).getPartitionInfo ();
195
+ @ Override
196
+ public Map <String , PartitionItem > getNameToPartitionItems (Optional <MvccSnapshot > snapshot ) {
197
+ return getOrFetchSnapshotCacheValue (snapshot ).getPartitionInfo ().getNameToPartitionItem ();
208
198
}
209
199
210
200
@ Override
211
201
public PartitionType getPartitionType (Optional <MvccSnapshot > snapshot ) {
212
- makeSureInitialized ();
213
202
return isValidRelatedTable () ? PartitionType .RANGE : PartitionType .UNPARTITIONED ;
214
203
}
215
204
216
205
@ Override
217
206
public Set <String > getPartitionColumnNames (Optional <MvccSnapshot > snapshot ) throws DdlException {
218
- return getPartitionColumnsFromCache ( ).stream ().map (Column ::getName ).collect (Collectors .toSet ());
207
+ return getPartitionColumns ( snapshot ).stream ().map (Column ::getName ).collect (Collectors .toSet ());
219
208
}
220
209
221
210
@ Override
222
211
public List <Column > getPartitionColumns (Optional <MvccSnapshot > snapshot ) {
223
- return getPartitionColumnsFromCache ();
224
- }
225
-
226
- private List <Column > getPartitionColumnsFromCache () {
227
- makeSureInitialized ();
228
- Optional <SchemaCacheValue > schemaCacheValue = getSchemaCacheValue ();
229
- return schemaCacheValue
230
- .map (cacheValue -> ((IcebergSchemaCacheValue ) cacheValue ).getPartitionColumns ())
231
- .orElseGet (Lists ::newArrayList );
212
+ IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue (snapshot );
213
+ IcebergSchemaCacheValue schemaValue = getIcebergSchemaCacheValue (snapshotValue .getSnapshot ().getSchemaId ());
214
+ return schemaValue .getPartitionColumns ();
232
215
}
233
216
234
217
@ Override
235
218
public MTMVSnapshotIf getPartitionSnapshot (String partitionName , MTMVRefreshContext context ,
236
219
Optional <MvccSnapshot > snapshot ) throws AnalysisException {
237
- long latestSnapshotId = getPartitionInfoFromCache ().getLatestSnapshotId (partitionName );
220
+ IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue (snapshot );
221
+ long latestSnapshotId = snapshotValue .getPartitionInfo ().getLatestSnapshotId (partitionName );
238
222
if (latestSnapshotId <= 0 ) {
239
223
throw new AnalysisException ("can not find partition: " + partitionName );
240
224
}
@@ -244,16 +228,9 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
244
228
@ Override
245
229
public MTMVSnapshotIf getTableSnapshot (MTMVRefreshContext context , Optional <MvccSnapshot > snapshot )
246
230
throws AnalysisException {
247
- return new MTMVVersionSnapshot (getLatestSnapshotIdFromCache ());
248
- }
249
-
250
- public long getLatestSnapshotIdFromCache () throws AnalysisException {
251
231
makeSureInitialized ();
252
- Optional <SchemaCacheValue > schemaCacheValue = getSchemaCacheValue ();
253
- if (!schemaCacheValue .isPresent ()) {
254
- throw new AnalysisException ("Can't find schema cache of table " + name );
255
- }
256
- return ((IcebergSchemaCacheValue ) schemaCacheValue .get ()).getSnapshotId ();
232
+ IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue (snapshot );
233
+ return new MTMVVersionSnapshot (snapshotValue .getSnapshot ().getSnapshotId ());
257
234
}
258
235
259
236
@ Override
@@ -268,11 +245,13 @@ public boolean isPartitionColumnAllowNull() {
268
245
*/
269
246
@ Override
270
247
public boolean isValidRelatedTable () {
248
+ makeSureInitialized ();
271
249
if (isValidRelatedTableCached ) {
272
250
return isValidRelatedTable ;
273
251
}
274
252
isValidRelatedTable = false ;
275
253
Set <String > allFields = Sets .newHashSet ();
254
+ table = getIcebergTable ();
276
255
for (PartitionSpec spec : table .specs ().values ()) {
277
256
if (spec == null ) {
278
257
isValidRelatedTableCached = true ;
@@ -299,26 +278,74 @@ public boolean isValidRelatedTable() {
299
278
return isValidRelatedTable ;
300
279
}
301
280
302
- protected IcebergPartitionInfo loadPartitionInfo () throws AnalysisException {
303
- List <IcebergPartition > icebergPartitions = loadIcebergPartition ();
281
+ @ Override
282
+ public MvccSnapshot loadSnapshot () {
283
+ return new IcebergMvccSnapshot (getIcebergSnapshotCacheValue ());
284
+ }
285
+
286
+ public long getLatestSnapshotId () {
287
+ table = getIcebergTable ();
288
+ Snapshot snapshot = table .currentSnapshot ();
289
+ return snapshot == null ? IcebergUtils .UNKNOWN_SNAPSHOT_ID : table .currentSnapshot ().snapshotId ();
290
+ }
291
+
292
+ public long getSchemaId (long snapshotId ) {
293
+ table = getIcebergTable ();
294
+ return snapshotId == IcebergUtils .UNKNOWN_SNAPSHOT_ID
295
+ ? IcebergUtils .UNKNOWN_SNAPSHOT_ID
296
+ : table .snapshot (snapshotId ).schemaId ();
297
+ }
298
+
299
+ @ Override
300
+ public List <Column > getFullSchema () {
301
+ Optional <MvccSnapshot > snapshotFromContext = MvccUtil .getSnapshotFromContext (this );
302
+ IcebergSnapshotCacheValue cacheValue = getOrFetchSnapshotCacheValue (snapshotFromContext );
303
+ return getIcebergSchemaCacheValue (cacheValue .getSnapshot ().getSchemaId ()).getSchema ();
304
+ }
305
+
306
+ @ Override
307
+ public boolean supportInternalPartitionPruned () {
308
+ return true ;
309
+ }
310
+
311
+ public IcebergSchemaCacheValue getIcebergSchemaCacheValue (long schemaId ) {
312
+ ExternalSchemaCache cache = Env .getCurrentEnv ().getExtMetaCacheMgr ().getSchemaCache (catalog );
313
+ Optional <SchemaCacheValue > schemaCacheValue = cache .getSchemaValue (
314
+ new IcebergSchemaCacheKey (dbName , name , schemaId ));
315
+ if (!schemaCacheValue .isPresent ()) {
316
+ throw new CacheException ("failed to getSchema for: %s.%s.%s.%s" ,
317
+ null , catalog .getName (), dbName , name , schemaId );
318
+ }
319
+ return (IcebergSchemaCacheValue ) schemaCacheValue .get ();
320
+ }
321
+
322
+ public IcebergPartitionInfo loadPartitionInfo (long snapshotId ) throws AnalysisException {
323
+ // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, haven't contained any snapshot yet.
324
+ if (!isValidRelatedTable () || snapshotId == IcebergUtils .UNKNOWN_SNAPSHOT_ID ) {
325
+ return new IcebergPartitionInfo ();
326
+ }
327
+ List <IcebergPartition > icebergPartitions = loadIcebergPartition (snapshotId );
304
328
Map <String , IcebergPartition > nameToPartition = Maps .newHashMap ();
305
329
Map <String , PartitionItem > nameToPartitionItem = Maps .newHashMap ();
330
+ table = getIcebergTable ();
331
+ partitionColumns = getIcebergSchemaCacheValue (table .snapshot (snapshotId ).schemaId ()).getPartitionColumns ();
306
332
for (IcebergPartition partition : icebergPartitions ) {
307
333
nameToPartition .put (partition .getPartitionName (), partition );
308
334
String transform = table .specs ().get (partition .getSpecId ()).fields ().get (0 ).transform ().toString ();
309
- Range <PartitionKey > partitionRange = getPartitionRange (partition .getPartitionValues ().get (0 ), transform );
335
+ Range <PartitionKey > partitionRange = getPartitionRange (
336
+ partition .getPartitionValues ().get (0 ), transform , partitionColumns );
310
337
PartitionItem item = new RangePartitionItem (partitionRange );
311
338
nameToPartitionItem .put (partition .getPartitionName (), item );
312
339
}
313
340
Map <String , Set <String >> partitionNameMap = mergeOverlapPartitions (nameToPartitionItem );
314
341
return new IcebergPartitionInfo (nameToPartitionItem , nameToPartition , partitionNameMap );
315
342
}
316
343
317
- public List <IcebergPartition > loadIcebergPartition () {
344
+ public List <IcebergPartition > loadIcebergPartition (long snapshotId ) {
318
345
PartitionsTable partitionsTable = (PartitionsTable ) MetadataTableUtils
319
346
.createMetadataTableInstance (table , MetadataTableType .PARTITIONS );
320
347
List <IcebergPartition > partitions = Lists .newArrayList ();
321
- try (CloseableIterable <FileScanTask > tasks = partitionsTable .newScan ().planFiles ()) {
348
+ try (CloseableIterable <FileScanTask > tasks = partitionsTable .newScan ().useSnapshot ( snapshotId ). planFiles ()) {
322
349
for (FileScanTask task : tasks ) {
323
350
CloseableIterable <StructLike > rows = task .asDataTask ().rows ();
324
351
for (StructLike row : rows ) {
@@ -344,6 +371,7 @@ public IcebergPartition generateIcebergPartition(StructLike row) {
344
371
// 8. equality_delete_file_count,
345
372
// 9. last_updated_at,
346
373
// 10. last_updated_snapshot_id
374
+ table = getIcebergTable ();
347
375
Preconditions .checkState (!table .spec ().fields ().isEmpty (), table .name () + " is not a partition table." );
348
376
int specId = row .get (1 , Integer .class );
349
377
PartitionSpec partitionSpec = table .specs ().get (specId );
@@ -382,13 +410,14 @@ public IcebergPartition generateIcebergPartition(StructLike row) {
382
410
}
383
411
384
412
@ VisibleForTesting
385
- public Range <PartitionKey > getPartitionRange (String value , String transform )
413
+ public Range <PartitionKey > getPartitionRange (String value , String transform , List < Column > partitionColumns )
386
414
throws AnalysisException {
387
- // For NULL value, create a lessThan partition for it.
415
+ // For NULL value, create a minimum partition for it.
388
416
if (value == null ) {
389
- PartitionKey nullKey = PartitionKey .createPartitionKey (
390
- Lists .newArrayList (new PartitionValue ("0000-01-02" )), partitionColumns );
391
- return Range .lessThan (nullKey );
417
+ PartitionKey nullLowKey = PartitionKey .createPartitionKey (
418
+ Lists .newArrayList (new PartitionValue ("0000-01-01" )), partitionColumns );
419
+ PartitionKey nullUpKey = nullLowKey .successor ();
420
+ return Range .closedOpen (nullLowKey , nullUpKey );
392
421
}
393
422
LocalDateTime epoch = Instant .EPOCH .atZone (ZoneId .of ("UTC" )).toLocalDateTime ();
394
423
LocalDateTime target ;
@@ -525,4 +554,12 @@ public boolean validRelatedTableCache() {
525
554
public void setIsValidRelatedTableCached (boolean isCached ) {
526
555
this .isValidRelatedTableCached = isCached ;
527
556
}
557
+
558
+ private IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue (Optional <MvccSnapshot > snapshot ) {
559
+ if (snapshot .isPresent ()) {
560
+ return ((IcebergMvccSnapshot ) snapshot .get ()).getSnapshotCacheValue ();
561
+ } else {
562
+ return getIcebergSnapshotCacheValue ();
563
+ }
564
+ }
528
565
}
0 commit comments