Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4f728ab

Browse filesBrowse files
authored
CopyExampleGen tests (#251)
* Initialize test file and add comments * add a couple tests and pull out functions * lint * PR comments * add another ValueError * Add back the mkdir step * Add component to version.py * change to protected functions
1 parent 015e5ae commit 4f728ab
Copy full SHA for 4f728ab

File tree

4 files changed

+169
-42
lines changed
Filter options

4 files changed

+169
-42
lines changed

‎tfx_addons/copy_example_gen/README.md

Copy file name to clipboardExpand all lines: tfx_addons/copy_example_gen/README.md
+13-13
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
**Project name:** CopyExampleGen component
1111

1212
## Project Description
13-
CopyExampleGen will allow the user to copy pre-existing tfrecords and ingest it into the pipeline as examples, ultimately skipping the process of shuffling and running the Beam job that is in the standard component, ExampleGen. This process will require a dict input with split names as keys and their respective URIs as the value from the user. Following suit, the component will set the artifact’s properties, generate output dict, and register contexts and execution for downstream components to use. Lastly, tfrecord file(s) in uri must resemble same `.gz` file format as the output of ExampleGen component.
13+
CopyExampleGen will allow the user to copy pre-existing TFRecords and ingest it into the pipeline as examples, ultimately skipping the process of shuffling and running the Beam job that is in the standard component, ExampleGen. This process will require a dict input with split names as keys and their respective URIs as the value from the user. Following suit, the component will set the artifact’s properties, generate output dict, and register contexts and execution for downstream components to use. Lastly, TFRecord file(s) in URI must resemble same `.gz` file format as the output of ExampleGen component.
1414

1515
Example of pipeline component definition:
1616
```python
@@ -19,9 +19,9 @@ tfrecord_dict : Dict[str, str] = {
1919
"eval" : "gs://path/to/tfrecords/examples/Split-eval/"
2020
}
2121

22-
copy_example_gen = component.CopyExampleGen(
23-
input_dict = json.dumps(tfrecords_dict)
24-
)
22+
copy_example_gen = component.CopyExampleGen(
23+
input_dict = json.dumps(tfrecords_dict)
24+
)
2525
```
2626

2727
As of April 10th, 2023, tfx.dsl.components.Parameter only supports primitive types therefore, in order to properly use CopyExampleGen, the 'input_dict' of type Dict[str, str] needs to be converted into a JSON str. We can do this by simply using `json.dumps()` by adding 'tfrecords_dict' in as an argument.
@@ -31,9 +31,9 @@ As of April 10th, 2023, tfx.dsl.components.Parameter only supports primitive typ
3131
Addon Component
3232

3333
## Project Use-Case(s)
34-
CopyExampleGen will replace ExampleGen when tfrecords and split names are already in the possession of the user. Hence, a Beam job will not be run nor will the tfrecords be shuffled and/ or randomized saving data ingestion pipeline process time.
34+
CopyExampleGen will replace ExampleGen when TFRecords and split names are already in the possession of the user. Hence, a Beam job will not be run nor will the TFRecords be shuffled and/ or randomized saving data ingestion pipeline process time.
3535

36-
Currently, ingesting data with the ExampleGen component does not provide a way to split without random data shuffling and always runs a beam job. This component will save significant time (hours for large amounts of data) per pipeline run when a pipeline run does not require data to be shuffled. Some challenges users have had:
36+
Currently, ingesting data with the ExampleGen component does not provide a way to split without random data shuffling and always runs a Beam job. This component will save significant time (hours for large amounts of data) per pipeline run when a pipeline run does not require data to be shuffled. Some challenges users have had:
3737

3838
1. “Reshuffle doesn't work well with DirectRunner and causes OOMing. Users have been patching out shuffling in every release and doing it in the DB query. They have given up on Beam based ExampleGen and have created an entire custom ExampleGen that reads from the database and doesn’t use Beam”.
3939

@@ -47,29 +47,29 @@ Custom Python function component: CopyExampleGen
4747

4848
- `input_json_str`: will be the input parameter for CopyExampleGen of type `tfx.dsl.components.Parameter[str]`, where the user will assign their Dict[str, str] input, tfrecords_dict. However, because Python custom component development only supports primitive types, we must assign `input_json_str` to `json.dumps(tfrecords_dict)` and place the tfrecords_dict in as an argument.
4949

50-
- `output_example`: Output artifact can be referenced as an object of its' specified type ArtifactType in the component function being declared. For example, if the ArtifactType is Examples, one can reference properties in an Examples ArtifactType (span, version, split_names, etc.) by calling the OutputArtifact object. This will be the variable we reference to build and register our Examples Artifact after pasrsing the tfrecords_dict input.
50+
- `output_example`: Output artifact can be referenced as an object of its specified type ArtifactType in the component function being declared. For example, if the ArtifactType is Examples, one can reference properties in an Examples ArtifactType (span, version, split_names, etc.) by calling the OutputArtifact object. This will be the variable we reference to build and register our Examples Artifact after pasrsing the tfrecords_dict input.
5151

5252

5353
### Python Custom Component Implementation Details
5454

55-
Using fileio.mkdir and fileio.copy, the component will then create a directory folder for each name in `split_name`. Following the creation of the `Split-name` folder, the files in the uri path will then be copied into the designated `Split-name` folder.
55+
Using fileio.mkdir and fileio.copy, the component will then create a directory folder for each name in `split_name`. Following the creation of the `Split-name` folder, the files in the URI path will then be copied into the designated `Split-name` folder.
5656

5757
Thoughts from original implementation in phase 1:
5858
This step can possibly use the [importer.generate_output_dict](https://github.com/tensorflow/tfx/blob/f8ce19339568ae58519d4eecfdd73078f80f84a2/tfx/dsl/components/common/importer.py#L153) function:
5959
Create standard ‘output_dict’ variable. The value will be created by calling the worker function. If file copying is done before this step, this method can probably be used as is to register the artifact.
6060

6161
Using the keys and values from `tfrecords_dict`:
6262
Parse the input_dict.keys() to a str to resemble the necessary format of property `split-names` i.e. '["train","eval"]'
63-
63+
6464
## Possible Future Development Directions
65-
1. There's a few open questions about how the file copying should actually done. Where does the copying that importer does actually happen? And what's the best way to change that? Are there other ways in TFX to do copying in a robust way? Maybe something in tfx.io? If there's an existing method, what has to happen in the `parse_tfrecords_dict`. Depending on the copying capabilities available, will there be a need to detect the execution environment? Does TFX rely on other tools to execute a copy that handle this? Is detection of the execution environment and the copying itself separate? What could be reused?
66-
65+
1. There's a few open questions about how the file copying should actually done. Where does the copying that importer does actually happen? And what's the best way to change that? Are there other ways in TFX to do copying in a robust way? Maybe something in tfx.io? If there's an existing method, what has to happen in the `parse_tfrecords_dict`. Depending on the copying capabilities available, will there be a need to detect the execution environment? Does TFX rely on other tools to execute a copy that handle this? Is detection of the execution environment and the copying itself separate? What could be reused?
66+
6767
- If it's not easy to detect the execution environment without also performing a copy, will the user have to specify the execution environment and therefore how to do the copy (e.g., local copy, GCS, S3). And then what's the best way to handle that?
68-
68+
6969
2. Should the dictionary of file inputs take a path to a folder? Globs? Lists of individual files?
7070
3. Assuming file copying is done entirely separately, [importer.generate_output_dict](https://github.com/tensorflow/tfx/blob/f8ce19339568ae58519d4eecfdd73078f80f84a2/tfx/dsl/components/common/importer.py#L153) be used as is to register the artifacts, or does some separate code using [MLMD](https://www.tensorflow.org/tfx/guide/mlmd) in a different way need to be written
7171

7272

7373
## Project Team
7474
Alex Ho, alexanderho@google.com, @alxndrnh
75-
75+
Zi Yang, zya@google.com, @zyang7

‎tfx_addons/copy_example_gen/component.py

Copy file name to clipboardExpand all lines: tfx_addons/copy_example_gen/component.py
+70-28
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
# limitations under the License.
1414
"""CopyExampleGen custom component.
1515
16-
This component will accept tfrecord files and register them as an
16+
This component will accept TFRecord files and register them as an
1717
Examples Artifact for downstream components to use. CopyExampleGen accepts
1818
a dictionary where keys are the split-names and their respective value is a
19-
uri to the folder that contains the tfrecords file(s).
19+
URI to the folder that contains the TFRecords file(s).
2020
21-
Tfrecord file(s) in uri must resemble same `.gz` file format as the output of
21+
TFRecord file(s) in URI must resemble same `.gz` file format as the output of
2222
ExampleGen component.
2323
2424
User will need to create a dictionary of type Dict[str, str], in this case
@@ -40,7 +40,9 @@
4040
4141
"""
4242
import json
43+
import logging
4344
import os
45+
from typing import Dict
4446

4547
from tfx import v1 as tfx
4648
from tfx.dsl.component.experimental.decorators import component
@@ -53,42 +55,82 @@ def CopyExampleGen( # pylint: disable=C0103
5355
input_json_str: tfx.dsl.components.Parameter[str],
5456
output_example: tfx.dsl.components.OutputArtifact[Examples]
5557
) -> tfx.dsl.components.OutputDict():
56-
"""
58+
"""Copies the TFRecords from input Split directories to reuse.
59+
5760
CopyExampleGen first converts the string input to a type Dict and extracts
5861
the keys from the dictionary, input_dict, and creates a string containing
59-
the names. This string is assigned to the output_example.split_uri property
60-
to register split_names property.
61-
62-
This component then creates a directory folder for each name in split_name.
63-
Following the creation of the `Split-name` folder, the files in the uri path
64-
will then be copied into the designated `Split-name` folder.
65-
62+
the names. This string is assigned to the `output_example.split_uri` property
63+
to register `split_names` property.
64+
65+
This component then creates a directory folder for each `name` in
66+
`split_name`. Following the creation of the `Split-{name}` folder, the files
67+
in the URI path will then be copied into the designated `Split-{name}` folder.
68+
69+
Args:
70+
input_json_str: JSON string containing the split labels (key) and URIs
71+
containing the split label's TFRecords (value). These TFRecords are copied
72+
to `output_example`.
73+
output_example: Output Examples object containing the output `uri` and
74+
`split_names`. This value specifies an OutputArtifact, and does not need
75+
to be provided by the caller.
6676
"""
77+
logging.getLogger().setLevel(logging.INFO)
78+
input_dict = _create_input_dictionary(input_json_str)
6779

68-
# Convert primitive type str to Dict[str, str].
69-
input_dict = json.loads(input_json_str)
70-
71-
# Creates directories from the split-names and tfrecord uris provided into
72-
# output_example.split_names property.
73-
tfrecords_list = []
7480
output_example_uri = output_example.uri
7581

7682
for split_label, split_tfrecords_uri in input_dict.items():
7783
# Create Split-name folder name and create directory.
78-
# output_example_uri = output_example.uri
79-
split_value = (f"/Split-{split_label}/")
80-
fileio.mkdir(f"{output_example_uri}{split_value}")
81-
82-
# Pull all files from uri.
83-
tfrecords_list = fileio.glob(f"{split_tfrecords_uri}*.gz")
84+
split_value_uri = f"{output_example_uri}/Split-{split_label}/"
85+
fileio.mkdir(f"{split_value_uri}")
8486

85-
# Copy files into folder directories.
86-
for tfrecord in tfrecords_list:
87-
file_name = os.path.basename(os.path.normpath(tfrecord))
88-
file_destination = (f"{output_example_uri}{split_value}{file_name}")
89-
fileio.copy(tfrecord, file_destination, True)
87+
_copy_examples(split_tfrecords_uri, split_value_uri)
9088

9189
# Build split_names in required Examples Artifact properties format.
9290
example_properties_split_names = "[\"{}\"]".format('","'.join(
9391
input_dict.keys()))
9492
output_example.split_names = example_properties_split_names
93+
94+
95+
def _create_input_dictionary(input_json_str: str) -> Dict[str, str]:
96+
"""Creates a dictionary from input JSON string.
97+
98+
Args:
99+
input_json_str: JSON string with Split label (key) to Split URI (value).
100+
"""
101+
# Convert primitive type str to Dict[str, str].
102+
if len(input_json_str) == 0:
103+
raise ValueError(
104+
"Input string is not provided. Expected format is Split label (key) "
105+
"and Split URI (value).")
106+
107+
input_dict = json.loads(input_json_str)
108+
if not isinstance(input_dict, dict):
109+
raise ValueError(
110+
f"Input string {input_json_str} is not provided as a dictionary. "
111+
"Expected format is Split label (key) and Split URI (value).")
112+
if len(input_dict.items()) == 0:
113+
raise ValueError(
114+
"Input dictionary is empty. Expected format is Split label (key) "
115+
"and Split URI (value).")
116+
return input_dict
117+
118+
119+
def _copy_examples(split_tfrecords_uri: str, split_value_uri: str) -> None:
120+
"""Copies files from `split_tfrecords_uri` to the output `split_value_uri`.
121+
122+
Args:
123+
split_tfrecords_uri: Source URI where TFRecords to be copied are located.
124+
split_value_uri: Destination URI where the TFRecords should be copied to.
125+
"""
126+
# Pull all files from URI.
127+
tfrecords_list = fileio.glob(f"{split_tfrecords_uri}*.gz")
128+
if len(tfrecords_list) == 0:
129+
logging.warning("Directory %s does not contain files with .gz suffix.",
130+
split_tfrecords_uri)
131+
132+
# Copy files into folder directories.
133+
for tfrecord in tfrecords_list:
134+
file_name = os.path.basename(os.path.normpath(tfrecord))
135+
file_destination = f"{split_value_uri}{file_name}"
136+
fileio.copy(tfrecord, file_destination, True)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# Copyright 2023 The TensorFlow Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
# ==============================================================================
15+
"""
16+
Tests for tfx_addons.copy_example_gen.component.
17+
"""
18+
from unittest import mock
19+
20+
import tensorflow as tf
21+
22+
from tfx_addons.copy_example_gen import component
23+
24+
25+
class TestCopyExampleGen(tf.test.TestCase):
26+
"""Test module for CopyExampleGen."""
27+
def setUp(self):
28+
self.input_json_str = """
29+
{
30+
"label1": "fakeuri",
31+
"label2": "fakeuri2",
32+
}
33+
"""
34+
35+
def test_empty_input(self) -> None:
36+
empty_input_json_str = ""
37+
expected_error = (
38+
"Input string is not provided. Expected format is Split label (key) "
39+
"and Split URI (value).")
40+
41+
with self.assertRaises(ValueError, msg=expected_error):
42+
# pylint: disable=protected-access
43+
component._create_input_dictionary(input_json_str=empty_input_json_str)
44+
45+
def test_non_dictionary_input(self) -> None:
46+
non_dictionary_input = "'a', 'b', 'c'"
47+
expected_error = (
48+
f"Input string {non_dictionary_input} is not provided as a dictionary. "
49+
"Expected format is Split label (key) and Split URI (value).")
50+
51+
with self.assertRaises(ValueError, msg=expected_error):
52+
# pylint: disable=protected-access
53+
component._create_input_dictionary(input_json_str=non_dictionary_input)
54+
55+
def test_empty_dictionary(self) -> None:
56+
empty_input_json_str = "{}"
57+
expected_error = (
58+
"Input dictionary is empty. Expected format is Split label (key) "
59+
"and Split URI (value).")
60+
61+
with self.assertRaises(ValueError, msg=expected_error):
62+
# pylint: disable=protected-access
63+
component._create_input_dictionary(input_json_str=empty_input_json_str)
64+
65+
def test_valid_input(self) -> None:
66+
with mock.patch('tfx_addons.copy_example_gen.component.fileio'):
67+
# pylint: disable=protected-access
68+
component.CopyExampleGen(input_json_str=self.input_json_str)
69+
70+
def test_empty_gcs_directory(self) -> None:
71+
with mock.patch(
72+
'tfx_addons.copy_example_gen.component.fileio') as mock_fileio:
73+
# Returns an empty list indicating no matching files in that location.
74+
mock_fileio.glob.return_value = []
75+
with self.assertLogs() as warning_msg:
76+
# pylint: disable=protected-access
77+
component._copy_examples(split_tfrecords_uri="mock_uri",
78+
split_value_uri="mock_uri_2")
79+
expected_msg = (
80+
"WARNING:root:Directory mock_uri does not contain files with .gz "
81+
"suffix.")
82+
self.assertEqual(warning_msg.output, [expected_msg])

‎tfx_addons/version.py

Copy file name to clipboardExpand all lines: tfx_addons/version.py
+4-1
Original file line numberDiff line numberDiff line change
@@ -80,5 +80,8 @@
8080
[f"tfx{_TFXVERSION_CONSTRAINT}", "huggingface-hub>=0.10.0,<1.0.0"],
8181
"model_card_generator":
8282
[f"tfx{_TFXVERSION_CONSTRAINT}", "model-card-toolkit>=2.0.0,<3.0.0"],
83-
"predictions_to_bigquery": [f"tfx{_TFXVERSION_CONSTRAINT}"]
83+
"predictions_to_bigquery": [f"tfx{_TFXVERSION_CONSTRAINT}"],
84+
"copy_example_gen": [
85+
f"tfx{_TFXVERSION_CONSTRAINT}",
86+
],
8487
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.