Fallback for upsert when arrow cannot compare source rows with target rows #1878
Conversation
Fokko
left a comment
Thanks for working on this @koenvo! It looks like a lot of folks are waiting for this.
Could you run a poor man's benchmark, similar to what I did here: #1685 (comment), just to see how the two methods compare in terms of performance?
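A poor man's benchmark harness along those lines can be as simple as timing both code paths with `time.perf_counter` (a hypothetical sketch; `fn` stands in for either upsert variant):

```python
import time


def poormans_benchmark(fn, *args, repeat: int = 5) -> float:
    """Run fn(*args) `repeat` times and return the best wall-clock time in seconds."""
    best = float("inf")
    for _ in range(repeat):
        start = time.perf_counter()
        fn(*args)
        best = min(best, time.perf_counter() - start)
    return best
```

Comparing the best-of-N timing of the original join-based path against the fallback path on the same table gives the rough numbers asked for here.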
pyiceberg/table/upsert_util.py (Outdated)

```python
MARKER_COLUMN_NAME = "__from_target"
```

```python
assert MARKER_COLUMN_NAME not in join_cols_set
```
We try to avoid assert outside of the tests. Could you raise a ValueError instead?
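A minimal sketch of what that change looks like (the function name here is hypothetical, only the guard itself is from the diff):

```python
MARKER_COLUMN_NAME = "__from_target"


def check_marker_not_in_join_cols(join_cols_set: set) -> None:
    # Raise instead of assert, so the check also runs under `python -O`,
    # which strips assert statements.
    if MARKER_COLUMN_NAME in join_cols_set:
        raise ValueError(f"Join columns must not contain the reserved column name {MARKER_COLUMN_NAME!r}")
```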
Poor man's benchmark: this compares the performance of the original vs the fallback.

No significant performance regression observed. Fallback behaves as expected.
First of all, sorry for the late reply, I was busy with the Iceberg summit :) @koenvo The reason I was asking for a benchmark is to see if we can replace the existing logic with your logic that also works with complex types.
kevinjqliu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great! Thanks for the PR. I added a few comments
So I took some time to think about this issue. The main issue here is that pyarrow's join cannot compare complex types in non-key fields. Taking a step back, the upsert consists of two steps: (1) find the rows from source whose join keys exist in target, and (2) of those, keep only the rows where a non-key column actually differs. Both operations are combined in the join performed by get_rows_to_update.

This PR currently skips step 2 as a fallback mechanism. All rows matching the join keys will be returned, regardless of whether they are an exact match. I think this is fine since the overwrite will just overwrite more data. We should update this comment though: iceberg-python/pyiceberg/table/__init__.py, lines 1204 to 1207 in b85127e

I can't think of an efficient way to do step 2. I think it is possible to build a mask with all rows of the "non join keys" columns and then filter the "rows to update" table... iceberg-python/pyiceberg/table/upsert_util.py, lines 74 to 83 in b85127e
Thanks @kevinjqliu for the write-up. I checked locally and also noticed that the fallback has different behavior. How about reverting my work in #1685? I wasn't aware of the limitation of Arrow that it doesn't support complex types in joins.
I was already experimenting with creating a mask. So instead of creating an expression (like the one you referenced) it will be a mask. We probably need to use the result of the join as indices into both tables. This requires index columns for both source and target, and doing something like take(source_index_column) and take(target_index_column).
This doesn't work. I tried something like this:

```python
def get_rows_to_update(...):
    ...
    filtered_source = source_table.take(joined[SOURCE_INDEX_COLUMN_NAME])
    filtered_target = target_table.take(joined[TARGET_INDEX_COLUMN_NAME])

    diff_expr = functools.reduce(
        pc.or_,
        [
            pc.or_kleene(
                pc.not_equal(filtered_source[col], filtered_target[col]),
                pc.is_null(pc.not_equal(filtered_source[col], filtered_target[col])),
            )
            for col in non_key_cols
        ],
    )

    filtered_source = filtered_source.filter(diff_expr)
```
When we don't check if rows need to be updated, we can probably replace the upsert internals with:

```python
with self.transaction() as tx:
    overwrite_mask_predicate = upsert_util.create_match_filter(df, join_cols)
    tx.overwrite(df, overwrite_filter=overwrite_mask_predicate)
```

This breaks …
Sadly it looks like comparison functions (e.g. `pc.not_equal`) don't support struct types either. I was able to verify and reproduce it in a jupyter notebook: https://gist.github.com/kevinjqliu/33670f6e852fe175724bffd2e2f4cf07
I'm in favor of reverting #1685, let me see if that will help resolve this issue.
Fokko
left a comment
Thanks again @koenvo for following up so quickly. I think this is the right approach for now, until we support doing joins/comparisons on complex types in PyArrow.
pyiceberg/table/upsert_util.py (Outdated)

```python
# Step 3: Perform a left outer join to find which rows from source exist in target
joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
```
I think this should be an inner join, since we don't support null-keys.
```diff
-# Step 3: Perform a left outer join to find which rows from source exist in target
-joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
+# Step 3: Perform a join to find which rows from source exist in target
+joined = source_index.join(target_index, keys=list(join_cols_set), join_type="inner")
```
Ah yes, good catch! We can do an inner join and remove the `matching_indices = joined.filter(pc.field(MARKER_COLUMN_NAME))` step.
tests/table/test_upsert.py (Outdated)

```python
    tbl.upsert(df)


def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
```
nit: change the name since it no longer fails :)
I also added another test here for when a complex type is used as the join key.
I copied your test to this branch and renamed the non-failing one :-)
…+ rename upsert test with struct field as non-join key
kevinjqliu
left a comment
LGTM! very elegant workaround.
```python
with pytest.raises(
    pa.lib.ArrowNotImplementedError, match="Keys of type struct<sub1: large_string not null, sub2: large_string not null>"
):
    _ = tbl.upsert(update_data, join_cols=["nested_type"])
```
btw this fails not in `get_rows_to_update` but in `has_duplicate_rows`: `group_by` has the same limitation as `join`.
```
../../Library/Caches/pypoetry/virtualenvs/pyiceberg-Is5Rt7Ah-py3.12/lib/python3.12/site-packages/pyarrow/acero.py:410: in _group_by
    return decl.to_table(use_threads=use_threads)
pyarrow/_acero.pyx:590: in pyarrow._acero.Declaration.to_table
    ???
pyarrow/error.pxi:155: in pyarrow.lib.pyarrow_internal_check_status
    ???
>   ???
E   pyarrow.lib.ArrowNotImplementedError: Keys of type struct<sub1: large_string not null, sub2: large_string not null>
pyarrow/error.pxi:92: ArrowNotImplementedError
```
Fallback for upsert when arrow cannot compare source rows with target rows (#1878)

<!-- Fixes #1711 -->

## Rationale for this change

Upsert operations in PyIceberg rely on Arrow joins between source and target rows. However, Arrow Acero cannot compare certain complex types, like `struct`, `list`, and `map`, unless they're part of the join key. When such types exist in non-join columns, the upsert fails with an error like:

```
ArrowInvalid: Data type struct<...> is not supported in join non-key field venue_geo
```

This PR introduces a **fallback mechanism**: if Arrow fails to join due to unsupported types, we fall back to comparing only the key columns. Non-key complex fields are ignored in the join condition, but still retained in the final upserted data.

### Before

```python
# Fails if venue_geo is a non-key struct field
txn.upsert(df, join_cols=["match_id"])
```

> ❌ ArrowInvalid: Data type struct<...> is not supported in join non-key field venue_geo

### After

```python
# Falls back to key-based join and proceeds
txn.upsert(df, join_cols=["match_id"])
```

> ✅ Successfully inserts or updates the record, skipping complex field comparison during join

## ✅ Are these changes tested?

Yes:

- A test was added to reproduce the failure scenario with complex non-key fields.
- The new behavior is verified by asserting that the upsert completes successfully using the fallback logic.

> ℹ️ **Note**
> This change does not affect users who do not include complex types in their schemas. For those who do, it improves resilience while preserving data correctness.

## Are there any user-facing changes?

Yes: upserts involving complex non-key columns (like `struct`, `list`, or `map`) no longer fail. They now succeed by skipping unsupported comparisons during the join phase.
Leaving this snippet from my stash here for when pyarrow supports comparison of complex types.