Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Add Spark trip validation helper and unit tests with domain model parity#2

Open
Copilot wants to merge 2 commits intodevabhiab54/dataengineer-transformations-python:devfrom
copilot/add-trip-validation-helperabhiab54/dataengineer-transformations-python:copilot/add-trip-validation-helperCopy head branch name to clipboard
Open

Add Spark trip validation helper and unit tests with domain model parity#2
Copilot wants to merge 2 commits intodevabhiab54/dataengineer-transformations-python:devfrom
copilot/add-trip-validation-helperabhiab54/dataengineer-transformations-python:copilot/add-trip-validation-helperCopy head branch name to clipboard

Conversation

Copy link

Copilot AI commented Jan 7, 2026

Adds a pure Spark expression-based validation helper for CitiBike trip coordinates that mirrors domain-level validation logic, enabling efficient filtering of invalid trips in distributed processing.

Implementation

  • data_transformations/citibike/domain.py: Trip dataclass with is_valid() method validating coordinate presence and ranges (lat: [-90, 90], lon: [-180, 180])

  • data_transformations/citibike/validation.py:

    • trip_is_valid_expr(): Returns Spark Column expression checking coordinate nullability and ranges
    • add_trip_validity_column(): Adds boolean validity column with null coalescing
  • tests/conftest.py: Session-scoped spark fixture for unit tests

  • tests/unit/test_validation_vs_domain.py: Parametrized test asserting Spark validation matches domain validation across 6 edge cases

Usage

from data_transformations.citibike.validation import add_trip_validity_column

# Add validity column to trip DataFrame
df_with_validity = add_trip_validity_column(trips_df)
valid_trips = df_with_validity.filter(F.col("trip_is_valid"))

The Spark expression uses native column operations (isNotNull(), between()) rather than UDFs, enabling predicate pushdown and optimized execution.

Warning

Firewall rules blocked me from connecting to one or more addresses (expand for details)

I tried to connect to the following addresses, but was blocked by firewall rules:

  • esm.ubuntu.com
    • Triggering command: /usr/lib/apt/methods/https /usr/lib/apt/methods/https (dns block)

If you need me to access, download, or install something from one of these locations, you can either:

Original prompt

Add Spark trip validation helper and unit test

This PR adds a Spark boolean expression helper that mirrors the Trip.is_valid() domain validation, plus a unit test that compares the Spark validation results with the domain-level Trip.is_valid() implementation.

Files to add (exact content provided):

  1. data_transformations/citibike/validation.py
from typing import List

from pyspark.sql import Column, DataFrame
from pyspark.sql import functions as F

# Reuse the same coordinate names used across the codebase
REQUIRED_COORDS: List[str] = [
    "start_station_latitude",
    "start_station_longitude",
    "end_station_latitude",
    "end_station_longitude",
]


def trip_is_valid_expr() -> Column:
    """
    Returns a Spark Column (boolean expression) that is True when:
      - All four coordinate columns are non-null
      - Latitudes are in [-90, 90]
      - Longitudes are in [-180, 180]
    The expression itself may evaluate to NULL for rows with NULL inputs; callers
    may want to coalesce to False if a strict boolean is required.
    """
    return (
        F.col("start_station_latitude").isNotNull()
        & F.col("start_station_longitude").isNotNull()
        & F.col("end_station_latitude").isNotNull()
        & F.col("end_station_longitude").isNotNull()
        & F.col("start_station_latitude").between(-90.0, 90.0)
        & F.col("end_station_latitude").between(-90.0, 90.0)
        & F.col("start_station_longitude").between(-180.0, 180.0)
        & F.col("end_station_longitude").between(-180.0, 180.0)
    )


def add_trip_validity_column(df: DataFrame, col_name: str = "trip_is_valid") -> DataFrame:
    """
    Returns a new DataFrame with a boolean column named `col_name` indicating validity.
    Uses coalesce(..., False) so the column is always boolean (no NULLs).
    """
    expr = trip_is_valid_expr()
    return df.withColumn(col_name, F.coalesce(expr, F.lit(False)))
  1. tests/unit/test_validation_vs_domain.py
import pytest
from pyspark.sql import Row
from data_transformations.citibike.validation import add_trip_validity_column
from data_transformations.citibike.domain import Trip


def build_trip_row(start_lat, start_lon, end_lat, end_lon):
    return Row(
        start_station_latitude=start_lat,
        start_station_longitude=start_lon,
        end_station_latitude=end_lat,
        end_station_longitude=end_lon,
    )


@pytest.mark.parametrize(
    "start_lat,start_lon,end_lat,end_lon",
    [
        (40.0, -73.0, 40.001, -73.001),  # valid, small distance
        (40.0, -73.0, 40.0, -73.0),      # same point -> valid, distance 0
        (None, -73.0, 40.0, -73.0),      # missing start lat -> invalid
        (95.0, -73.0, 40.0, -73.0),      # start lat out of range -> invalid
        (40.0, -200.0, 40.0, -73.0),     # start lon out of range -> invalid
        (40.0, -73.0, None, None),       # missing end coords -> invalid
    ],
)
def test_spark_validation_matches_domain(spark, start_lat, start_lon, end_lat, end_lon):
    """
    For each sample row, compare the boolean result from the Spark expression
    with the Trip.is_valid() domain implementation.
    """
    rows = [build_trip_row(start_lat, start_lon, end_lat, end_lon)]
    df = spark.createDataFrame(rows)

    df_with_flag = add_trip_validity_column(df, col_name="trip_is_valid")
    collected = df_with_flag.collect()

    assert len(collected) == 1
    row = collected[0].asDict()

    # Domain-level validation using the Trip dataclass
    trip = Trip(
        start_station_latitude=row.get("start_station_latitude"),
        start_station_longitude=row.get("start_station_longitude"),
        end_station_latitude=row.get("end_station_latitude"),
        end_station_longitude=row.get("end_station_longitude"),
    )
    domain_valid = trip.is_valid()

    # Spark-side validation (coalesced to False by helper) should be boolean
    spark_valid = bool(row.get("trip_is_valid"))

    assert spark_valid == domain_valid, f"Mismatch for row {row}: spark={spark_valid}, domain={domain_valid}"

Notes / acceptance criteria

  • Tests should pass using the existing pytest spark fixture (tests/conftest.py) in the dev branch.
  • The new helper is implemented in a pure Spark expression (no Python UDFs).
  • The test asserts parity between the domain Trip.is_valid() and the Spark expression.

Please create the PR on branch 'dev'.

This pull request was created from Copilot chat.


💬 We'd love your input! Share your thoughts on Copilot coding agent in our 2 minute survey.

@abhiab54 abhiab54 marked this pull request as ready for review January 7, 2026 23:44
Co-authored-by: abhiab54 <65629795+abhiab54@users.noreply.github.com>
Copilot AI changed the title [WIP] Add Spark trip validation helper and unit test Add Spark trip validation helper and unit tests with domain model parity Jan 7, 2026
Copilot AI requested a review from abhiab54 January 7, 2026 23:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants

Morty Proxy This is a proxified and sanitized view of the page, visit original site.