Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0658c3f
ALP: Add AlpConstants with encoding tables and format constants
sfc-gh-pgaur Mar 7, 2026
d656aaf
ALP: Add AlpEncoderDecoder with core encode/decode logic
sfc-gh-pgaur Mar 7, 2026
388d2a9
ALP: Add AlpCompression for single-vector compress/decompress
sfc-gh-pgaur Mar 7, 2026
9bb8a86
ALP: Add AlpSampler for sampling-based encoding preset generation
sfc-gh-pgaur Mar 7, 2026
60607ed
ALP: Add AlpWrapper for page-level encode/decode with 7-byte header
sfc-gh-pgaur Mar 7, 2026
a2c3992
ALP: Add incremental AlpValuesWriter for float and double columns
sfc-gh-pgaur Mar 7, 2026
27508d9
ALP: Add lazy AlpValuesReader for float and double columns
sfc-gh-pgaur Mar 7, 2026
1036d09
ALP: Add cross-implementation tests and fix encode/decode to match C++
sfc-gh-pgaur Mar 7, 2026
1d4e46d
ALP: Add encoding benchmark for float and double throughput
sfc-gh-pgaur Mar 7, 2026
1cb5ea5
ALP: Wire up Encoding.ALP in column Encoding enum and AlpValuesWriter
sfc-gh-pgaur Mar 7, 2026
0d7cd79
ALP: Apply spotless formatting fixes
sfc-gh-pgaur Mar 7, 2026
09ca686
ALP: Add cross-language interop test reading C++ ALP-encoded parquet …
sfc-gh-pgaur Mar 7, 2026
6d0443d
ALP: Add writer pipeline integration and bidirectional cross-language…
sfc-gh-pgaur Mar 7, 2026
00dbf38
ALP: Add float32 cross-language tests and generator support
sfc-gh-pgaur Mar 8, 2026
385d053
ALP: Add codec-level and pipeline throughput benchmarks
sfc-gh-pgaur Mar 8, 2026
abbe36d
ALP: Add pipeline integration, reader buffer reuse, and validation
sfc-gh-pgaur Mar 8, 2026
05996ba
ALP: Add encoding comparison benchmark (ALP vs ZSTD vs BSS+ZSTD)
sfc-gh-pgaur Mar 8, 2026
8f4cb19
ALP: Pre-allocate long[] encoded buffer for double vector decodes
sfc-gh-pgaur Mar 8, 2026
78cb02d
ALP: Use unpack32Values for long bit unpacking
sfc-gh-pgaur Mar 8, 2026
af73f5e
ALP: Inline decode with hoisted multipliers for double decompression
sfc-gh-pgaur Mar 9, 2026
9c1526b
ALP: Use real Spotify dataset in codec throughput benchmark
sfc-gh-pgaur Mar 9, 2026
a598134
ALP: Remove duplicate CSV test data from parquet-column
sfc-gh-pgaur Mar 9, 2026
5c3d7fa
ALP: Gzip compress CSV test data and regenerate float parquet files
sfc-gh-pgaur Mar 9, 2026
48d5e73
ALP: Use real Spotify dataset in encoding comparison benchmark
sfc-gh-pgaur Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ALP: Add lazy AlpValuesReader for float and double columns
Implements AlpValuesReader (abstract), AlpValuesReaderForFloat, and
AlpValuesReaderForDouble. Uses lazy per-vector decoding: initFromPage
reads only the header and offset array, vectors are decoded on first
access. skip() is O(1) with no decoding.
  • Loading branch information
sfc-gh-pgaur committed Mar 11, 2026
commit 27508d93bf59e23300f78172e753abdd41436ecc
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.values.alp;

import static org.apache.parquet.column.values.alp.AlpConstants.*;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;

/**
* Abstract base class for ALP values readers with lazy per-vector decoding.
*
* <p>On {@link #initFromPage}, reads the 7-byte header and offset array but does NOT
* decode any vectors. Vectors are decoded on demand when values are accessed.
* {@link #skip()} is O(1) — it just advances the index.
*/
/**
 * Abstract base class for ALP values readers with lazy per-vector decoding.
 *
 * <p>On {@link #initFromPage}, reads the 7-byte header and the per-vector offset array but
 * does NOT decode any vectors; subclasses decode a vector on first access to one of its
 * values. {@link #skip()} is O(1) — it only advances the index.
 */
abstract class AlpValuesReader extends ValuesReader {

  // Number of values per ALP vector (1 << logVectorSize from the page header).
  protected int vectorSize;
  // Total number of values in the page.
  protected int totalCount;
  // Number of vectors in the page: ceil(totalCount / vectorSize).
  protected int numVectors;
  // Index of the next value to be read or skipped.
  protected int currentIndex;
  // Index of the vector currently held in the subclass's decoded buffer; -1 means none.
  protected int decodedVectorIndex = -1;
  // Byte offset of each vector's compressed data within {@link #rawData}.
  protected int[] vectorOffsets;
  // All page bytes after the 7-byte header (offset array followed by vector data).
  protected byte[] rawData;

  /**
   * Reads the ALP page header and offset array; vector payloads are kept raw for lazy decode.
   *
   * @param valueCount number of values expected by the caller (not used; totalCount comes
   *     from the page header itself)
   * @param stream the page bytes
   * @throws IOException if the underlying stream fails
   * @throws ParquetDecodingException if the page is truncated or uses an unsupported
   *     compression mode / integer encoding
   */
  @Override
  public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
    int available = (int) stream.available();
    if (available < HEADER_SIZE) {
      throw new ParquetDecodingException(
          "ALP page too small for header: " + available + " bytes");
    }

    // Read header. Header layout (little-endian):
    // byte 0: compression mode, byte 1: integer encoding, byte 2: log2(vector size),
    // bytes 3-6: total value count.
    byte[] headerBytes = new byte[HEADER_SIZE];
    readFully(stream, headerBytes);
    ByteBuffer header = ByteBuffer.wrap(headerBytes).order(ByteOrder.LITTLE_ENDIAN);

    int compressionMode = header.get() & 0xFF;
    int integerEncoding = header.get() & 0xFF;
    int logVectorSize = header.get() & 0xFF;
    totalCount = header.getInt();

    if (compressionMode != COMPRESSION_MODE_ALP) {
      throw new ParquetDecodingException(
          "Unsupported ALP compression mode: " + compressionMode);
    }
    if (integerEncoding != INTEGER_ENCODING_FOR) {
      throw new ParquetDecodingException(
          "Unsupported ALP integer encoding: " + integerEncoding);
    }

    vectorSize = 1 << logVectorSize;
    numVectors = (totalCount + vectorSize - 1) / vectorSize;
    currentIndex = 0;
    decodedVectorIndex = -1;

    if (numVectors == 0) {
      vectorOffsets = new int[0];
      rawData = new byte[0];
      return;
    }

    // Read remaining data (offsets + vectors).
    int remaining = (int) stream.available();
    if (remaining < numVectors * OFFSET_SIZE) {
      throw new ParquetDecodingException(
          "ALP page too small for offset array: " + remaining + " bytes, need "
              + (numVectors * OFFSET_SIZE));
    }
    rawData = new byte[remaining];
    readFully(stream, rawData);

    // Parse the little-endian int32 offset array from the front of rawData.
    ByteBuffer body = ByteBuffer.wrap(rawData, 0, numVectors * OFFSET_SIZE)
        .order(ByteOrder.LITTLE_ENDIAN);
    vectorOffsets = new int[numVectors];
    for (int i = 0; i < numVectors; i++) {
      vectorOffsets[i] = body.getInt();
    }
  }

  /**
   * Fills {@code dest} completely from {@code stream}. {@link java.io.InputStream#read(byte[])}
   * is allowed to return fewer bytes than requested, so a single read call could silently
   * leave the tail of the buffer as zeros; loop until full or end of stream.
   *
   * @throws ParquetDecodingException if the stream ends before {@code dest} is filled
   */
  private static void readFully(ByteBufferInputStream stream, byte[] dest) throws IOException {
    int filled = 0;
    while (filled < dest.length) {
      int n = stream.read(dest, filled, dest.length - filled);
      if (n < 0) {
        throw new ParquetDecodingException(
            "ALP page truncated: expected " + dest.length + " bytes, got " + filled);
      }
      filled += n;
    }
  }

  /** Skips one value in O(1); no decoding happens. */
  @Override
  public void skip() {
    currentIndex++;
  }

  /** Skips {@code n} values in O(1); no decoding happens. */
  @Override
  public void skip(int n) {
    currentIndex += n;
  }

  /** Number of elements in the given vector (only the last vector may be partial). */
  protected int elementsInVector(int vectorIdx) {
    if (vectorIdx < totalCount / vectorSize) {
      return vectorSize;
    }
    int rem = totalCount % vectorSize;
    return (rem == 0) ? vectorSize : rem;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.values.alp;

import org.apache.parquet.io.ParquetDecodingException;

/**
* ALP values reader for double columns with lazy per-vector decoding.
*/
/**
 * ALP values reader for double columns with lazy per-vector decoding.
 *
 * <p>Decodes one vector at a time on first access; sequential reads within the same vector
 * hit the already-decoded buffer. The decode buffer is reused across vectors of the same
 * size to avoid a per-vector allocation (only the final, possibly partial, vector differs).
 */
public class AlpValuesReaderForDouble extends AlpValuesReader {

  // Holds the decoded values of the vector identified by decodedVectorIndex.
  private double[] decodedBuffer;

  /**
   * Returns the next double value, decoding its containing vector if necessary.
   *
   * @throws ParquetDecodingException if all values have already been read or skipped past
   */
  @Override
  public double readDouble() {
    if (currentIndex >= totalCount) {
      throw new ParquetDecodingException("ALP double reader exhausted at index " + currentIndex);
    }
    int vectorIdx = currentIndex / vectorSize;
    int posInVector = currentIndex % vectorSize;
    ensureVectorDecoded(vectorIdx);
    currentIndex++;
    return decodedBuffer[posInVector];
  }

  /** Decodes the given vector into {@link #decodedBuffer} unless it is already decoded. */
  private void ensureVectorDecoded(int vectorIdx) {
    if (vectorIdx == decodedVectorIndex) {
      return;
    }
    int numElements = elementsInVector(vectorIdx);
    int dataOffset = vectorOffsets[vectorIdx];
    AlpCompression.DoubleCompressedVector cv =
        AlpCompression.DoubleCompressedVector.load(rawData, dataOffset, numElements);
    // Reuse the buffer when the size matches; decompress overwrites all numElements slots.
    if (decodedBuffer == null || decodedBuffer.length != numElements) {
      decodedBuffer = new double[numElements];
    }
    AlpCompression.decompressDoubleVector(cv, decodedBuffer);
    decodedVectorIndex = vectorIdx;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.values.alp;

import org.apache.parquet.io.ParquetDecodingException;

/**
* ALP values reader for float columns with lazy per-vector decoding.
*/
/**
 * ALP values reader for float columns with lazy per-vector decoding.
 *
 * <p>Decodes one vector at a time on first access; sequential reads within the same vector
 * hit the already-decoded buffer. The decode buffer is reused across vectors of the same
 * size to avoid a per-vector allocation (only the final, possibly partial, vector differs).
 */
public class AlpValuesReaderForFloat extends AlpValuesReader {

  // Holds the decoded values of the vector identified by decodedVectorIndex.
  private float[] decodedBuffer;

  /**
   * Returns the next float value, decoding its containing vector if necessary.
   *
   * @throws ParquetDecodingException if all values have already been read or skipped past
   */
  @Override
  public float readFloat() {
    if (currentIndex >= totalCount) {
      throw new ParquetDecodingException("ALP float reader exhausted at index " + currentIndex);
    }
    int vectorIdx = currentIndex / vectorSize;
    int posInVector = currentIndex % vectorSize;
    ensureVectorDecoded(vectorIdx);
    currentIndex++;
    return decodedBuffer[posInVector];
  }

  /** Decodes the given vector into {@link #decodedBuffer} unless it is already decoded. */
  private void ensureVectorDecoded(int vectorIdx) {
    if (vectorIdx == decodedVectorIndex) {
      return;
    }
    int numElements = elementsInVector(vectorIdx);
    int dataOffset = vectorOffsets[vectorIdx];
    AlpCompression.FloatCompressedVector cv =
        AlpCompression.FloatCompressedVector.load(rawData, dataOffset, numElements);
    // Reuse the buffer when the size matches; decompress overwrites all numElements slots.
    if (decodedBuffer == null || decodedBuffer.length != numElements) {
      decodedBuffer = new float[numElements];
    }
    AlpCompression.decompressFloatVector(cv, decodedBuffer);
    decodedVectorIndex = vectorIdx;
  }
}
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.