Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
176 commits
Select commit Hold shift + click to select a range
b0a93f3
feat: add parquet source specific configs
Meghajit Feb 16, 2022
d27ce82
feat: extract enums into their own file
Meghajit Feb 17, 2022
099287c
feat: create a KafkaSource factory
Meghajit Feb 17, 2022
3419983
feat: create a common interface for all serializers
Meghajit Feb 17, 2022
2cba26a
feat: create a ParquetFileSource and its builder
Meghajit Feb 17, 2022
480603c
feat: add ParquetFileSourceFactory
Meghajit Feb 17, 2022
0e6a3bc
feat: add SourceFactory to create sources
Meghajit Feb 17, 2022
02867bd
feat: create skeleton for a basic parquet file reader
Meghajit Feb 18, 2022
c264de7
feat: create skeleton for two split assigners
Meghajit Feb 18, 2022
e9d0d4c
feat: create skeleton for ParquetFileRecordFormat
Meghajit Feb 18, 2022
56315b1
feat: add more methods in ParquetFileSourceFactory
Meghajit Feb 18, 2022
c82d0be
feat: add runtime dependency for parquet-column into dagger-common
Meghajit Feb 21, 2022
0ed5f41
feat: add an interface for parquet data type parser
Meghajit Feb 21, 2022
1a6f12d
feat: implement parquet parsers for some primitive data types
Meghajit Feb 21, 2022
d9ae93d
feat: handle null args for ParquetDataTypeParser
Meghajit Feb 22, 2022
7690b89
feat: implement parquet parser for float
Meghajit Feb 22, 2022
189fb7a
feat: implement parquet parsers for binary string
Meghajit Feb 22, 2022
b83aba3
feat: implement parquet parsers for double primitive type
Meghajit Feb 22, 2022
1bd9598
feat: implement parquet parsers for enum
Meghajit Feb 22, 2022
e73d736
feat: implement parquet parser for timestamp
Meghajit Feb 24, 2022
ab33372
feat: add check for missing logical type annotation
Meghajit Feb 24, 2022
d99e1ea
feat: add validation for missing logical type annotation
Meghajit Feb 24, 2022
b2fd657
feat: add a validation factory for parquet schema checks
Meghajit Feb 24, 2022
a3223a2
feat: add more checks to SimpleGroupValidation
Meghajit Feb 25, 2022
dc45553
feat: apply validations to ParquetBoolean parser
Meghajit Feb 28, 2022
2f96b4c
feat: apply validations in ParquetTimestampParser
Meghajit Feb 28, 2022
374f98c
feat: apply validations in ParquetInt64Parser
Meghajit Feb 28, 2022
c7d4ea7
feat: apply validations in ParquetInt32Parser
Meghajit Feb 28, 2022
cd2a143
feat: apply validations in ParquetFloatParser
Meghajit Feb 28, 2022
5b52214
feat: apply validations in ParquetDoubleParser
Meghajit Feb 28, 2022
3cc0828
feat: apply validations in ParquetEnumParser
Meghajit Feb 28, 2022
359cbef
feat: apply validations in ParquetStringParser
Meghajit Feb 28, 2022
bfde8a4
feat: implement parquet data type identifier
Meghajit Feb 28, 2022
45f9789
feat: add canHandle as an abstract method in ParquetDataTypeParser
Meghajit Feb 28, 2022
bc829d3
feat: change signature of ParquetDataTypeParser.getValueOrDefault()
Meghajit Feb 28, 2022
fde86ac
bugfix: use Objects.equals instead of ==`
Meghajit Feb 28, 2022
55ce9fc
refactor: opt to use safer Objects.equals
Meghajit Feb 28, 2022
1a243a5
feat: mark constructor as public for ParquetDataTypeID
Meghajit Mar 3, 2022
2e073bc
feat: add factory method for producing ParquetDataTypeParser
Meghajit Mar 3, 2022
8ccb516
feat: add method to deserialize from simple group in BooleanPrimitive…
Meghajit Mar 4, 2022
e82bea0
feat: add method to deserialize from simple group in DoublePrimitiveT…
Meghajit Mar 4, 2022
f70ee38
feat: add method to deserialize from simple group in FloatPrimitiveTy…
Meghajit Mar 4, 2022
a490dbd
feat: add method to deserialize from simple group in IntegerPrimitive…
Meghajit Mar 4, 2022
0c43843
feat: add method to deserialize from simple group in LongPrimitiveTyp…
Meghajit Mar 4, 2022
9b89593
feat: add method to deserialize from simple group in StringPrimitiveT…
Meghajit Mar 4, 2022
a2a8c2c
feat: add method to deserialize from simple group in ByteStringPrimit…
Meghajit Mar 4, 2022
a85b07e
feat: rename interface method and its usages
Meghajit Mar 6, 2022
17a7284
feat: add support for transforming simple group in EnumProtoHandler
Meghajit Mar 6, 2022
1f32b54
feat: add support for transforming simple group in TimestampProtoHandler
Meghajit Mar 6, 2022
d01fb95
feat: undo adding factory method to create ParquetDataTypeParser
Meghajit Mar 6, 2022
e0705be
feat: add factory method in SimpleGroupValidation
Meghajit Mar 6, 2022
960886c
feat: add a test proto for testing parquet source
Meghajit Mar 6, 2022
071122f
feat: support transforming of simple group in PrimitiveProtoHandler
Meghajit Mar 7, 2022
4c4dbe2
feat: add factory method to convert simple group to row
Meghajit Mar 7, 2022
5200fbd
feat: add a simple group deserializer
Meghajit Mar 8, 2022
9ba9c2c
feat: rename interface method to transformFromKafka
Meghajit Mar 8, 2022
255814e
feat: add and implement transformFromParquet
Meghajit Mar 8, 2022
4b375c6
feat: move SimpleGroupValidation to another package
Meghajit Mar 8, 2022
bec88e9
feat: delete io.odpf.dagger.common.serde.parquet.parser package
Meghajit Mar 8, 2022
20890f6
feat: call transformFromParquet instead in RowFactory method
Meghajit Mar 10, 2022
9a30c92
feat: throw exception for bounded kafka source
Meghajit Mar 11, 2022
c4a7651
feat: stop passing stencil to ParquetFileSource and its downstream co…
Meghajit Mar 11, 2022
7104a05
feat: throw exception for unbounded parquet source
Meghajit Mar 11, 2022
c847670
feat: add constructor to SourceDetails
Meghajit Mar 11, 2022
8b3bd8d
feat: make both JsonDeserializer and ProtoDeserializer implement Dagg…
Meghajit Mar 11, 2022
a28e7a7
feat: Add a deserializer factory
Meghajit Mar 11, 2022
651583f
feat: add new field and constructor to Stream POJO
Meghajit Mar 11, 2022
f2de48a
feat: add method to create data sources with deserializer in StreamBu…
Meghajit Mar 11, 2022
4cb77f7
feat: add a 2 argument constructor for StreamBuilder
Meghajit Mar 11, 2022
2ef46ec
feat: call StreamBuilder with stencil from ProtoDataStreamBuilder
Meghajit Mar 11, 2022
5360b14
feat: add tests to check cases related to new stream config SourceDet…
Meghajit Mar 11, 2022
91afc8f
feat: enable Mockito plugin for mocking of final methods
Meghajit Mar 11, 2022
6037afc
feat: move DeserializerFactory to another package and add tests
Meghajit Mar 11, 2022
a236b3b
feat: add a class for populating source specific stream metrics
Meghajit Mar 14, 2022
a8d3278
feat: delegate metrics away from StreamBuilder to StreamMetrics
Meghajit Mar 14, 2022
3f7582c
feat: create stream with source details from SourceFactory
Meghajit Mar 14, 2022
07e86c5
feat: migrate to using buildStream in StreamBuilder implementations
Meghajit Mar 14, 2022
09cb1af
feat: rename buildStream to build
Meghajit Mar 14, 2022
8375565
feat: add tests for StreamMetrics
Meghajit Mar 14, 2022
cd83662
feat: refactor StreamsFactory
Meghajit Mar 14, 2022
13cac7e
feat: refactor StreamMetricsTest
Meghajit Mar 14, 2022
69c5cb9
feat: add tests for DeserializerFactory
Meghajit Mar 14, 2022
2d566df
feat: stop sending StreamConfig to ParquetFileRecordFormat
Meghajit Mar 14, 2022
908a7aa
feat: cleanup test and edit exception message
Meghajit Mar 14, 2022
dc7cff9
feat: refactor SourceFactory and add tests
Meghajit Mar 14, 2022
0ff1de6
feat: refactor DeserializerFactory
Meghajit Mar 15, 2022
357a4f5
feat: refactor StreamBuilder
Meghajit Mar 15, 2022
5a03736
feat: add custom exception for simple group parsing failures
Meghajit Mar 15, 2022
56d1dcc
feat: throw SimpleGroupParsingException from SimpleGroupDeserializer
Meghajit Mar 15, 2022
0a64f11
feat: change exception thrown during simple group parsing in Timestam…
Meghajit Mar 15, 2022
e3321d8
feat: build FileSource from ParquetFileSource in SourceFactory
Meghajit Mar 15, 2022
95e334c
feat: add a Builder nested class inside ParquetFileRecordFormat
Meghajit Mar 15, 2022
437ecc1
feat: delete factories for kafka and parquet source
Meghajit Mar 15, 2022
0aaf743
feat: use Google precondition checks instead of flink ones
Meghajit Mar 15, 2022
dc1286c
feat: Create StreamType and its implementations
Meghajit Mar 29, 2022
057f46f
feat: make abstract methods public in StreamType and its implementations
Meghajit Mar 29, 2022
66727f8
feat: add tests for KafkaSourceJsonType
Meghajit Mar 29, 2022
c9b773a
feat: return StreamType from build method
Meghajit Mar 29, 2022
945d21c
feat: refactor tests
Meghajit Mar 29, 2022
8be0e78
feat: add tests for ParquetSourceProtoType
Meghajit Mar 29, 2022
cfa4990
feat: add tests for KafkaSourceProtoType
Meghajit Mar 29, 2022
0972a1f
feat: add a POJO for holding parquet file split with its instant
Meghajit Mar 30, 2022
cba471b
feat: implement chronology ordered split assigner and add tests
Meghajit Mar 30, 2022
9606844
feat: rename files and tests for streamtype
Meghajit Mar 30, 2022
b51fbaa
feat: move the builder class as a nested class inside ParquetFileSource
Meghajit Mar 30, 2022
c5ff670
feat: allow chronology ordered split assigner to be initialized with …
Meghajit Mar 30, 2022
ab992c3
feat: add tests for ParquetFileSource and add more validations
Meghajit Mar 30, 2022
e732769
feat: make provider class as nested class inside PrimitiveReader
Meghajit Mar 30, 2022
e372b31
feat: add tests for ParquetFileRecordFormat
Meghajit Mar 30, 2022
4830b1a
feat: add few tests for PrimitiveReader
Meghajit Mar 30, 2022
35736bb
feat: implement PrimitiveReader and add tests
Meghajit Mar 31, 2022
fadee0b
feat: throw runtime exception if error in creating parquet reader
Meghajit Mar 31, 2022
daebbfe
Merge remote-tracking branch 'origin/main' into feat/add-parquet-data…
Meghajit Apr 1, 2022
f2abffb
feat: make the type information provider and reader provider function…
Meghajit Apr 1, 2022
19612ed
feat: fix checkstyle formatting issues
Meghajit Apr 4, 2022
ae6bf6b
feat: fix checkstyle issues in split assigner
Meghajit Apr 4, 2022
d61f6e3
feat: fix checkstyle formatting issues in stream type impls
Meghajit Apr 4, 2022
6cad4ee
feat: make ParquetFileSource serializable
Meghajit Apr 4, 2022
a0cb47d
feat: remove serialization from PrimitiveReader
Meghajit Apr 4, 2022
c518d0f
feat: make SourceDetails and StreamType serializable
Meghajit Apr 4, 2022
bde26e2
feat: fix checkstyle formatting issues
Meghajit Apr 4, 2022
05763f9
feat: fix checkstyle formatting errors
Meghajit Apr 4, 2022
6d58b98
feat: return UNBOUNDED/KAFKA as default SourceDetails when not specified
Meghajit Apr 4, 2022
6523779
feat: return StreamType instead of Stream from StreamsFactory
Meghajit Apr 5, 2022
404f476
feat: delete Stream, StreamBuilder and its implementations
Meghajit Apr 5, 2022
5eb3b55
feat: delete SourceFactory,DeserializerFactory and StreamMetrics
Meghajit Apr 5, 2022
556ccec
feat: delete Mockito extension
Meghajit Apr 5, 2022
642a0ea
feat: rename KafkaSourceJsonSchema to KafkaSourceJsonSchemaStreamType
Meghajit Apr 5, 2022
0a44559
feat: rename KafkaSourceProtoSchema to KafkaSourceProtoSchemaStreamType
Meghajit Apr 5, 2022
514793d
feat: rename ParquetSourceProtoSchema to ParquetSourceProtoSchemaStre…
Meghajit Apr 5, 2022
e298aed
feat: throw runtime exception for unsupported split assigner
Meghajit Apr 5, 2022
5cae0f4
feat: return default timestamp when not set or present in simple group
Meghajit Apr 5, 2022
66dcf95
feat: delete unused methods from SimpleGroupValidation
Meghajit Apr 5, 2022
539dbd0
feat: inject file path regex into chronology split assigner from stre…
Meghajit Apr 5, 2022
f6cf116
feat: edit the exception message for regex no match in chronology spl…
Meghajit Apr 5, 2022
7b51bb6
Revert "feat: edit the exception message for regex no match in chrono…
Meghajit Apr 5, 2022
a2c0c5c
Revert "feat: inject file path regex into chronology split assigner f…
Meghajit Apr 5, 2022
1850a61
feat: rename functions of ProtoHandler as per review
Meghajit Apr 5, 2022
83489dd
feat: create row from millis inline instead of calling RowFactory
Meghajit Apr 5, 2022
4fa0986
feat: remove unused imports
Meghajit Apr 7, 2022
3877689
feat: add support for deprecated FlinkKafkaConsumer
Meghajit Apr 13, 2022
1714c4d
test: add tests for StreamsFactory
Meghajit Apr 18, 2022
18d2752
test: add tests for StreamType
Meghajit Apr 18, 2022
4034cba
test: add tests for DaggerDeserializerFactory
Meghajit Apr 19, 2022
0cef54c
test: add tests for DaggerSourceFactory
Meghajit Apr 19, 2022
ad796c5
test: add tests for JsonDeserializerProvider
Meghajit Apr 19, 2022
4a2fb73
test: add tests for ProtoDeserializerProvider
Meghajit Apr 19, 2022
711e95b
test: add tests for SimpleGroupDeserializerProvider
Meghajit Apr 19, 2022
9564169
test: add tests for KafkaDaggerSource
Meghajit Apr 19, 2022
8514ef0
test: add tests for FlinkKafkaConsumerDaggerSource
Meghajit Apr 19, 2022
a698cbb
test: add tests for ParquetDaggerSource
Meghajit Apr 19, 2022
93615e7
test: add tests for FlinkKafkaConsumerCustom
Meghajit Apr 19, 2022
bfed30b
feat: update constants and usages
Meghajit Apr 19, 2022
3809d3a
feat: return UNBOUNDED/KAFKA_CONSUMER as default SourceDetails when m…
Meghajit Apr 19, 2022
bd5006d
test: add and update tests for StreamManager
Meghajit Apr 19, 2022
4e2452c
Merge remote-tracking branch 'upstream/main' into feat/add-parquet-da…
Meghajit Apr 19, 2022
7d90975
feat: revert changes to local.properties
Meghajit Apr 19, 2022
05a0d7a
feat: delete unused files and tests
Meghajit Apr 19, 2022
2beddc0
feat: fix checkstyle formatting errors
Meghajit Apr 19, 2022
cb17f2c
feat: delete duplicate files and unused configs
Meghajit Apr 19, 2022
10f20ee
feat: add new interface method to parse from SimpleGroup
Meghajit Apr 20, 2022
2dfb01b
feat: remove redundant hadoop file path
Meghajit Apr 20, 2022
18ee55d
feat: refactor constructor of PrimitiveReader
Meghajit Apr 20, 2022
814e4dd
feat: remove unused initMocks
Meghajit Apr 20, 2022
f106599
test: use normal values instead of limits
Meghajit Apr 20, 2022
cc97d43
test: use normal values instead of limits
Meghajit Apr 20, 2022
26961c1
test: use normal values instead of limits
Meghajit Apr 20, 2022
26b77f6
test: use normal values instead of limits
Meghajit Apr 20, 2022
01bacb8
feat: remove unused constructor from StreamType.Builder
Meghajit Apr 20, 2022
4a82762
feat: rename StreamType to Stream
Meghajit Apr 21, 2022
fcac8d4
feat: rename KAFKA & PARQUET to KAFKA_SOURCE and PARQUET_SOURCE respe…
Meghajit Apr 21, 2022
a867b9e
feat: rename PrimitiveReader to ParquetReader
Meghajit Apr 21, 2022
3d29d9a
feat: change access to package-private for Stream constructor
Meghajit Apr 21, 2022
21331f5
feat: make properties private
Meghajit Apr 21, 2022
0c25068
feat: move initialization of RecordReader out of class constructor
Meghajit Apr 25, 2022
9f02b87
feat: refactor ParquetReader
Meghajit Apr 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: implement parquet parser for float
[#99]
  • Loading branch information
Meghajit committed Feb 22, 2022
commit 7690b89207cad7e31e59d13c09893913da4d2680
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.odpf.dagger.common.serde.parquet.parser.primitive;

import io.odpf.dagger.common.serde.parquet.parser.ParquetDataTypeParser;
import org.apache.parquet.example.data.simple.SimpleGroup;
import java.util.function.Supplier;

public class ParquetFloatParser implements ParquetDataTypeParser {
private static final float DEFAULT_DESERIALIZED_VALUE = 0.0f;

@Override
public Object deserialize(SimpleGroup simpleGroup, String fieldName) {
Supplier<Object> valueSupplier = () -> {
int columnIndex = simpleGroup.getType().getFieldIndex(fieldName);
return simpleGroup.getFloat(columnIndex, 0);
};
return ParquetDataTypeParser.getValueOrDefault(simpleGroup, valueSupplier, DEFAULT_DESERIALIZED_VALUE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.odpf.dagger.common.serde.parquet.parser.primitive;

import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Types;
import org.junit.Test;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
import static org.junit.Assert.*;

public class ParquetFloatParserTest {

@Test
public void deserializeShouldParseParquetFloatToJavaFloatType() {
GroupType parquetSchema = Types.requiredGroup()
.required(FLOAT).named("column-with-max-float-value")
.required(FLOAT).named("column-with-min-float-value")
.named("TestGroupType");
SimpleGroup simpleGroup = new SimpleGroup(parquetSchema) ;
simpleGroup.add("column-with-max-float-value", Float.MAX_VALUE);
simpleGroup.add("column-with-min-float-value", Float.MIN_VALUE);

ParquetFloatParser floatParser = new ParquetFloatParser();
Object actualValueForMaxFloatColumn = floatParser.deserialize(simpleGroup, "column-with-max-float-value");
Object actualValueForMinFloatColumn = floatParser.deserialize(simpleGroup, "column-with-min-float-value");

assertEquals(Float.MAX_VALUE, actualValueForMaxFloatColumn);
assertEquals(Float.MIN_VALUE, actualValueForMinFloatColumn);
}

@Test(expected = ClassCastException.class)
public void deserializeShouldThrowExceptionWhenTypeExceeded() {
GroupType parquetSchema = Types.requiredGroup()
.required(FLOAT).named("column-with-min-double-value")
.named("TestGroupType");
SimpleGroup simpleGroup = new SimpleGroup(parquetSchema) ;
simpleGroup.add("column-with-min-double-value", Double.MIN_VALUE);


ParquetFloatParser floatParser = new ParquetFloatParser();
floatParser.deserialize(simpleGroup, "column-with-min-double-value");
}

@Test
public void deserializeShouldReturnDefaultValueWhenSimpleGroupArgumentIsNull() {
ParquetFloatParser parquetFloatParser = new ParquetFloatParser();
Object actualValue = parquetFloatParser.deserialize(null, "some-random-field");

assertEquals(0.0f, actualValue);
}
}
Morty Proxy This is a proxified and sanitized view of the page, visit original site.