Conversation


@koenvo koenvo commented Apr 2, 2025

## Rationale for this change

Upsert operations in PyIceberg rely on Arrow joins between source and target rows. However, Arrow Acero cannot compare certain complex types — like `struct`, `list`, and `map` — unless they’re part of the join key. When such types exist in non-join columns, the upsert fails with an error like:

```
ArrowInvalid: Data type struct<...> is not supported in join non-key field venue_geo
```

This PR introduces a **fallback mechanism**: if Arrow fails to join due to unsupported types, we fall back to comparing only the key columns. Non-key complex fields are ignored in the join condition, but still retained in the final upserted data.

### Before

```python
# Fails if venue_geo is a non-key struct field
txn.upsert(df, join_cols=["match_id"])
```

> ❌ ArrowInvalid: Data type struct<...> is not supported in join non-key field venue_geo

### After

```python
# Falls back to key-based join and proceeds
txn.upsert(df, join_cols=["match_id"])
```

> ✅ Successfully inserts or updates the record, skipping complex field comparison during join

## ✅ Are these changes tested?

Yes:

- A test was added to reproduce the failure scenario with complex non-key fields.
- The new behavior is verified by asserting that the upsert completes successfully using the fallback logic.

> ℹ️ **Note**
> This change does not affect users who do not include complex types in their schemas. For those who do, it improves resilience while preserving data correctness.

## Are there any user-facing changes?

Yes — upserts involving complex non-key columns (like `struct`, `list`, or `map`) no longer fail. They now succeed by skipping unsupported comparisons during the join phase.
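For intuition, the fallback can be sketched in plain Python with hypothetical names (dicts standing in for Arrow rows; this is not the PyIceberg implementation): try the full non-key comparison, and if the values cannot be compared, treat every key-matched row as changed.

```python
def full_diff(source_row, target_row, non_key_cols):
    """Mimic Arrow's limitation: refuse to compare complex values."""
    for col in non_key_cols:
        if isinstance(source_row[col], (dict, list)):
            raise TypeError(f"type of non-key field {col} is not supported")
    return any(source_row[c] != target_row[c] for c in non_key_cols)


def rows_to_update(source, target, join_cols):
    """Return source rows that key-match a target row and (possibly) changed."""
    key = lambda row: tuple(row[c] for c in join_cols)
    target_by_key = {key(row): row for row in target}
    non_key_cols = [c for c in source[0] if c not in join_cols]

    out = []
    for row in source:
        match = target_by_key.get(key(row))
        if match is None:
            continue  # no key match: this row is an insert, not an update
        try:
            changed = full_diff(row, match, non_key_cols)
        except TypeError:
            changed = True  # fallback: keep every key-matched row
        if changed:
            out.append(row)
    return out
```

With simple column types, identical rows are still skipped; with a struct-like column, the identical row is returned anyway, which only means the overwrite rewrites slightly more data.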

@koenvo koenvo mentioned this pull request Apr 3, 2025
@koenvo koenvo marked this pull request as ready for review April 3, 2025 07:08

@Fokko Fokko left a comment


Thanks for working on this @koenvo! It looks like a lot of folks are waiting for this.

Could you run a poor man's benchmark, similar to what I did here: #1685 (comment), just to see how the two methods compare in terms of performance?


```python
MARKER_COLUMN_NAME = "__from_target"

assert MARKER_COLUMN_NAME not in join_cols_set
```

We try to avoid assert outside of the tests. Could you raise a ValueError instead?


koenvo commented Apr 8, 2025

Poor man's benchmark

This compares the performance of the original vs fallback upsert logic.
The "With skips" case simulates situations where non-matching rows can be skipped during comparison.

| Condition     | Original (s) | Fallback (s) | Diff (ms) | Diff (%) |
|---------------|--------------|--------------|-----------|----------|
| Without skips | 0.727        | 0.724        | -2.73     | -0.38%   |
| With skips    | 0.681        | 0.732        | +51.24    | +7.53%   |

No significant performance regression observed. Fallback behaves as expected.


Fokko commented Apr 17, 2025

First of all, sorry for the late reply, I was busy with the Iceberg summit :)

@koenvo The reason I was asking for a benchmark is to see if we can replace the existing logic with your logic that also works with {map,list,struct} types. I'm a simple man and I like simple things.


@kevinjqliu kevinjqliu left a comment


This is great! Thanks for the PR. I added a few comments

@kevinjqliu

So I took some time to think about this issue.

The main issue here is that pyarrow's join does not support complex types (regardless of whether the complex type is part of the join keys).

Taking a step back, the get_rows_to_update function does 2 things:

  1. It filters both the iceberg table (target table) and the upsert dataframe (source table) on the join keys. Any matching rows will be part of the potential "rows to update" table.
  2. It does an extra optimization to avoid rewriting rows that are an exact match. This is done by comparing the non-join-key columns; we filter out any rows that are an exact match.

Both operations are combined by using the join function.

This PR currently skips step 2 as a fallback mechanism. All rows matching the join keys will be returned, regardless of whether they are an exact match. I think this is fine since the overwrite will just overwrite more data.

We should update this comment, though:

```python
# function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
# we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
# this extra step avoids unnecessary IO and writes
rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)
```

I can't think of an efficient way to do step 2. I think it is possible to build a mask with all rows of the non-join-key columns and then filter the "rows to update" table...
We are kind of already building the filter here:

```python
diff_expr = functools.reduce(
    operator.or_,
    [
        pc.or_kleene(
            pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs")),
            pc.is_null(pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs"))),
        )
        for col in non_key_cols
    ],
)
```
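For intuition, the null handling in that expression (a null comparison result counts as a difference) can be mirrored in plain Python. This is a hypothetical sketch of the semantics, not the PyIceberg code:

```python
def neq_kleene(a, b):
    """Mirror pc.or_kleene(pc.not_equal(a, b), pc.is_null(pc.not_equal(a, b))).

    If either side is null, not_equal yields null, and is_null(...) turns
    that into True, so null-vs-anything counts as a difference.
    """
    if a is None or b is None:
        return True  # null comparison result: treated as changed
    return a != b


def changed_mask(filtered_source, filtered_target, non_key_cols):
    # One boolean per row pair: True if any non-key column differs
    return [
        any(neq_kleene(s[c], t[c]) for c in non_key_cols)
        for s, t in zip(filtered_source, filtered_target)
    ]
```

The mask could then be used to filter the key-matched rows, which is the step 2 behavior the join currently provides.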


Fokko commented Apr 23, 2025

Thanks @kevinjqliu for the write-up. I checked locally and also noticed that the fallback has a different behavior:

```
FAILED tests/table/test_upsert.py::test_merge_scenario_skip_upd_row - AssertionError: rows updated should be 1, but got 2
FAILED tests/table/test_upsert.py::test_merge_scenario_date_as_key - AssertionError: rows updated should be 1, but got 2
FAILED tests/table/test_upsert.py::test_merge_scenario_string_as_key - AssertionError: rows updated should be 1, but got 2
FAILED tests/table/test_upsert.py::test_upsert_with_identifier_fields - assert 2 == 1
```

How about reverting my work in #1685? I wasn't aware of Arrow's limitation that it doesn't support complex types in joins.


koenvo commented Apr 23, 2025

I was already experimenting with creating a mask. So instead of creating an expression (like the one you referenced), it will be a mask.

We probably need to use the result of the join as indices for both tables. This requires having index columns for both source and target, and doing something like take(source_index_column) and take(target_index_column).


koenvo commented Apr 23, 2025

> I was already experimenting with creating a mask. So instead of creating an expression (like the one you referenced), it will be a mask.
>
> We probably need to use the result of the join as indices for both tables. This requires having index columns for both source and target, and doing something like take(source_index_column) and take(target_index_column).

This doesn't work as expected.

I tried something like this:

```python
def get_rows_to_update(....):
        .....

        filtered_source = source_table.take(joined[SOURCE_INDEX_COLUMN_NAME])
        filtered_target = target_table.take(joined[TARGET_INDEX_COLUMN_NAME])

        diff_expr = functools.reduce(
            pc.or_,
            [
                pc.or_kleene(
                    pc.not_equal(filtered_source[col], filtered_target[col]),
                    pc.is_null(pc.not_equal(filtered_source[col], filtered_target[col])),
                )
                for col in non_key_cols
            ],
        )

        filtered_source = filtered_source.filter(diff_expr)
```

It fails with:

```
E pyarrow.lib.ArrowNotImplementedError: Function 'not_equal' has no kernel matching input types (struct<sub1: large_string not null, sub2: large_string not null>, struct<sub1: large_string not null, sub2: large_string not null>)
```


koenvo commented Apr 23, 2025

When we don't check if rows need to be updated, we can probably replace upsert by:

```python
with self.transaction() as tx:
    overwrite_mask_predicate = upsert_util.create_match_filter(df, join_cols)
    tx.overwrite(df, overwrite_filter=overwrite_mask_predicate)
```

We only need a way to figure out the updated/inserted rows. https://github.com/apache/iceberg-python/pull/1947/files

This breaks `when_matched_update_all` and `when_not_matched_insert_all`, btw.
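The overwrite-based replacement above amounts to: delete every target row whose key appears in the source, then append all source rows. A plain-Python sketch of those semantics (hypothetical helper, not the PyIceberg API):

```python
def upsert_via_overwrite(target, source, join_cols):
    """Delete key-matched target rows, then append every source row.

    Hypothetical sketch of overwrite-with-filter semantics, not the real
    PyIceberg API. Note that every source row gets rewritten, even exact
    matches, which is what the change-detection step normally avoids.
    """
    key = lambda row: tuple(row[c] for c in join_cols)
    source_keys = {key(row) for row in source}
    kept = [row for row in target if key(row) not in source_keys]
    return kept + list(source)


target = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
source = [{"id": 2, "v": "b"}, {"id": 3, "v": "c"}]  # id 2 unchanged but still rewritten
result = upsert_via_overwrite(target, source, ["id"])
```

This matches the earlier observation that skipping the change-detection step "will just overwrite more data": correctness is preserved, at the cost of extra IO.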

@kevinjqliu

Sadly it looks like comparison functions (`pc.equal`) don't work with complex types either, bummer!

I was able to verify and reproduce it in a jupyter notebook https://gist.github.com/kevinjqliu/33670f6e852fe175724bffd2e2f4cf07

@kevinjqliu

I'm in favor of reverting #1685; let me see if that will help resolve this issue.

@Fokko Fokko added this to the PyIceberg 0.9.1 milestone Apr 24, 2025

@Fokko Fokko left a comment


Thanks again @koenvo for following up so quickly. I think this is the right approach for now until we support doing joins/comparisons on complex types in PyArrow.

Comment on lines 105 to 106:

```python
# Step 3: Perform a left outer join to find which rows from source exist in target
joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
```

I think this should be an inner join, since we don't support null keys.

Suggested change:

```diff
-# Step 3: Perform a left outer join to find which rows from source exist in target
-joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
+# Step 3: Perform a join to find which rows from source exist in target
+joined = source_index.join(target_index, keys=list(join_cols_set), join_type="inner")
```

@koenvo (author) replied:

Ah yes, good catch! We can do an inner join and remove the `matching_indices = joined.filter(pc.field(MARKER_COLUMN_NAME))` step.

```python
tbl.upsert(df)


def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
```

Nit: change the name since it no longer fails :)

I also added another test here for when a complex type is used as the join key.

@koenvo (author) replied:

I copied your test to this branch and renamed the non-failing one :-)


@kevinjqliu kevinjqliu left a comment


LGTM! Very elegant workaround.

```python
with pytest.raises(
    pa.lib.ArrowNotImplementedError, match="Keys of type struct<sub1: large_string not null, sub2: large_string not null>"
):
    _ = tbl.upsert(update_data, join_cols=["nested_type"])
```

Btw, this fails not in `get_rows_to_update` but in `has_duplicate_rows`; `group_by` has the same limitation as join:

```
../../Library/Caches/pypoetry/virtualenvs/pyiceberg-Is5Rt7Ah-py3.12/lib/python3.12/site-packages/pyarrow/acero.py:410: in _group_by
    return decl.to_table(use_threads=use_threads)
pyarrow/_acero.pyx:590: in pyarrow._acero.Declaration.to_table
    ???
pyarrow/error.pxi:155: in pyarrow.lib.pyarrow_internal_check_status
    ???

>   ???
E   pyarrow.lib.ArrowNotImplementedError: Keys of type struct<sub1: large_string not null, sub2: large_string not null>

pyarrow/error.pxi:92: ArrowNotImplementedError
```

@kevinjqliu kevinjqliu merged commit be528ae into apache:main Apr 24, 2025
7 checks passed
@kevinjqliu

Thanks @koenvo for the PR and @Fokko for the review :)

@koenvo koenvo deleted the bugfix/upsert-complex-type branch April 25, 2025 07:41

koenvo commented Apr 25, 2025

> Thanks @koenvo for the PR and @Fokko for the review :)

You're welcome. Thanks for the feedback!

Fokko pushed a commit that referenced this pull request Apr 25, 2025
… rows (#1878)

Fokko pushed a commit that referenced this pull request Apr 25, 2025
… rows (#1878)

@kevinjqliu

Leaving this snippet from my stash here for when pyarrow supports comparison of complex types. This is a pyarrow-compute-only implementation:

```python
    # filters the source_table for matching rows based on join key columns
    match_expr = functools.reduce(operator.and_, [pc.field(col).isin(target_table.column(col)) for col in join_cols])
    matching_source_rows = source_table.filter(match_expr)

    # filters the source_table for non-matching rows based on non join key columns, these need to be updated
    non_match_expr = functools.reduce(operator.or_, [~pc.field(col).isin(target_table.column(col)) for col in non_key_cols])
    rows_to_update_table = matching_source_rows.filter(non_match_expr)
    return rows_to_update_table
```
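In plain Python, the column-wise `isin` filtering in that snippet corresponds roughly to the following (hypothetical sketch using hashable values; the complex types are exactly what makes this hard in pyarrow today):

```python
def rows_to_update_isin(source, target, join_cols, non_key_cols):
    # Set of distinct values per target column, mirroring isin's membership test
    def col_values(table, col):
        return {row[col] for row in table}

    # match_expr: keep source rows whose join-key values each appear somewhere
    # in the corresponding target column (column-wise membership, not a
    # row-wise key pairing)
    matching = [
        row for row in source
        if all(row[c] in col_values(target, c) for c in join_cols)
    ]
    # non_match_expr: keep rows where some non-key value is absent from the
    # corresponding target column, i.e. rows that need updating
    return [
        row for row in matching
        if any(row[c] not in col_values(target, c) for c in non_key_cols)
    ]
```

Note that this is column-wise membership rather than a row-wise pairing, so it is a coarser filter than an exact per-row comparison.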

gabeiglio pushed a commit to Netflix/iceberg-python that referenced this pull request Aug 13, 2025
… rows (apache#1878)

MoIMC pushed a commit to imc-trading/iceberg-python that referenced this pull request Dec 11, 2025
… rows (apache#1878)
