From 3f9c92e3e10fa8663746ce9d3a680c6bd6950c2e Mon Sep 17 00:00:00 2001 From: Evan Date: Wed, 22 Apr 2026 05:56:11 +0200 Subject: [PATCH] [kv] Fix non-target columns not being nulled during partial update on first insert (#2969) (cherry picked from commit 0af34786669e697973567e0ad06a89bdbac19d80) --- .../fluss/flink/sink/FlussSinkITCase.java | 31 +++++---- .../org/apache/fluss/server/kv/KvTablet.java | 3 +- .../kv/rowmerger/AggregateRowMerger.java | 4 +- .../server/kv/rowmerger/DefaultRowMerger.java | 4 +- .../kv/rowmerger/FirstRowRowMerger.java | 5 +- .../fluss/server/kv/rowmerger/RowMerger.java | 4 +- .../kv/rowmerger/VersionedRowMerger.java | 5 +- .../apache/fluss/server/kv/KvTabletTest.java | 44 +++++++++++++ .../kv/rowmerger/FirstRowRowMergerTest.java | 65 +++++++++++++++++++ .../kv/rowmerger/VersionedRowMergerTest.java | 16 +++++ 10 files changed, 158 insertions(+), 23 deletions(-) create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMergerTest.java diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java index 1a5e98eebb..cfdaf71654 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java @@ -314,11 +314,11 @@ public void testOrdersTablePKSink() throws Exception { public void testPartialUpdateWithTwoWriters() throws Exception { createTable(TablePath.of(DEFAULT_DB, "partial_update_two_writers_test"), pkTableDescriptor); - // Initial inserts + // Initial inserts targeting only orderId+itemId; amount and address are non-target ArrayList initialOrders = new ArrayList<>(); - initialOrders.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT)); - initialOrders.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT)); - initialOrders.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT)); + initialOrders.add(new TestOrder(2001, 3001, null, null, RowKind.INSERT)); + initialOrders.add(new TestOrder(2002, 3002, null, null, RowKind.INSERT)); + initialOrders.add(new TestOrder(2003, 3003, null, null, RowKind.INSERT)); DataStream initialStream = env.fromData(initialOrders); @@ -361,15 +361,16 @@ public void testPartialUpdateWithTwoWriters() throws Exception { } // Build expected change log: 3 inserts, then before/after for 2001 and 2003 + // Initial inserts target only orderId+itemId, so amount is null (non-target column) List expected = new ArrayList<>(); - expected.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT)); - expected.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT)); - expected.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT)); - // update for 2001: before and after (itemId stays, amount/address updated) - expected.add(new TestOrder(2001, 3001, -1, null, RowKind.UPDATE_BEFORE)); + expected.add(new TestOrder(2001, 3001, null, null, RowKind.INSERT)); + expected.add(new TestOrder(2002, 3002, null, null, RowKind.INSERT)); + expected.add(new TestOrder(2003, 3003, null, null, RowKind.INSERT)); + // update for 2001: before shows stored state (amount=null), after has amount/address set + expected.add(new TestOrder(2001, 3001, null, null, RowKind.UPDATE_BEFORE)); expected.add(new TestOrder(2001, 3001, 100, "addr1", RowKind.UPDATE_AFTER)); // update for 2003 - expected.add(new TestOrder(2003, 3003, -1, null, RowKind.UPDATE_BEFORE)); + expected.add(new TestOrder(2003, 3003, null, null, RowKind.UPDATE_BEFORE)); expected.add(new TestOrder(2003, 3003, 300, "addr3", RowKind.UPDATE_AFTER)); // Poll actual changelog until we have all expected records or timeout @@ -382,12 +383,13 @@ public void testPartialUpdateWithTwoWriters() throws Exception { for (TableBucket bucket : scanRecords.buckets()) { for (ScanRecord record : scanRecords.records(bucket)) { InternalRow row = record.getRow(); + Integer amount = row.isNullAt(2) ? null : row.getInt(2); String address = row.getString(3) != null ? row.getString(3).toString() : null; TestOrder order = new TestOrder( row.getLong(0), row.getLong(1), - row.getInt(2), + amount, address, toFlinkRowKind(record.getChangeType())); actual.add(order); @@ -654,11 +656,12 @@ private static class TestOrder implements Serializable { private static final long serialVersionUID = 1L; private final long orderId; private final long itemId; - private final int amount; + private final Integer amount; private final String address; private final RowKind rowKind; - public TestOrder(long orderId, long itemId, int amount, String address, RowKind rowKind) { + public TestOrder( + long orderId, long itemId, Integer amount, String address, RowKind rowKind) { this.orderId = orderId; this.itemId = itemId; this.amount = amount; @@ -674,7 +677,7 @@ public boolean equals(Object o) { TestOrder testOrder = (TestOrder) o; return orderId == testOrder.orderId && itemId == testOrder.itemId - && amount == testOrder.amount + && Objects.equals(amount, testOrder.amount) && Objects.equals(address, testOrder.address) && rowKind == testOrder.rowKind; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index e72428a02f..85c192c922 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -562,9 +562,10 @@ private long processUpsert( byte[] oldValueBytes = getFromBufferOrKv(key); if (oldValueBytes == null) { + BinaryValue valueToInsert = currentMerger.merge(null, currentValue); return applyInsert( key, - currentValue, + valueToInsert, walBuilder, latestSchemaRow, logOffset, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/AggregateRowMerger.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/AggregateRowMerger.java index 54583a90b3..f84769eb2e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/AggregateRowMerger.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/AggregateRowMerger.java @@ -85,7 +85,7 @@ public AggregateRowMerger( } @Override - public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) { + public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) { // First write: no existing row if (oldValue == null || oldValue.row == null) { return newValue; @@ -268,7 +268,7 @@ private static class PartialAggregateRowMerger implements RowMerger { } @Override - public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) { + public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) { // First write: no existing row if (oldValue == null || oldValue.row == null) { return newValue; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java index 3717850fa8..82f1fb8771 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java @@ -46,7 +46,7 @@ public DefaultRowMerger(KvFormat kvFormat, @Nullable DeleteBehavior deleteBehavi @Nullable @Override - public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) { + public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) { // always retain the new row (latest row) return newValue; } @@ -98,7 +98,7 @@ public RowMerger configureTargetColumns( @Nullable @Override - public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) { + public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) { return partialUpdater.updateRow(oldValue, newValue); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java index 7a279dd29d..0777a626b7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java @@ -44,7 +44,10 @@ public FirstRowRowMerger(@Nullable DeleteBehavior deleteBehavior) { @Nullable @Override - public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) { + public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) { + if (oldValue == null) { + return newValue; + } // always retain the old row (first row) return oldValue; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java index ed8b37a658..3f2f570886 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java @@ -36,12 +36,12 @@ public interface RowMerger { /** * Merge the old value with the new value. * - * @param oldValue the old value + * @param oldValue the old value, or null if this is a first insert (no existing row) * @param newValue the new row * @return the merged value, if the returned row is the same to the old row, then nothing * happens to the row (no update, no delete). */ - BinaryValue merge(BinaryValue oldValue, BinaryValue newValue); + BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue); /** * Merge the old row with a delete row. diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java index 0bc64f6c9b..7e60f282e4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java @@ -65,7 +65,10 @@ public VersionedRowMerger(String versionColumnName, @Nullable DeleteBehavior del @Nullable @Override - public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) { + public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) { + if (oldValue == null) { + return newValue; + } // return newRow if newRow's version is larger or equal than oldRow's version return versionComparator.compare(oldValue.row, newValue.row) <= 0 ? newValue : oldValue; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index da918f45e8..d4d30b5d1e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -463,6 +463,50 @@ void testPartialUpdateAndDelete() throws Exception { checkEqual(actualLogRecords, expectedLogs, rowType); } + @Test + void testPartialUpdateFirstInsertThenUpdate() throws Exception { + initLogTabletAndKvTablet(DATA2_SCHEMA, new HashMap<>()); + RowType rowType = DATA2_SCHEMA.getRowType(); + KvRecordTestUtils.KvRecordFactory data2kvRecordFactory = + KvRecordTestUtils.KvRecordFactory.of(rowType); + + // First insert: partial update columns a and b, column c should be null + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + data2kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, "v1", "ignored"})); + kvTablet.putAsLeader(batch1, new int[] {0, 1}); + long endOffset = logTablet.localLogEndOffset(); + + // Verify first insert stored correctly with null for non-target column + assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes()))) + .isEqualTo(valueOf(compactedRow(rowType, new Object[] {1, "v1", null}))); + + // Second update: partial update columns a and c, column b should retain "v1" + KvRecordBatch batch2 = + kvRecordBatchFactory.ofRecords( + data2kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, "ignored2", "c1"})); + kvTablet.putAsLeader(batch2, new int[] {0, 2}); + + // Verify: b should retain "v1" from first insert, c should be updated to "c1" + assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes()))) + .isEqualTo(valueOf(compactedRow(rowType, new Object[] {1, "v1", "c1"}))); + + // Verify CDC log for the second update + LogRecords actualLogRecords = readLogRecords(endOffset); + List expectedLogs = + Collections.singletonList( + logRecords( + rowType, + endOffset, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList( + new Object[] {1, "v1", null}, + new Object[] {1, "v1", "c1"}))); + checkEqual(actualLogRecords, expectedLogs, rowType); + } + @Test void testPutWithMultiThread() throws Exception { initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMergerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMergerTest.java new file mode 100644 index 0000000000..ffe21bfef9 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMergerTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.rowmerger; + +import org.apache.fluss.metadata.DeleteBehavior; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.record.BinaryValue; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.junit.jupiter.api.Test; + +import static org.apache.fluss.testutils.DataTestUtils.compactedRow; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link FirstRowRowMerger}. */ +class FirstRowRowMergerTest { + + private static final Schema SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .build(); + + private static final RowType ROW_TYPE = SCHEMA.getRowType(); + + private static BinaryValue binaryValue(Object[] objects) { + return new BinaryValue((short) 1, compactedRow(ROW_TYPE, objects)); + } + + @Test + void testMergeWithNullOldValueReturnsNewValue() { + FirstRowRowMerger merger = new FirstRowRowMerger(DeleteBehavior.DISABLE); + + BinaryValue newValue = binaryValue(new Object[] {1, "first"}); + BinaryValue result = merger.merge(null, newValue); + assertThat(result).isSameAs(newValue); + } + + @Test + void testMergeRetainsFirstRow() { + FirstRowRowMerger merger = new FirstRowRowMerger(DeleteBehavior.DISABLE); + + BinaryValue oldValue = binaryValue(new Object[] {1, "first"}); + BinaryValue newValue = binaryValue(new Object[] {1, "second"}); + BinaryValue result = merger.merge(oldValue, newValue); + assertThat(result).isSameAs(oldValue); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java index 9163b23b0c..461fcca424 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java @@ -143,6 +143,22 @@ void testAllTypes(DataType type, List testSpecs) { } } + @Test + void testMergeWithNullOldValueReturnsNewValue() { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build(); + RowType rowType = schema.getRowType(); + VersionedRowMerger merger = new VersionedRowMerger("a", DeleteBehavior.DISABLE); + merger.configureTargetColumns(null, (short) 1, schema); + + BinaryValue newValue = binaryValue(rowType, new Object[] {42, "first"}); + BinaryValue result = merger.merge(null, newValue); + assertThat(result).isSameAs(newValue); + } + @Test void testNormal() { Schema schema =