Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 6d6ff2d

Browse filesBrowse files
authored
[feature](mtmv)Support iceberg mtmv query. (#45659)
### What problem does this PR solve? 1. Implement the MvccTable interface for IcebergExternalTable. 2. IcebergExternalTable overrides the methods in ExternalTable and supports partition pruning. 3. Add a snapshot cache in IcebergMetadataCache to store IcebergExternalTable partition info. Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None
1 parent 9b0d962 commit 6d6ff2d
Copy full SHA for 6d6ff2d

File tree

Expand file treeCollapse file tree

13 files changed

+423
-107
lines changed
Filter options
Expand file treeCollapse file tree

13 files changed

+423
-107
lines changed

‎fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java

Copy file name to clipboardExpand all lines: fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+1-6Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,9 @@ public boolean isDefaultPartition() {
6565

6666
@Override
6767
public PartitionKeyDesc toPartitionKeyDesc() {
68-
if (partitionKeyRange.hasLowerBound()) {
69-
return PartitionKeyDesc.createFixed(
68+
return PartitionKeyDesc.createFixed(
7069
PartitionInfo.toPartitionValue(partitionKeyRange.lowerEndpoint()),
7170
PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
72-
} else {
73-
// For null partition value.
74-
return PartitionKeyDesc.createLessThan(PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
75-
}
7671
}
7772

7873
@Override

‎fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java

Copy file name to clipboardExpand all lines: fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ public Optional<SchemaCacheValue> initSchema() {
543543
}
544544

545545
private List<Column> getIcebergSchema() {
546-
return IcebergUtils.getSchema(catalog, dbName, name);
546+
return IcebergUtils.getSchema(catalog, dbName, name, IcebergUtils.UNKNOWN_SNAPSHOT_ID);
547547
}
548548

549549
private List<Column> getHudiSchema() {

‎fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java

Copy file name to clipboardExpand all lines: fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+104-67Lines changed: 104 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,14 @@
2727
import org.apache.doris.catalog.RangePartitionItem;
2828
import org.apache.doris.common.AnalysisException;
2929
import org.apache.doris.common.DdlException;
30+
import org.apache.doris.datasource.CacheException;
31+
import org.apache.doris.datasource.ExternalSchemaCache;
32+
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
3033
import org.apache.doris.datasource.ExternalTable;
3134
import org.apache.doris.datasource.SchemaCacheValue;
3235
import org.apache.doris.datasource.mvcc.MvccSnapshot;
36+
import org.apache.doris.datasource.mvcc.MvccTable;
37+
import org.apache.doris.datasource.mvcc.MvccUtil;
3338
import org.apache.doris.mtmv.MTMVBaseTableIf;
3439
import org.apache.doris.mtmv.MTMVRefreshContext;
3540
import org.apache.doris.mtmv.MTMVRelatedTableIf;
@@ -77,7 +82,7 @@
7782
import java.util.Set;
7883
import java.util.stream.Collectors;
7984

80-
public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf {
85+
public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {
8186

8287
public static final String YEAR = "year";
8388
public static final String MONTH = "month";
@@ -117,39 +122,23 @@ public void setPartitionColumns(List<Column> partitionColumns) {
117122
}
118123

119124
@Override
120-
public Optional<SchemaCacheValue> initSchema() {
121-
table = IcebergUtils.getIcebergTable(catalog, dbName, name);
122-
List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name);
123-
Snapshot snapshot = table.currentSnapshot();
124-
if (snapshot == null) {
125-
LOG.debug("Table {} is empty", name);
126-
return Optional.of(new IcebergSchemaCacheValue(schema, null, -1, null));
127-
}
128-
long snapshotId = snapshot.snapshotId();
129-
partitionColumns = null;
130-
IcebergPartitionInfo partitionInfo = null;
131-
if (isValidRelatedTable()) {
132-
PartitionSpec spec = table.spec();
133-
partitionColumns = Lists.newArrayList();
134-
135-
// For iceberg table, we only support table with 1 partition column as RelatedTable.
136-
// So we use spec.fields().get(0) to get the partition column.
137-
Types.NestedField col = table.schema().findField(spec.fields().get(0).sourceId());
125+
public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
126+
table = getIcebergTable();
127+
List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name,
128+
((IcebergSchemaCacheKey) key).getSchemaId());
129+
List<Column> tmpColumns = Lists.newArrayList();
130+
PartitionSpec spec = table.spec();
131+
for (PartitionField field : spec.fields()) {
132+
Types.NestedField col = table.schema().findField(field.sourceId());
138133
for (Column c : schema) {
139134
if (c.getName().equalsIgnoreCase(col.name())) {
140-
partitionColumns.add(c);
135+
tmpColumns.add(c);
141136
break;
142137
}
143138
}
144-
Preconditions.checkState(partitionColumns.size() == 1,
145-
"Support 1 partition column for iceberg table, but found " + partitionColumns.size());
146-
try {
147-
partitionInfo = loadPartitionInfo();
148-
} catch (AnalysisException e) {
149-
LOG.warn("Failed to load iceberg table {} partition info.", name, e);
150-
}
151139
}
152-
return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns, snapshotId, partitionInfo));
140+
partitionColumns = tmpColumns;
141+
return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns));
153142
}
154143

155144
@Override
@@ -187,6 +176,11 @@ public Table getIcebergTable() {
187176
return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
188177
}
189178

179+
private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue() {
180+
return Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
181+
.getSnapshotCache(catalog, dbName, name);
182+
}
183+
190184
@Override
191185
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
192186
Env.getCurrentEnv().getRefreshManager()
@@ -195,46 +189,36 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
195189

196190
@Override
197191
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
198-
return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
192+
return Maps.newHashMap(getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem());
199193
}
200194

201-
private IcebergPartitionInfo getPartitionInfoFromCache() {
202-
makeSureInitialized();
203-
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
204-
if (!schemaCacheValue.isPresent()) {
205-
return new IcebergPartitionInfo();
206-
}
207-
return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
195+
@Override
196+
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
197+
return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem();
208198
}
209199

210200
@Override
211201
public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
212-
makeSureInitialized();
213202
return isValidRelatedTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
214203
}
215204

216205
@Override
217206
public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException {
218-
return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
207+
return getPartitionColumns(snapshot).stream().map(Column::getName).collect(Collectors.toSet());
219208
}
220209

221210
@Override
222211
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
223-
return getPartitionColumnsFromCache();
224-
}
225-
226-
private List<Column> getPartitionColumnsFromCache() {
227-
makeSureInitialized();
228-
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
229-
return schemaCacheValue
230-
.map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
231-
.orElseGet(Lists::newArrayList);
212+
IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
213+
IcebergSchemaCacheValue schemaValue = getIcebergSchemaCacheValue(snapshotValue.getSnapshot().getSchemaId());
214+
return schemaValue.getPartitionColumns();
232215
}
233216

234217
@Override
235218
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
236219
Optional<MvccSnapshot> snapshot) throws AnalysisException {
237-
long latestSnapshotId = getPartitionInfoFromCache().getLatestSnapshotId(partitionName);
220+
IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
221+
long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
238222
if (latestSnapshotId <= 0) {
239223
throw new AnalysisException("can not find partition: " + partitionName);
240224
}
@@ -244,16 +228,9 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
244228
@Override
245229
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
246230
throws AnalysisException {
247-
return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
248-
}
249-
250-
public long getLatestSnapshotIdFromCache() throws AnalysisException {
251231
makeSureInitialized();
252-
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
253-
if (!schemaCacheValue.isPresent()) {
254-
throw new AnalysisException("Can't find schema cache of table " + name);
255-
}
256-
return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
232+
IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
233+
return new MTMVVersionSnapshot(snapshotValue.getSnapshot().getSnapshotId());
257234
}
258235

259236
@Override
@@ -268,11 +245,13 @@ public boolean isPartitionColumnAllowNull() {
268245
*/
269246
@Override
270247
public boolean isValidRelatedTable() {
248+
makeSureInitialized();
271249
if (isValidRelatedTableCached) {
272250
return isValidRelatedTable;
273251
}
274252
isValidRelatedTable = false;
275253
Set<String> allFields = Sets.newHashSet();
254+
table = getIcebergTable();
276255
for (PartitionSpec spec : table.specs().values()) {
277256
if (spec == null) {
278257
isValidRelatedTableCached = true;
@@ -299,26 +278,74 @@ public boolean isValidRelatedTable() {
299278
return isValidRelatedTable;
300279
}
301280

302-
protected IcebergPartitionInfo loadPartitionInfo() throws AnalysisException {
303-
List<IcebergPartition> icebergPartitions = loadIcebergPartition();
281+
@Override
282+
public MvccSnapshot loadSnapshot() {
283+
return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue());
284+
}
285+
286+
public long getLatestSnapshotId() {
287+
table = getIcebergTable();
288+
Snapshot snapshot = table.currentSnapshot();
289+
return snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : table.currentSnapshot().snapshotId();
290+
}
291+
292+
public long getSchemaId(long snapshotId) {
293+
table = getIcebergTable();
294+
return snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID
295+
? IcebergUtils.UNKNOWN_SNAPSHOT_ID
296+
: table.snapshot(snapshotId).schemaId();
297+
}
298+
299+
@Override
300+
public List<Column> getFullSchema() {
301+
Optional<MvccSnapshot> snapshotFromContext = MvccUtil.getSnapshotFromContext(this);
302+
IcebergSnapshotCacheValue cacheValue = getOrFetchSnapshotCacheValue(snapshotFromContext);
303+
return getIcebergSchemaCacheValue(cacheValue.getSnapshot().getSchemaId()).getSchema();
304+
}
305+
306+
/**
 * Always returns {@code true} for Iceberg external tables.
 * NOTE(review): presumably this opts the table into the planner's internal partition
 * pruning path — confirm against the overridden declaration.
 */
@Override
307+
public boolean supportInternalPartitionPruned() {
308+
return true;
309+
}
310+
311+
public IcebergSchemaCacheValue getIcebergSchemaCacheValue(long schemaId) {
312+
ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
313+
Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
314+
new IcebergSchemaCacheKey(dbName, name, schemaId));
315+
if (!schemaCacheValue.isPresent()) {
316+
throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
317+
null, catalog.getName(), dbName, name, schemaId);
318+
}
319+
return (IcebergSchemaCacheValue) schemaCacheValue.get();
320+
}
321+
322+
public IcebergPartitionInfo loadPartitionInfo(long snapshotId) throws AnalysisException {
323+
// snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, haven't contained any snapshot yet.
324+
if (!isValidRelatedTable() || snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
325+
return new IcebergPartitionInfo();
326+
}
327+
List<IcebergPartition> icebergPartitions = loadIcebergPartition(snapshotId);
304328
Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
305329
Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
330+
table = getIcebergTable();
331+
partitionColumns = getIcebergSchemaCacheValue(table.snapshot(snapshotId).schemaId()).getPartitionColumns();
306332
for (IcebergPartition partition : icebergPartitions) {
307333
nameToPartition.put(partition.getPartitionName(), partition);
308334
String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
309-
Range<PartitionKey> partitionRange = getPartitionRange(partition.getPartitionValues().get(0), transform);
335+
Range<PartitionKey> partitionRange = getPartitionRange(
336+
partition.getPartitionValues().get(0), transform, partitionColumns);
310337
PartitionItem item = new RangePartitionItem(partitionRange);
311338
nameToPartitionItem.put(partition.getPartitionName(), item);
312339
}
313340
Map<String, Set<String>> partitionNameMap = mergeOverlapPartitions(nameToPartitionItem);
314341
return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap);
315342
}
316343

317-
public List<IcebergPartition> loadIcebergPartition() {
344+
public List<IcebergPartition> loadIcebergPartition(long snapshotId) {
318345
PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
319346
.createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
320347
List<IcebergPartition> partitions = Lists.newArrayList();
321-
try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().planFiles()) {
348+
try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
322349
for (FileScanTask task : tasks) {
323350
CloseableIterable<StructLike> rows = task.asDataTask().rows();
324351
for (StructLike row : rows) {
@@ -344,6 +371,7 @@ public IcebergPartition generateIcebergPartition(StructLike row) {
344371
// 8. equality_delete_file_count,
345372
// 9. last_updated_at,
346373
// 10. last_updated_snapshot_id
374+
table = getIcebergTable();
347375
Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partition table.");
348376
int specId = row.get(1, Integer.class);
349377
PartitionSpec partitionSpec = table.specs().get(specId);
@@ -382,13 +410,14 @@ public IcebergPartition generateIcebergPartition(StructLike row) {
382410
}
383411

384412
@VisibleForTesting
385-
public Range<PartitionKey> getPartitionRange(String value, String transform)
413+
public Range<PartitionKey> getPartitionRange(String value, String transform, List<Column> partitionColumns)
386414
throws AnalysisException {
387-
// For NULL value, create a lessThan partition for it.
415+
// For NULL value, create a minimum partition for it.
388416
if (value == null) {
389-
PartitionKey nullKey = PartitionKey.createPartitionKey(
390-
Lists.newArrayList(new PartitionValue("0000-01-02")), partitionColumns);
391-
return Range.lessThan(nullKey);
417+
PartitionKey nullLowKey = PartitionKey.createPartitionKey(
418+
Lists.newArrayList(new PartitionValue("0000-01-01")), partitionColumns);
419+
PartitionKey nullUpKey = nullLowKey.successor();
420+
return Range.closedOpen(nullLowKey, nullUpKey);
392421
}
393422
LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
394423
LocalDateTime target;
@@ -525,4 +554,12 @@ public boolean validRelatedTableCache() {
525554
public void setIsValidRelatedTableCached(boolean isCached) {
526555
this.isValidRelatedTableCached = isCached;
527556
}
557+
558+
private IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
559+
if (snapshot.isPresent()) {
560+
return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
561+
} else {
562+
return getIcebergSnapshotCacheValue();
563+
}
564+
}
528565
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.