feat: Enable UDF access to SQLTransformers#202
Merged
Meghajit merged 3 commits intoraystack:mainraystack/dagger:mainfrom Oct 28, 2022
Shreyansh228:enable-udf-in-sqltransformerShreyansh228/dagger:enable-udf-in-sqltransformerCopy head branch name to clipboard
Merged
feat: Enable UDF access to SQLTransformers#202Meghajit merged 3 commits intoraystack:mainraystack/dagger:mainfrom Shreyansh228:enable-udf-in-sqltransformerShreyansh228/dagger:enable-udf-in-sqltransformerCopy head branch name to clipboard
Meghajit merged 3 commits intoraystack:mainraystack/dagger:mainfrom
Shreyansh228:enable-udf-in-sqltransformerShreyansh228/dagger:enable-udf-in-sqltransformerCopy head branch name to clipboard
Conversation
Currently in Dagger Flink sql-query we can access all the java and python based UDF functions. But the same UDF functions are not accessible in the post-processor(SQLTransformer). The Flink API StreamTableEnvironment instance is used to register the UDF function in method call registerFunctions() in StreamManager.java class. Since the same instance is not used to create Flink tables in SQLTransformer.java class, due to which UDFs are not accessible. We can solve this by two approaches as below. Approach-1: We can introduce the DaggerContext singleton object which holds the StreamExecutionEnvironment, StreamTableEnvironment and Configuration instance variables, we can use these variables throughout the application.This context object gets initialized only once in driver class KafkaProtoSQLProcessor.java. We can call the DaggerContext object as a static method call in the Transformer.java interface. With this DaggerContext we can register the Flink table in SQLTransformer.java. And can have access to the UDFs which were registered earlier. Approach-2: In SQLTransformer.java class we can create a new instance of StreamManager and call registerFunctions method for each SQLTransformer configuration. With this approach, if the user calls n times SqlTransformer configuration, then n times the registration of UDFs get called and n times Objects are initialized. Here we have followed Approach-1.
Meghajit
reviewed
Oct 11, 2022
dagger-common/src/main/java/io/odpf/dagger/common/core/Transformer.java
Outdated
Show resolved
Hide resolved
Meghajit
reviewed
Oct 11, 2022
dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java
Outdated
Show resolved
Hide resolved
KafkaProtoSQLProcessor -> StreamManager -> PostProcessorFactory -> ParentPostProcessor -> TransformProcessor and refactoring related code
Meghajit
reviewed
Oct 26, 2022
Member
There was a problem hiding this comment.
Any reason this import been commented ?
I can see its still used in Line 50 below
Contributor
Author
There was a problem hiding this comment.
This has been un-commented and handled.
|
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Currently in Dagger Flink sql-query we can access all the java and python based UDF functions. But the same UDF functions are not accessible in the post-processor(SQLTransformer).
The Flink API StreamTableEnvironment instance is used to register the UDF function in method call registerFunctions() in StreamManager.java class. Since the same instance is not used to create Flink tables in SQLTransformer.java class, due to which UDFs are not accessible.
We can solve this by two approaches as below.
Approach-1:
We can introduce the DaggerContext singleton object which holds the StreamExecutionEnvironment, StreamTableEnvironment and Configuration instance variables, we can use these variables throughout the application.This context object gets initialized only once in driver class KafkaProtoSQLProcessor.java.
We can call the DaggerContext object as a static method call in the Transformer.java interface. With this DaggerContext we can register the Flink table in SQLTransformer.java. And can have access to the UDFs which were registered earlier.
Approach-2:
In SQLTransformer.java class we can create a new instance of StreamManager and call registerFunctions method for each SQLTransformer configuration. With this approach, if the user calls n times SqlTransformer configuration, then n times the registration of UDFs get called and n times Objects are initialized.
Here we have followed Approach-1.