diff --git a/.github/environment.yml b/.github/environment.yml index 5a4905c7..76875d6b 100644 --- a/.github/environment.yml +++ b/.github/environment.yml @@ -9,4 +9,4 @@ dependencies: - pdal - pytest - meshio - - pandas + - geopandas diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 47180b8d..fc69ec07 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -24,11 +24,13 @@ jobs: fail-fast: true matrix: os: ['ubuntu-latest', 'macos-latest', 'windows-latest'] - python-version: ['3.9', '3.10', '3.11', '3.12'] - numpy-version: ['1.24', '2.0'] + python-version: ['3.10', '3.11', '3.12', '3.13'] + numpy-version: ['1.24', '2.1'] exclude: - python-version: '3.12' numpy-version: '1.24' + - python-version: '3.13' + numpy-version: '1.24' steps: - name: Check out python-pdal @@ -44,7 +46,7 @@ jobs: - name: Setup micromamba uses: conda-incubator/setup-miniconda@v3 with: - miniforge-variant: Mambaforge + miniforge-variant: Miniforge3 miniforge-version: latest python-version: ${{ matrix.python-version }} use-mamba: true @@ -72,5 +74,5 @@ jobs: export PDAL_DRIVER_PATH=$PDAL_PLUGIN_PATH:$PDAL_DRIVER_PATH python -m pdal pdal --drivers --debug - py.test -v test/ + python -m pytest -v test/ diff --git a/CMakeLists.txt b/CMakeLists.txt index e22cd1a1..61610cfb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.11.0) +cmake_minimum_required(VERSION 3.16.0) project(pdal-python VERSION ${SKBUILD_PROJECT_VERSION} DESCRIPTION "PDAL Python bindings" HOMEPAGE_URL "https://github.com/PDAL/Python") @@ -6,7 +6,6 @@ project(pdal-python VERSION ${SKBUILD_PROJECT_VERSION} set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) -set(CMAKE_BUILD_TYPE "Release") # Python-finding settings set(Python3_FIND_STRATEGY "LOCATION") @@ -25,7 +24,7 @@ endif() find_package(Python3 COMPONENTS Interpreter ${DEVELOPMENT_COMPONENT} NumPy REQUIRED) # find PDAL. Require 2.1+ -find_package(PDAL 2.6 REQUIRED) +find_package(PDAL 2.7 REQUIRED) # find PyBind11 find_package(pybind11 REQUIRED) diff --git a/README.rst b/README.rst index 0132ec84..cbab9ac1 100644 --- a/README.rst +++ b/README.rst @@ -20,6 +20,21 @@ PDAL Python support is installable via PyPI: pip install PDAL + +Developers can control many settings including debug builds and where the libraries are installed +using `scikit-build-core `_ settings: + +.. code-block:: + + python -m pip install \ + -Cbuild-dir=build \ + -e \ + . \ + --config-settings=cmake.build-type="Debug" \ + -vv \ + --no-deps \ + --no-build-isolation + GitHub ................................................................................ @@ -168,7 +183,7 @@ PDAL and Python: print(len(intensity)) # 704 points # Now use pdal to clamp points that have intensity 100 <= v < 300 - pipeline = pdal.Filter.range(limits="Intensity[100:300)").pipeline(intensity) + pipeline = pdal.Filter.expression(expression="Intensity >= 100 && Intensity < 300").pipeline(intensity) print(pipeline.execute()) # 387 points clamped = pipeline.arrays[0] @@ -193,6 +208,104 @@ PDAL and Python: with tiledb.open("clamped") as a: print(a.schema) +Reading using Numpy Arrays as buffers (advanced) +................................................................................ + +It's also possible to treat the Numpy arrays passed to PDAL as buffers that are iteratively populated through +custom python functions during the execution of the pipeline. + +This may be useful in cases where you want the reading of the input data to be handled in a streamable fashion, +like for example: + +* When the total Numpy array data wouldn't fit into memory. +* To initiate execution of a streamable PDAL pipeline while the input data is still being read. + +To enable this mode, you just need to include the python populate function along with each corresponding Numpy array. + +.. code-block:: python + + # Numpy array to be used as buffer + in_buffer = np.zeros(max_chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)]) + + # The function to populate the buffer iteratively + def load_next_chunk() -> int: + """ + Function called by PDAL before reading the data from the buffer. + + IMPORTANT: must return the total number of items to be read from the buffer. + The Pipeline execution will keep calling this function in a loop until 0 is returned. + """ + # + # Replace here with your code that populates the buffer and returns the number of elements to read + # + chunk_size = next_chunk.size + in_buffer[:chunk_size]["X"] = next_chunk[:]["X"] + in_buffer[:chunk_size]["Y"] = next_chunk[:]["Y"] + in_buffer[:chunk_size]["Z"] = next_chunk[:]["Z"] + + return chunk_size + + # Configure input array and handler during Pipeline initialization... + p = pdal.Pipeline(pipeline_json, arrays=[in_buffer], stream_handlers=[load_next_chunk]) + + # ...alternatively you can use the setter on an existing Pipeline + # p.inputs = [(in_buffer, load_next_chunk)] + +The following snippet provides a simple example of how to use a Numpy array as buffer to support writing through PDAL +with total control over the maximum amount of memory to use. + +.. raw:: html + +
+ Example: Streaming the read and write of a very large LAZ file with low memory footprint + +.. code-block:: python + + import numpy as np + import pdal + + in_chunk_size = 10_000_000 + in_pipeline = pdal.Reader.las(**{ + "filename": "in_test.laz" + }).pipeline() + + in_pipeline_it = in_pipeline.iterator(in_chunk_size).__iter__() + + out_chunk_size = 50_000_000 + out_file = "out_test.laz" + out_pipeline = pdal.Writer.las( + filename=out_file + ).pipeline() + + out_buffer = np.zeros(in_chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)]) + + def load_next_chunk(): + try: + next_chunk = next(in_pipeline_it) + except StopIteration: + # Stops the streaming + return 0 + + chunk_size = next_chunk.size + out_buffer[:chunk_size]["X"] = next_chunk[:]["X"] + out_buffer[:chunk_size]["Y"] = next_chunk[:]["Y"] + out_buffer[:chunk_size]["Z"] = next_chunk[:]["Z"] + + print(f"Loaded next chunk -> {chunk_size}") + + return chunk_size + + out_pipeline.inputs = [(out_buffer, load_next_chunk)] + + out_pipeline.loglevel = 20 # INFO + count = out_pipeline.execute_streaming(out_chunk_size) + + print(f"\nWROTE - {count}") + +.. raw:: html + +
+ Executing Streamable Pipelines ................................................................................ Streamable pipelines (pipelines that consist exclusively of streamable PDAL @@ -203,7 +316,7 @@ returns an iterator object that yields Numpy arrays of up to ``chunk_size`` size .. code-block:: python import pdal - pipeline = pdal.Reader("test/data/autzen-utm.las") | pdal.Filter.range(limits="Intensity[80:120)") + pipeline = pdal.Reader("test/data/autzen-utm.las") | pdal.Filter.expression(expression="Intensity > 80 && Intensity < 120)") for array in pipeline.iterator(chunk_size=500): print(len(array)) # or to concatenate all arrays into one diff --git a/src/pdal/PyArray.cpp b/src/pdal/PyArray.cpp index 55f87b5d..62b4875a 100644 --- a/src/pdal/PyArray.cpp +++ b/src/pdal/PyArray.cpp @@ -34,7 +34,6 @@ #include "PyArray.hpp" #include -#include namespace pdal { @@ -78,7 +77,7 @@ Dimension::Type pdalType(int t) return Type::None; } -std::string toString(PyObject *pname) +std::string pyObjectToString(PyObject *pname) { PyObject* r = PyObject_Str(pname); if (!r) @@ -92,9 +91,11 @@ std::string toString(PyObject *pname) #if NPY_ABI_VERSION < 0x02000000 #define PyDataType_FIELDS(descr) ((descr)->fields) + #define PyDataType_NAMES(descr) ((descr)->names) #endif -Array::Array(PyArrayObject* array) : m_array(array), m_rowMajor(true) +Array::Array(PyArrayObject* array, std::shared_ptr stream_handler) + : m_array(array), m_rowMajor(true), m_stream_handler(std::move(stream_handler)) { Py_XINCREF(array); @@ -124,7 +125,7 @@ Array::Array(PyArrayObject* array) : m_array(array), m_rowMajor(true) for (int i = 0; i < numFields; ++i) { - std::string name = toString(PyList_GetItem(names, i)); + std::string name = python::pyObjectToString(PyList_GetItem(names, i)); if (name == "X") xyz |= 1; else if (name == "Y") @@ -163,40 +164,77 @@ Array::~Array() Py_XDECREF(m_array); } - -ArrayIter& Array::iterator() +std::shared_ptr Array::iterator() { - ArrayIter *it = new ArrayIter(m_array); - m_iterators.emplace_back((it)); - return *it; + return std::make_shared(m_array, m_stream_handler); } - -ArrayIter::ArrayIter(PyArrayObject* np_array) +ArrayIter::ArrayIter(PyArrayObject* np_array, std::shared_ptr stream_handler) + : m_stream_handler(std::move(stream_handler)) { + // Create iterator m_iter = NpyIter_New(np_array, - NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK, - NPY_KEEPORDER, NPY_NO_CASTING, NULL); + NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK, + NPY_KEEPORDER, NPY_NO_CASTING, NULL); if (!m_iter) throw pdal_error("Unable to create numpy iterator."); + initIterator(); +} + +void ArrayIter::initIterator() +{ + // For a stream handler, first execute it to get the buffer populated and know the size of the data to iterate + int64_t stream_chunk_size = 0; + if (m_stream_handler) { + stream_chunk_size = (*m_stream_handler)(); + if (!stream_chunk_size) { + m_done = true; + return; + } + } + + // Initialize the iterator function char *itererr; m_iterNext = NpyIter_GetIterNext(m_iter, &itererr); if (!m_iterNext) { NpyIter_Deallocate(m_iter); - throw pdal_error(std::string("Unable to create numpy iterator: ") + - itererr); + m_iter = nullptr; + throw pdal_error(std::string("Unable to retrieve iteration function from numpy iterator: ") + itererr); } m_data = NpyIter_GetDataPtrArray(m_iter); - m_stride = NpyIter_GetInnerStrideArray(m_iter); - m_size = NpyIter_GetInnerLoopSizePtr(m_iter); + m_stride = *NpyIter_GetInnerStrideArray(m_iter); + m_size = *NpyIter_GetInnerLoopSizePtr(m_iter); + if (stream_chunk_size) { + // Ensure chunk size is valid and then limit iteration accordingly + if (0 < stream_chunk_size && stream_chunk_size <= m_size) { + m_size = stream_chunk_size; + } else { + throw pdal_error(std::string("Stream chunk size not in the range of array length: ") + + std::to_string(stream_chunk_size)); + } + } m_done = false; } +void ArrayIter::resetIterator() +{ + // Reset the iterator to the initial state + if (NpyIter_Reset(m_iter, NULL) != NPY_SUCCEED) { + NpyIter_Deallocate(m_iter); + m_iter = nullptr; + throw pdal_error("Unable to reset numpy iterator."); + } + + initIterator(); +} + ArrayIter::~ArrayIter() { - NpyIter_Deallocate(m_iter); + if (m_iter != nullptr) { + NpyIter_Deallocate(m_iter); + } } ArrayIter& ArrayIter::operator++() @@ -204,10 +242,15 @@ ArrayIter& ArrayIter::operator++() if (m_done) return *this; - if (--(*m_size)) - *m_data += *m_stride; - else if (!m_iterNext(m_iter)) - m_done = true; + if (--m_size) { + *m_data += m_stride; + } else if (!m_iterNext(m_iter)) { + if (m_stream_handler) { + resetIterator(); + } else { + m_done = true; + } + } return *this; } diff --git a/src/pdal/PyArray.hpp b/src/pdal/PyArray.hpp index e2da961f..b2aca844 100644 --- a/src/pdal/PyArray.hpp +++ b/src/pdal/PyArray.hpp @@ -34,6 +34,7 @@ #pragma once +#include "export.hpp" #include #define NPY_TARGET_VERSION NPY_1_22_API_VERSION @@ -55,16 +56,18 @@ namespace pdal namespace python { + class ArrayIter; +using ArrayStreamHandler = std::function; -class PDAL_DLL Array +class PDAL_EXPORT Array { public: using Shape = std::array; using Fields = std::vector; - Array(PyArrayObject* array); + Array(PyArrayObject* array, std::shared_ptr stream_handler = {}); ~Array(); Array(Array&& a) = default; @@ -76,24 +79,24 @@ class PDAL_DLL Array bool rowMajor() const { return m_rowMajor; }; Shape shape() const { return m_shape; } const Fields& fields() const { return m_fields; }; - ArrayIter& iterator(); + std::shared_ptr iterator(); private: PyArrayObject* m_array; Fields m_fields; bool m_rowMajor; Shape m_shape {}; - std::vector> m_iterators; + std::shared_ptr m_stream_handler; }; -class ArrayIter +class PDAL_EXPORT ArrayIter { public: ArrayIter(const ArrayIter&) = delete; ArrayIter() = delete; - ArrayIter(PyArrayObject*); + ArrayIter(PyArrayObject*, std::shared_ptr); ~ArrayIter(); ArrayIter& operator++(); @@ -101,12 +104,16 @@ class ArrayIter char* operator*() const { return *m_data; } private: - NpyIter *m_iter; + NpyIter *m_iter = nullptr; NpyIter_IterNextFunc *m_iterNext; char **m_data; - npy_intp *m_size; - npy_intp *m_stride; + npy_intp m_size; + npy_intp m_stride; bool m_done; + + std::shared_ptr m_stream_handler; + void initIterator(); + void resetIterator(); }; } // namespace python diff --git a/src/pdal/PyPipeline.cpp b/src/pdal/PyPipeline.cpp index 85ea028a..7f295273 100644 --- a/src/pdal/PyPipeline.cpp +++ b/src/pdal/PyPipeline.cpp @@ -73,8 +73,13 @@ PipelineExecutor::PipelineExecutor( } -point_count_t PipelineExecutor::execute() +point_count_t PipelineExecutor::execute(pdal::StringList allowedDims) { + if (allowedDims.size()) + { + m_manager.pointTable().layout()->setAllowedDims(allowedDims); + } + point_count_t count = m_manager.execute(); m_executed = true; return count; @@ -92,9 +97,14 @@ std::string PipelineExecutor::getSrsWKT2() const return output; } -point_count_t PipelineExecutor::executeStream(point_count_t streamLimit) +point_count_t PipelineExecutor::executeStream(point_count_t streamLimit, + pdal::StringList allowedDims) { CountPointTable table(streamLimit); + if (allowedDims.size()) + { + pointTable().layout()->setAllowedDims(allowedDims); + } m_manager.executeStream(table); m_executed = true; return table.count(); @@ -175,6 +185,12 @@ MetadataNode computePreview(Stage* stage) } if (dims.size()) summary.add("dimensions", dims); + + if (!qi.m_metadata.empty() && qi.m_metadata.valid()) + { + summary.add(qi.m_metadata.clone("metadata")); + } + pdal::Utils::toJSON(summary, strm); return summary; @@ -235,14 +251,20 @@ void PipelineExecutor::addArrayReaders(std::vector> array for (auto f : array->fields()) r.pushField(f); - ArrayIter& iter = array->iterator(); - auto incrementer = [&iter](PointId id) -> char * + auto arrayIter = array->iterator(); + auto incrementer = [arrayIter, firstPoint = true](PointId id) mutable -> char * { - if (! iter) + ArrayIter& iter = *arrayIter; + if (!firstPoint && iter) { + ++iter; + } else { + firstPoint = false; + } + + if (!iter) return nullptr; char *c = *iter; - ++iter; return c; }; @@ -272,6 +294,7 @@ PyObject* buildNumpyDescriptor(PointLayoutPtr layout) { return layout->dimOffset(id1) < layout->dimOffset(id2); }; + auto dims = layout->dims(); std::sort(dims.begin(), dims.end(), sortByOffset); diff --git a/src/pdal/PyPipeline.hpp b/src/pdal/PyPipeline.hpp index c32abcfe..1eed023f 100644 --- a/src/pdal/PyPipeline.hpp +++ b/src/pdal/PyPipeline.hpp @@ -34,6 +34,7 @@ #pragma once +#include "export.hpp" #include #define NPY_TARGET_VERSION NPY_1_22_API_VERSION @@ -55,13 +56,13 @@ PyArrayObject* meshToNumpyArray(const TriangularMesh* mesh); class Array; -class PDAL_DLL PipelineExecutor { +class PDAL_EXPORT PipelineExecutor { public: PipelineExecutor(std::string const& json, std::vector> arrays, int level); virtual ~PipelineExecutor() = default; - point_count_t execute(); - point_count_t executeStream(point_count_t streamLimit); + point_count_t execute(pdal::StringList allowedDims); + point_count_t executeStream(point_count_t streamLimit, pdal::StringList allowedDims); const PointViewSet& views() const; std::string getPipeline() const; diff --git a/src/pdal/StreamableExecutor.cpp b/src/pdal/StreamableExecutor.cpp index 9f5b4b8b..5fa01931 100644 --- a/src/pdal/StreamableExecutor.cpp +++ b/src/pdal/StreamableExecutor.cpp @@ -187,11 +187,17 @@ StreamableExecutor::StreamableExecutor(std::string const& json, std::vector> arrays, int level, point_count_t chunkSize, - int prefetch) + int prefetch, + pdal::StringList allowedDims) : PipelineExecutor(json, arrays, level) , m_table(chunkSize, prefetch) , m_exc(nullptr) { + + if (allowedDims.size()) + { + m_table.layout()->setAllowedDims(allowedDims); + } m_thread.reset(new std::thread([this]() { try { diff --git a/src/pdal/StreamableExecutor.hpp b/src/pdal/StreamableExecutor.hpp index 45d015c3..f565c8ee 100644 --- a/src/pdal/StreamableExecutor.hpp +++ b/src/pdal/StreamableExecutor.hpp @@ -81,7 +81,8 @@ class StreamableExecutor : public PipelineExecutor std::vector> arrays, int level, point_count_t chunkSize, - int prefetch); + int prefetch, + pdal::StringList allowedDim); ~StreamableExecutor(); MetadataNode getMetadata() { return m_table.metadata(); } diff --git a/src/pdal/__init__.py b/src/pdal/__init__.py index a312f568..f311ba7e 100644 --- a/src/pdal/__init__.py +++ b/src/pdal/__init__.py @@ -1,5 +1,5 @@ __all__ = ["Pipeline", "Stage", "Reader", "Filter", "Writer", "dimensions", "info"] -__version__ = '3.4.5' +__version__ = '3.5.0' from . import libpdalpython from .drivers import inject_pdal_drivers diff --git a/src/pdal/export.hpp b/src/pdal/export.hpp new file mode 100644 index 00000000..5a6c9aea --- /dev/null +++ b/src/pdal/export.hpp @@ -0,0 +1,44 @@ +/****************************************************************************** +* Copyright (c) 2025, Hobu Inc. (info@hobu.co) +* +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following +* conditions are met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in +* the documentation and/or other materials provided +* with the distribution. +* * Neither the name of Hobu, Inc. nor the +* names of its contributors may be used to endorse or promote +* products derived from this software without specific prior +* written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT +* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY +* OF SUCH DAMAGE. +****************************************************************************/ + + +#include + +#ifndef PDAL_EXPORT +# define PDAL_EXPORT PDAL_DLL +#endif + +#ifndef PDAL_DLL +# define PDAL_DLL PDAL_EXPORT +#endif diff --git a/src/pdal/libpdalpython.cpp b/src/pdal/libpdalpython.cpp index 229f928d..09118fbf 100644 --- a/src/pdal/libpdalpython.cpp +++ b/src/pdal/libpdalpython.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -165,36 +166,46 @@ namespace pdal { class Pipeline { public: - point_count_t execute() { + point_count_t execute(pdal::StringList allowedDims) { point_count_t response(0); { py::gil_scoped_release release; - response = getExecutor()->execute(); + response = getExecutor()->execute(allowedDims); } return response; - } - point_count_t executeStream(point_count_t streamLimit) { + point_count_t executeStream(point_count_t streamLimit, pdal::StringList allowedDims) { point_count_t response(0); { py::gil_scoped_release release; - response = getExecutor()->executeStream(streamLimit); + response = getExecutor()->executeStream(streamLimit, allowedDims); } return response; } - std::unique_ptr iterator(int chunk_size, int prefetch) { + std::unique_ptr iterator(int chunk_size, int prefetch, pdal::StringList allowedDims) { return std::unique_ptr(new PipelineIterator( - getJson(), _inputs, _loglevel, chunk_size, prefetch + getJson(), _inputs, _loglevel, chunk_size, prefetch, allowedDims )); } - void setInputs(std::vector ndarrays) { + void setInputs(const std::vector& inputs) { _inputs.clear(); - for (const auto& ndarray: ndarrays) { - PyArrayObject* ndarray_ptr = (PyArrayObject*)ndarray.ptr(); - _inputs.push_back(std::make_shared(ndarray_ptr)); + for (const auto& input_obj: inputs) { + if (py::isinstance(input_obj)) { + // Backward compatibility for accepting list of numpy arrays + auto ndarray = input_obj.cast(); + _inputs.push_back(std::make_shared((PyArrayObject*)ndarray.ptr())); + } else { + // Now expected to be a list of pairs: (numpy array, stream handler) + auto input = input_obj.cast>(); + _inputs.push_back(std::make_shared( + (PyArrayObject*)input.first.ptr(), + input.second ? + std::make_shared(input.second) + : nullptr)); + } } delExecutor(); } @@ -308,9 +319,9 @@ namespace pdal { py::class_(m, "Pipeline") .def(py::init<>()) - .def("execute", &Pipeline::execute) - .def("execute_streaming", &Pipeline::executeStream, "chunk_size"_a=10000) - .def("iterator", &Pipeline::iterator, "chunk_size"_a=10000, "prefetch"_a=0) + .def("execute", &Pipeline::execute, py::arg("allowed_dims") =py::list()) + .def("execute_streaming", &Pipeline::executeStream, "chunk_size"_a=10000, py::arg("allowed_dims") =py::list()) + .def("iterator", &Pipeline::iterator, "chunk_size"_a=10000, "prefetch"_a=0, py::arg("allowed_dims") =py::list()) .def_property("inputs", nullptr, &Pipeline::setInputs) .def_property("loglevel", &Pipeline::getLoglevel, &Pipeline::setLogLevel) .def_property_readonly("log", &Pipeline::getLog) @@ -333,10 +344,10 @@ namespace pdal { m.def("infer_writer_driver", &getWriterDriver); if (pdal::Config::versionMajor() < 2) - throw pybind11::import_error("PDAL version must be >= 2.6"); + throw pybind11::import_error("PDAL version must be >= 2.7"); - if (pdal::Config::versionMajor() == 2 && pdal::Config::versionMinor() < 6) - throw pybind11::import_error("PDAL version must be >= 2.6"); + if (pdal::Config::versionMajor() == 2 && pdal::Config::versionMinor() < 7) + throw pybind11::import_error("PDAL version must be >= 2.7"); }; }; // namespace pdal diff --git a/src/pdal/pipeline.py b/src/pdal/pipeline.py index c13a6d2c..60a181c0 100644 --- a/src/pdal/pipeline.py +++ b/src/pdal/pipeline.py @@ -2,7 +2,7 @@ import json import logging -from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast +from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast, Callable import numpy as np import pathlib @@ -17,6 +17,11 @@ except ModuleNotFoundError: # pragma: no cover DataFrame = None +try: + from geopandas import GeoDataFrame, points_from_xy +except ModuleNotFoundError: # pragma: no cover + GeoDataFrame = points_from_xy = None + from . import drivers, libpdalpython LogLevelToPDAL = { @@ -36,6 +41,7 @@ def __init__( loglevel: int = logging.ERROR, json: Optional[str] = None, dataframes: Sequence[DataFrame] = (), + stream_handlers: Sequence[Callable[[], int]] = (), ): if json: @@ -45,7 +51,7 @@ def __init__( # Convert our data frames to Numpy Structured Arrays if dataframes: - arrays = [df.to_records() for df in dataframes] + arrays = [df.to_records() if not "geometry" in df.columns else df.drop(columns=["geometry"]).to_records() for df in dataframes] super().__init__() self._stages: List[Stage] = [] @@ -53,7 +59,14 @@ def __init__( stages = _parse_stages(spec) if isinstance(spec, str) else spec for stage in stages: self |= stage - self.inputs = arrays + + if stream_handlers: + if len(stream_handlers) != len(arrays): + raise RuntimeError("stream_handlers must match the number of specified input arrays / dataframes") + self.inputs = [(a, h) for a, h in zip(arrays, stream_handlers)] + else: + self.inputs = [(a, None) for a in arrays] + self.loglevel = loglevel def __getstate__(self): @@ -124,13 +137,26 @@ def get_meshio(self, idx: int) -> Optional[Mesh]: [("triangle", np.stack((mesh["A"], mesh["B"], mesh["C"]), 1))], ) - def get_dataframe(self, idx: int) -> Optional[DataFrame]: if DataFrame is None: raise RuntimeError("Pandas support requires Pandas to be installed") return DataFrame(self.arrays[idx]) + def get_geodataframe(self, idx: int, xyz: bool=False, crs: Any=None) -> Optional[GeoDataFrame]: + if GeoDataFrame is None: + raise RuntimeError("GeoPandas support requires GeoPandas to be installed") + df = DataFrame(self.arrays[idx]) + coords = [df["X"], df["Y"], df["Z"]] if xyz else [df["X"], df["Y"]] + geometry = points_from_xy(*coords) + gdf = GeoDataFrame( + df, + geometry=geometry, + crs=crs, + ) + df = coords = geometry = None + return gdf + def _get_json(self) -> str: return self.toJSON() diff --git a/test/data/simple.laz b/test/data/simple.laz new file mode 100644 index 00000000..6f774c5b Binary files /dev/null and b/test/data/simple.laz differ diff --git a/test/test_pipeline.py b/test/test_pipeline.py index 5a40b58e..317c5b69 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -3,6 +3,8 @@ import os import sys +from typing import List +from itertools import product import numpy as np import pytest @@ -82,6 +84,17 @@ def test_execute_streaming(self, filename): count2 = r.execute_streaming(chunk_size=100) assert count == count2 + + @pytest.mark.parametrize("filename", ["range.json", "range.py"]) + def test_subsetstreaming(self, filename): + """Can we fetch a subset of PDAL dimensions as a numpy array while streaming""" + r = get_pipeline(filename) + limit = ['X','Y','Z','Intensity'] + arrays = list(r.iterator(chunk_size=100,allowed_dims=limit)) + assert len(arrays) == 11 + assert len(arrays[0].dtype) == 4 + + @pytest.mark.parametrize("filename", ["sort.json", "sort.py"]) def test_execute_streaming_non_streamable(self, filename): r = get_pipeline(filename) @@ -113,6 +126,18 @@ def test_array(self, filename): assert a[0][0] == 635619.85 assert a[1064][2] == 456.92 + @pytest.mark.parametrize("filename", ["sort.json", "sort.py"]) + def test_subsetarray(self, filename): + """Can we fetch a subset of PDAL dimensions as a numpy array""" + r = get_pipeline(filename) + limit = ['X','Y','Z'] + r.execute(allowed_dims=limit) + arrays = r.arrays + assert len(arrays) == 1 + assert len(arrays[0].dtype) == 3 + + + @pytest.mark.parametrize("filename", ["sort.json", "sort.py"]) def test_metadata(self, filename): """Can we fetch PDAL metadata""" @@ -371,6 +396,15 @@ def test_quickinfo(self): assert 'readers.las' in info.keys() assert info['readers.las']['num_points'] == 1065 + def test_quickinfo_offsets_scales(self): + r = pdal.Reader(os.path.join(DATADIRECTORY,"simple.laz")) + p = r.pipeline() + info = p.quickinfo + assert 'readers.las' in info.keys() + assert 'offset_x' in info['readers.las']['metadata'].keys() + assert 'scale_x' in info['readers.las']['metadata'].keys() + assert info['readers.las']['num_points'] == 1065 + def test_jsonkwarg(self): pipeline = pdal.Reader(os.path.join(DATADIRECTORY,"autzen-utm.las")).pipeline().toJSON() r = pdal.Pipeline(json=pipeline) @@ -541,6 +575,65 @@ def test_load(self): assert data["Intensity"].sum() == 57684 +class TestGeoDataFrame: + + @pytest.mark.skipif( + not pdal.pipeline.GeoDataFrame, + reason="geopandas is not available", + ) + def test_fetch(self): + r = pdal.Reader(os.path.join(DATADIRECTORY,"autzen-utm.las")) + p = r.pipeline() + p.execute() + record_count = p.arrays[0].shape[0] + dimension_count = len(p.arrays[0].dtype) + gdf = p.get_geodataframe(0) + gdf_xyz = p.get_geodataframe(0, xyz=True) + gdf_crs = p.get_geodataframe(0, crs="EPSG:4326") + assert len(gdf) == record_count + assert len(gdf.columns) == dimension_count + 1 + assert isinstance(gdf, pdal.pipeline.GeoDataFrame) + assert gdf.geometry.is_valid.all() + assert not gdf.geometry.is_empty.any() + assert gdf.crs is None + assert gdf.geometry.z.isna().all() + assert not gdf_xyz.geometry.z.isna().any() + assert gdf_crs.crs.srs == "EPSG:4326" + + @pytest.mark.skipif( + not pdal.pipeline.GeoDataFrame, + reason="geopandas is not available", + ) + def test_load(self): + r = pdal.Reader(os.path.join(DATADIRECTORY,"autzen-utm.las")) + p = r.pipeline() + p.execute() + data = p.arrays[0] + gdf = pdal.pipeline.GeoDataFrame( + data, + geometry=pdal.pipeline.points_from_xy(data["X"], data["Y"], data["Z"]) + ) + dataframes = [gdf, gdf, gdf] + filter_intensity = """{ + "pipeline":[ + { + "type":"filters.range", + "limits":"Intensity[100:300)" + } + ] + }""" + p = pdal.Pipeline(filter_intensity, dataframes = dataframes) + p.execute() + arrays = p.arrays + assert len(arrays) == 3 + + # We copied the array three times. Sum the Intensity values + # post filtering to see if we had our intended effect + for data in arrays: + assert len(data) == 387 + assert data["Intensity"].sum() == 57684 + + class TestPipelineIterator: @pytest.mark.parametrize("filename", ["sort.json", "sort.py"]) def test_non_streamable(self, filename): @@ -646,3 +739,149 @@ def test_multiple_iterators(self, filename): np.testing.assert_array_equal(a1, a2) assert next(it1, None) is None assert next(it2, None) is None + + +def gen_chunk(count, random_seed = 12345): + rng = np.random.RandomState(count*random_seed) + # Generate dummy data + result = np.zeros(count, dtype=[("X", float), ("Y", float), ("Z", float)]) + result['X'][:] = rng.uniform(-2, -1, count) + result['Y'][:] = rng.uniform(1, 2, count) + result['Z'][:] = rng.uniform(3, 4, count) + return result + + +class TestPipelineInputStreams(): + + # Test cases + ONE_ARRAY_FULL = [[gen_chunk(1234)]] + MULTI_ARRAYS_FULL = [*ONE_ARRAY_FULL, [gen_chunk(4321)]] + + ONE_ARRAY_STREAMED = [[gen_chunk(10), gen_chunk(7), gen_chunk(3), gen_chunk(5), gen_chunk(1)]] + MULTI_ARRAYS_STREAMED = [*ONE_ARRAY_STREAMED, [gen_chunk(5), gen_chunk(2), gen_chunk(3), gen_chunk(1)]] + + MULTI_ARRAYS_MIXED = [ + *MULTI_ARRAYS_STREAMED, + *MULTI_ARRAYS_FULL + ] + + @pytest.mark.parametrize("in_arrays_chunks, use_setter", [ + (arrays_chunks, use_setter) for arrays_chunks, use_setter in product([ + ONE_ARRAY_FULL, MULTI_ARRAYS_FULL, + ONE_ARRAY_STREAMED, MULTI_ARRAYS_STREAMED, + MULTI_ARRAYS_MIXED + ], ['False', 'True']) + ]) + def test_pipeline_run(self, in_arrays_chunks, use_setter): + """ + Test case to validate possible usages: + - Combining "full" arrays and "streamed" ones + - Setting input arrays through the Pipeline constructor or the setter + """ + # Assuming stream mode for lists that contain more than one chunk. + # And that first chunk is the biggest of all, to simplify input buffer size creation. + in_arrays = [ + np.zeros(chunks[0].shape, chunks[0].dtype) if len(chunks) > 1 else chunks[0] + for chunks in in_arrays_chunks + ] + + def get_stream_handler(in_array, in_array_chunks): + in_array_chunks_it = iter(in_array_chunks) + def load_next_chunk(): + try: + next_chunk = next(in_array_chunks_it) + except StopIteration: + return 0 + + chunk_size = next_chunk.size + in_array[:chunk_size]["X"] = next_chunk[:]["X"] + in_array[:chunk_size]["Y"] = next_chunk[:]["Y"] + in_array[:chunk_size]["Z"] = next_chunk[:]["Z"] + + return chunk_size + + return load_next_chunk + + stream_handlers = [ + get_stream_handler(arr, chunks) if len(chunks) > 1 else None + for arr, chunks in zip(in_arrays, in_arrays_chunks) + ] + + expected_count = sum([sum([len(c) for c in chunks]) for chunks in in_arrays_chunks]) + + pipeline = """ + { + "pipeline": [{ + "type": "filters.stats" + }] + } + """ + if use_setter: + p = pdal.Pipeline(pipeline) + p.inputs = [(a, h) for a, h in zip(in_arrays, stream_handlers)] + else: + p = pdal.Pipeline(pipeline, arrays=in_arrays, stream_handlers=stream_handlers) + + count = p.execute() + out_arrays = p.arrays + assert count == expected_count + assert len(out_arrays) == len(in_arrays) + + for in_array_chunks, out_array in zip(in_arrays_chunks, out_arrays): + np.testing.assert_array_equal(out_array, np.concatenate(in_array_chunks)) + + @pytest.mark.parametrize("in_arrays, use_setter", [ + (arrays, use_setter) for arrays, use_setter in product([ + [c[0] for c in ONE_ARRAY_FULL], + [c[0] for c in MULTI_ARRAYS_FULL] + ], ['False', 'True']) + ]) + def test_pipeline_run_backward_compat(self, in_arrays, use_setter: bool): + expected_count = sum([len(a) for a in in_arrays]) + + pipeline = """ + { + "pipeline": [{ + "type": "filters.stats" + }] + } + """ + if use_setter: + p = pdal.Pipeline(pipeline) + p.inputs = in_arrays + else: + p = pdal.Pipeline(pipeline, arrays=in_arrays) + + count = p.execute() + out_arrays = p.arrays + assert count == expected_count + assert len(out_arrays) == len(in_arrays) + + for in_array, out_array in zip(in_arrays, out_arrays): + np.testing.assert_array_equal(out_array, in_array) + + @pytest.mark.parametrize("in_array, invalid_chunk_size", [ + (in_array, invalid_chunk_size) for in_array, invalid_chunk_size in product( + [gen_chunk(1234)], + [-1, 12345]) + ]) + def test_pipeline_fail_with_invalid_chunk_size(self, in_array, invalid_chunk_size): + """ + Ensure execution fails when using an invalid stream handler: + - One that returns a negative chunk size + - One that returns a chunk size bigger than the buffer capacity + """ + was_called = False + def invalid_stream_handler(): + nonlocal was_called + if was_called: + # avoid infinite loop + raise ValueError("Invalid handler should not have been called a second time") + was_called = True + return invalid_chunk_size + + p = pdal.Pipeline(arrays=[in_array], stream_handlers=[invalid_stream_handler]) + with pytest.raises(RuntimeError, + match=f"Stream chunk size not in the range of array length: {invalid_chunk_size}"): + p.execute() +