Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,11 @@ public void testOrdersTablePKSink() throws Exception {
public void testPartialUpdateWithTwoWriters() throws Exception {
createTable(TablePath.of(DEFAULT_DB, "partial_update_two_writers_test"), pkTableDescriptor);

// Initial inserts
// Initial inserts targeting only orderId+itemId; amount and address are non-target
ArrayList<TestOrder> initialOrders = new ArrayList<>();
initialOrders.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT));
initialOrders.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT));
initialOrders.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT));
initialOrders.add(new TestOrder(2001, 3001, null, null, RowKind.INSERT));
initialOrders.add(new TestOrder(2002, 3002, null, null, RowKind.INSERT));
initialOrders.add(new TestOrder(2003, 3003, null, null, RowKind.INSERT));

DataStream<TestOrder> initialStream = env.fromData(initialOrders);

Expand Down Expand Up @@ -361,15 +361,16 @@ public void testPartialUpdateWithTwoWriters() throws Exception {
}

// Build expected change log: 3 inserts, then before/after for 2001 and 2003
// Initial inserts target only orderId+itemId, so amount is null (non-target column)
List<TestOrder> expected = new ArrayList<>();
expected.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT));
expected.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT));
expected.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT));
// update for 2001: before and after (itemId stays, amount/address updated)
expected.add(new TestOrder(2001, 3001, -1, null, RowKind.UPDATE_BEFORE));
expected.add(new TestOrder(2001, 3001, null, null, RowKind.INSERT));
expected.add(new TestOrder(2002, 3002, null, null, RowKind.INSERT));
expected.add(new TestOrder(2003, 3003, null, null, RowKind.INSERT));
// update for 2001: before shows stored state (amount=null), after has amount/address set
expected.add(new TestOrder(2001, 3001, null, null, RowKind.UPDATE_BEFORE));
expected.add(new TestOrder(2001, 3001, 100, "addr1", RowKind.UPDATE_AFTER));
// update for 2003
expected.add(new TestOrder(2003, 3003, -1, null, RowKind.UPDATE_BEFORE));
expected.add(new TestOrder(2003, 3003, null, null, RowKind.UPDATE_BEFORE));
expected.add(new TestOrder(2003, 3003, 300, "addr3", RowKind.UPDATE_AFTER));

// Poll actual changelog until we have all expected records or timeout
Expand All @@ -382,12 +383,13 @@ public void testPartialUpdateWithTwoWriters() throws Exception {
for (TableBucket bucket : scanRecords.buckets()) {
for (ScanRecord record : scanRecords.records(bucket)) {
InternalRow row = record.getRow();
Integer amount = row.isNullAt(2) ? null : row.getInt(2);
String address = row.getString(3) != null ? row.getString(3).toString() : null;
TestOrder order =
new TestOrder(
row.getLong(0),
row.getLong(1),
row.getInt(2),
amount,
address,
toFlinkRowKind(record.getChangeType()));
actual.add(order);
Expand Down Expand Up @@ -654,11 +656,12 @@ private static class TestOrder implements Serializable {
private static final long serialVersionUID = 1L;
private final long orderId;
private final long itemId;
private final int amount;
private final Integer amount;
private final String address;
private final RowKind rowKind;

public TestOrder(long orderId, long itemId, int amount, String address, RowKind rowKind) {
public TestOrder(
long orderId, long itemId, Integer amount, String address, RowKind rowKind) {
this.orderId = orderId;
this.itemId = itemId;
this.amount = amount;
Expand All @@ -674,7 +677,7 @@ public boolean equals(Object o) {
TestOrder testOrder = (TestOrder) o;
return orderId == testOrder.orderId
&& itemId == testOrder.itemId
&& amount == testOrder.amount
&& Objects.equals(amount, testOrder.amount)
&& Objects.equals(address, testOrder.address)
&& rowKind == testOrder.rowKind;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,10 @@ private long processUpsert(

byte[] oldValueBytes = getFromBufferOrKv(key);
if (oldValueBytes == null) {
BinaryValue valueToInsert = currentMerger.merge(null, currentValue);
return applyInsert(
key,
currentValue,
valueToInsert,
walBuilder,
latestSchemaRow,
logOffset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public AggregateRowMerger(
}

@Override
public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) {
// First write: no existing row
if (oldValue == null || oldValue.row == null) {
return newValue;
Expand Down Expand Up @@ -268,7 +268,7 @@ private static class PartialAggregateRowMerger implements RowMerger {
}

@Override
public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) {
// First write: no existing row
if (oldValue == null || oldValue.row == null) {
return newValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public DefaultRowMerger(KvFormat kvFormat, @Nullable DeleteBehavior deleteBehavi

@Nullable
@Override
public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) {
    // Last-write-wins semantics: the incoming row always replaces whatever was
    // stored before. oldValue may be null on the first insert; either way the
    // new row is returned unchanged.
    return newValue;
}
Expand Down Expand Up @@ -98,7 +98,7 @@ public RowMerger configureTargetColumns(

@Nullable
@Override
public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) {
    // Delegate the column-wise merge to the partial updater: target columns are
    // taken from newValue, non-target columns are retained from oldValue.
    // oldValue may be null on a first insert — presumably the updater then
    // leaves non-target columns null; confirm in the partial updater impl.
    return partialUpdater.updateRow(oldValue, newValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public FirstRowRowMerger(@Nullable DeleteBehavior deleteBehavior) {

@Nullable
@Override
public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) {
    // First-row semantics: once a row is stored it is never replaced. A null
    // oldValue means there is no stored row yet, so the incoming row is kept.
    return oldValue == null ? newValue : oldValue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ public interface RowMerger {
/**
* Merge the old value with the new value.
*
* @param oldValue the old value
* @param oldValue the old value, or null if this is a first insert (no existing row)
* @param newValue the new row
* @return the merged value; if the returned row is the same as the old row, then nothing
* happens to the row (no update, no delete).
*/
BinaryValue merge(BinaryValue oldValue, BinaryValue newValue);
BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue);

/**
* Merge the old row with a delete row.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ public VersionedRowMerger(String versionColumnName, @Nullable DeleteBehavior del

@Nullable
@Override
public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
public BinaryValue merge(@Nullable BinaryValue oldValue, BinaryValue newValue) {
    if (oldValue != null) {
        // Keep whichever row has the newer version; on an equal version the
        // new row wins.
        return versionComparator.compare(oldValue.row, newValue.row) <= 0
                ? newValue
                : oldValue;
    }
    // No stored row yet (first insert) — accept the incoming row.
    return newValue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,50 @@ void testPartialUpdateAndDelete() throws Exception {
checkEqual(actualLogRecords, expectedLogs, rowType);
}

@Test
void testPartialUpdateFirstInsertThenUpdate() throws Exception {
    initLogTabletAndKvTablet(DATA2_SCHEMA, new HashMap<>());
    RowType rowType = DATA2_SCHEMA.getRowType();
    KvRecordTestUtils.KvRecordFactory recordFactory =
            KvRecordTestUtils.KvRecordFactory.of(rowType);

    // Phase 1: the very first write is a partial update targeting columns
    // {0, 1}; the value supplied for the non-target column must be ignored.
    KvRecordBatch insertBatch =
            kvRecordBatchFactory.ofRecords(
                    recordFactory.ofRecord(
                            "k1".getBytes(), new Object[] {1, "v1", "ignored"}));
    kvTablet.putAsLeader(insertBatch, new int[] {0, 1});
    long offsetAfterInsert = logTablet.localLogEndOffset();

    // The stored row must carry null for the non-target third column.
    assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes())))
            .isEqualTo(valueOf(compactedRow(rowType, new Object[] {1, "v1", null})));

    // Phase 2: a second partial update targets columns {0, 2}; the supplied
    // value for column 1 must be ignored and the stored "v1" retained.
    KvRecordBatch updateBatch =
            kvRecordBatchFactory.ofRecords(
                    recordFactory.ofRecord(
                            "k1".getBytes(), new Object[] {1, "ignored2", "c1"}));
    kvTablet.putAsLeader(updateBatch, new int[] {0, 2});

    // Merged row: column 1 keeps "v1", column 2 becomes "c1".
    assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes())))
            .isEqualTo(valueOf(compactedRow(rowType, new Object[] {1, "v1", "c1"})));

    // The CDC log for the second write must show the pre-image (with null in
    // the last column) followed by the post-image.
    LogRecords actualLogRecords = readLogRecords(offsetAfterInsert);
    List<MemoryLogRecords> expectedLogs =
            Collections.singletonList(
                    logRecords(
                            rowType,
                            offsetAfterInsert,
                            Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
                            Arrays.asList(
                                    new Object[] {1, "v1", null},
                                    new Object[] {1, "v1", "c1"})));
    checkEqual(actualLogRecords, expectedLogs, rowType);
}

@Test
void testPutWithMultiThread() throws Exception {
initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv.rowmerger;

import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.record.BinaryValue;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;

import org.junit.jupiter.api.Test;

import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link FirstRowRowMerger}. */
class FirstRowRowMergerTest {

    private static final Schema TEST_SCHEMA =
            Schema.newBuilder()
                    .column("id", DataTypes.INT())
                    .column("name", DataTypes.STRING())
                    .primaryKey("id")
                    .build();

    private static final RowType TEST_ROW_TYPE = TEST_SCHEMA.getRowType();

    /** Wraps the given field values into a schema-version-1 {@link BinaryValue}. */
    private static BinaryValue binaryValue(Object[] fields) {
        return new BinaryValue((short) 1, compactedRow(TEST_ROW_TYPE, fields));
    }

    @Test
    void testMergeWithNullOldValueReturnsNewValue() {
        FirstRowRowMerger firstRowMerger = new FirstRowRowMerger(DeleteBehavior.DISABLE);

        BinaryValue incoming = binaryValue(new Object[] {1, "first"});
        // With no stored row (null old value), the incoming row must be accepted.
        assertThat(firstRowMerger.merge(null, incoming)).isSameAs(incoming);
    }

    @Test
    void testMergeRetainsFirstRow() {
        FirstRowRowMerger firstRowMerger = new FirstRowRowMerger(DeleteBehavior.DISABLE);

        BinaryValue stored = binaryValue(new Object[] {1, "first"});
        BinaryValue incoming = binaryValue(new Object[] {1, "second"});
        // An already-stored row must win over any later write.
        assertThat(firstRowMerger.merge(stored, incoming)).isSameAs(stored);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,22 @@ void testAllTypes(DataType type, List<TestSpec> testSpecs) {
}
}

@Test
void testMergeWithNullOldValueReturnsNewValue() {
    Schema testSchema =
            Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("b", DataTypes.STRING())
                    .build();
    RowType rowType = testSchema.getRowType();
    VersionedRowMerger versionedMerger = new VersionedRowMerger("a", DeleteBehavior.DISABLE);
    versionedMerger.configureTargetColumns(null, (short) 1, testSchema);

    // A first write (no existing row) must be accepted as-is.
    BinaryValue firstWrite = binaryValue(rowType, new Object[] {42, "first"});
    assertThat(versionedMerger.merge(null, firstWrite)).isSameAs(firstWrite);
}

@Test
void testNormal() {
Schema schema =
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.