From b8d4a0196313cde44ea72c36206e32ec85830b07 Mon Sep 17 00:00:00 2001 From: Adrian Vogelsgesang Date: Fri, 17 Sep 2021 16:43:12 -0700 Subject: [PATCH] Add support for scheduling Data Update jobs This commit adds support for the `datasources//data` endpoint through which one can schedule jobs to update the data within a published live-to-Hyper datasource on the server. The new `datasources.update_data` expects the arguments: * a datasource or a connection: If the datasource only contains a single connection, the datasource is sufficient to identify which Hyper file should be updated. Otherwise, for datasources with multiple connections, the connection has to be provided. This distinction happens on the server, so the client library only needs to provide a way to specify either of the two. * a `request_id` which will be used to ensure idempotency on the server. This parameter is simply passed as an HTTP header. * an `actions` list, specifying how exactly the data on the server should be modified. We expect the caller to provide a list following the structure documented in the REST API documentation. TSC does not validate this object and simply passes it through to the server. * an optional `payload` file: For actions like `insert`, one can provide a Hyper file which contains the newly inserted tuples or other payload data. TSC will upload this file to the server and then hand it over to the update-API endpoint. Besides the addition of the `datasources.update_data` itself, this commit also adds some infrastructure changes, e.g., to enable sending PATCH requests and HTTP headers. 
The documentation for the REST endpoint can be found at https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_how_to_update_data_to_hyper.htm --- samples/update_datasource_data.py | 74 +++++++++++++++++ .../server/endpoint/datasources_endpoint.py | 29 +++++++ .../server/endpoint/endpoint.py | 17 +++- test/assets/datasource_data_update.xml | 9 ++ test/test_datasource.py | 82 +++++++++++++++++++ 5 files changed, 209 insertions(+), 2 deletions(-) create mode 100644 samples/update_datasource_data.py create mode 100644 test/assets/datasource_data_update.xml diff --git a/samples/update_datasource_data.py b/samples/update_datasource_data.py new file mode 100644 index 000000000..9465ae9ee --- /dev/null +++ b/samples/update_datasource_data.py @@ -0,0 +1,74 @@ +#### +# This script demonstrates how to update the data within a published +# live-to-Hyper datasource on server. +# +# The sample is hardcoded against the `World Indicators` dataset and +# expects to receive the LUID of a published datasource containing +# that data. To create such a published datasource, you can use: +# ./publish_datasource.py --file ../test/assets/World\ Indicators.hyper +# which will print you the LUID of the datasource. +# +# Before running this script, the datasource will contain a region `Europe`. +# After running this script, that region will be gone. 
+# +#### + +import argparse +import uuid +import logging + +import tableauserverclient as TSC + + +def main(): + parser = argparse.ArgumentParser(description='Delete the `Europe` region from a published `World Indicators` datasource.') + # Common options; please keep those in sync across all samples + parser.add_argument('--server', '-s', required=True, help='server address') + parser.add_argument('--site', '-S', help='site name') + parser.add_argument('--token-name', '-p', required=True, + help='name of the personal access token used to sign into the server') + parser.add_argument('--token-value', '-v', required=True, + help='value of the personal access token used to sign into the server') + parser.add_argument('--logging-level', '-l', choices=['debug', 'info', 'error'], default='error', + help='desired logging level (set to error by default)') + # Options specific to this sample + parser.add_argument('datasource_id', help="The LUID of the `World Indicators` datasource") + + args = parser.parse_args() + + # Set logging level based on user input, or error by default + logging_level = getattr(logging, args.logging_level.upper()) + logging.basicConfig(level=logging_level) + + tableau_auth = TSC.PersonalAccessTokenAuth(args.token_name, args.token_value, site_id=args.site) + server = TSC.Server(args.server, use_server_version=True) + with server.auth.sign_in(tableau_auth): + # We use a unique `request_id` for every request. + # In case the submission of the update job fails, we won't know whether the job was submitted + # or not. It could be that the server received the request, changed the data, but then the + # network connection broke down. + # If you want to have a way to retry, e.g., inserts while making sure they aren't duplicated, + # you need to use `request_id` for that purpose. + # In our case, we don't care about retries. And the delete is idempotent anyway. + # Hence, we simply use a randomly generated request id. 
+ request_id = str(uuid.uuid4()) + + # This action will delete all rows with `Region=Europe` from the published data source. + # Other actions (inserts, updates, ...) are also available. For more information see + # https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_how_to_update_data_to_hyper.htm + actions = [ + { + "action": "delete", + "target-table": "Extract", + "target-schema": "Extract", + "condition": {"op": "eq", "target-col": "Region", "const": {"type": "string", "v": "Europe"}} + } + ] + + job = server.datasources.update_data(args.datasource_id, request_id=request_id, actions=actions) + + # TODO: Add a flag that will poll and wait for the returned job to be done + print(job) + +if __name__ == '__main__': + main() diff --git a/tableauserverclient/server/endpoint/datasources_endpoint.py b/tableauserverclient/server/endpoint/datasources_endpoint.py index b67332f7d..997921312 100644 --- a/tableauserverclient/server/endpoint/datasources_endpoint.py +++ b/tableauserverclient/server/endpoint/datasources_endpoint.py @@ -18,6 +18,7 @@ import copy import cgi from contextlib import closing +import json # The maximum size of a file that can be published in a single request is 64MB FILESIZE_LIMIT = 1024 * 1024 * 64 # 64MB @@ -282,6 +283,34 @@ def publish( logger.info("Published {0} (ID: {1})".format(filename, new_datasource.id)) return new_datasource + @api(version="3.13") + def update_data(self, datasource_or_connection_item, *, request_id, actions, payload = None): + if isinstance(datasource_or_connection_item, DatasourceItem): + datasource_id = datasource_or_connection_item.id + url = "{0}/{1}/data".format(self.baseurl, datasource_id) + elif isinstance(datasource_or_connection_item, ConnectionItem): + datasource_id = datasource_or_connection_item.datasource_id + connection_id = datasource_or_connection_item.id + url = "{0}/{1}/connections/{2}/data".format(self.baseurl, datasource_id, connection_id) + else: + assert 
isinstance(datasource_or_connection_item, str) + url = "{0}/{1}/data".format(self.baseurl, datasource_or_connection_item) + + if payload is not None: + if not os.path.isfile(payload): + error = "File path does not lead to an existing file." + raise IOError(error) + + logger.info("Uploading {0} to server with chunking method for Update job".format(payload)) + upload_session_id = self.parent_srv.fileuploads.upload(payload) + url = "{0}?uploadSessionId={1}".format(url, upload_session_id) + + json_request = json.dumps({"actions": actions}) + parameters = {"headers": {"requestid": request_id}} + server_response = self.patch_request(url, json_request, "application/json", parameters=parameters) + new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0] + return new_job + @api(version="2.0") def populate_permissions(self, item): self._permissions.populate(item) diff --git a/tableauserverclient/server/endpoint/endpoint.py b/tableauserverclient/server/endpoint/endpoint.py index f7d88b0e6..31291abc9 100644 --- a/tableauserverclient/server/endpoint/endpoint.py +++ b/tableauserverclient/server/endpoint/endpoint.py @@ -55,7 +55,9 @@ def _make_request( ): parameters = parameters or {} parameters.update(self.parent_srv.http_options) - parameters["headers"] = Endpoint._make_common_headers(auth_token, content_type) + if not "headers" in parameters: + parameters["headers"] = {} + parameters["headers"].update(Endpoint._make_common_headers(auth_token, content_type)) if content is not None: parameters["data"] = content @@ -118,13 +120,14 @@ def delete_request(self, url): # We don't return anything for a delete self._make_request(self.parent_srv.session.delete, url, auth_token=self.parent_srv.auth_token) - def put_request(self, url, xml_request=None, content_type="text/xml"): + def put_request(self, url, xml_request=None, content_type="text/xml", parameters=None): return self._make_request( self.parent_srv.session.put, url, content=xml_request, 
auth_token=self.parent_srv.auth_token, content_type=content_type, + parameters=parameters, ) def post_request(self, url, xml_request, content_type="text/xml", parameters=None): @@ -137,6 +140,16 @@ def post_request(self, url, xml_request, content_type="text/xml", parameters=Non parameters=parameters, ) + def patch_request(self, url, xml_request, content_type="text/xml", parameters=None): + return self._make_request( + self.parent_srv.session.patch, + url, + content=xml_request, + auth_token=self.parent_srv.auth_token, + content_type=content_type, + parameters=parameters, + ) + def api(version): """Annotate the minimum supported version for an endpoint. diff --git a/test/assets/datasource_data_update.xml b/test/assets/datasource_data_update.xml new file mode 100644 index 000000000..305caaf0b --- /dev/null +++ b/test/assets/datasource_data_update.xml @@ -0,0 +1,9 @@ + + + + + + 7ecaccd8-39b0-4875-a77d-094f6e930019 + + + diff --git a/test/test_datasource.py b/test/test_datasource.py index 42d1dfade..e4ef01a29 100644 --- a/test/test_datasource.py +++ b/test/test_datasource.py @@ -1,3 +1,4 @@ +from tableauserverclient.server.endpoint.fileuploads_endpoint import Fileuploads import unittest from io import BytesIO import os @@ -22,6 +23,7 @@ PUBLISH_XML_ASYNC = 'datasource_publish_async.xml' REFRESH_XML = 'datasource_refresh.xml' UPDATE_XML = 'datasource_update.xml' +UPDATE_DATA_XML = 'datasource_data_update.xml' UPDATE_CONNECTION_XML = 'datasource_connection_update.xml' @@ -355,6 +357,86 @@ def test_refresh_object(self): # We only check the `id`; remaining fields are already tested in `test_refresh_id` self.assertEqual('7c3d599e-949f-44c3-94a1-f30ba85757e4', new_job.id) + def test_update_data_datasource_object(self): + """Calling `update_data` with a `DatasourceItem` should update that datasource""" + self.server.version = "3.13" + self.baseurl = self.server.datasources.baseurl + + datasource = TSC.DatasourceItem('') + datasource._id = 
'9dbd2263-16b5-46e1-9c43-a76bb8ab65fb' + response_xml = read_xml_asset(UPDATE_DATA_XML) + with requests_mock.mock() as m: + m.patch(self.baseurl + '/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/data', + status_code=202, headers={"requestid": "test_id"}, text=response_xml) + new_job = self.server.datasources.update_data(datasource, request_id="test_id", actions=[]) + + self.assertEqual('5c0ba560-c959-424e-b08a-f32ef0bfb737', new_job.id) + self.assertEqual('UpdateUploadedFile', new_job.type) + self.assertEqual(None, new_job.progress) + self.assertEqual('2021-09-18T09:40:12Z', format_datetime(new_job.created_at)) + self.assertEqual(-1, new_job.finish_code) + + def test_update_data_connection_object(self): + """Calling `update_data` with a `ConnectionItem` should update that connection""" + self.server.version = "3.13" + self.baseurl = self.server.datasources.baseurl + + connection = TSC.ConnectionItem() + connection._datasource_id = '9dbd2263-16b5-46e1-9c43-a76bb8ab65fb' + connection._id = '7ecaccd8-39b0-4875-a77d-094f6e930019' + response_xml = read_xml_asset(UPDATE_DATA_XML) + with requests_mock.mock() as m: + m.patch(self.baseurl + '/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/connections/7ecaccd8-39b0-4875-a77d-094f6e930019/data', + status_code=202, headers={"requestid": "test_id"}, text=response_xml) + new_job = self.server.datasources.update_data(connection, request_id="test_id", actions=[]) + + # We only check the `id`; remaining fields are already tested in `test_update_data_datasource_object` + self.assertEqual('5c0ba560-c959-424e-b08a-f32ef0bfb737', new_job.id) + + def test_update_data_datasource_string(self): + """For convenience, calling `update_data` with a `str` should update the datasource with the corresponding UUID""" + self.server.version = "3.13" + self.baseurl = self.server.datasources.baseurl + + datasource_id = '9dbd2263-16b5-46e1-9c43-a76bb8ab65fb' + response_xml = read_xml_asset(UPDATE_DATA_XML) + with requests_mock.mock() as m: + m.patch(self.baseurl + 
'/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/data', + status_code=202, headers={"requestid": "test_id"}, text=response_xml) + new_job = self.server.datasources.update_data(datasource_id, request_id="test_id", actions=[]) + + # We only check the `id`; remaining fields are already tested in `test_update_data_datasource_object` + self.assertEqual('5c0ba560-c959-424e-b08a-f32ef0bfb737', new_job.id) + + def test_update_data_datasource_payload_file(self): + """If `payload` is present, we upload it and associate the job with it""" + self.server.version = "3.13" + self.baseurl = self.server.datasources.baseurl + + datasource_id = '9dbd2263-16b5-46e1-9c43-a76bb8ab65fb' + mock_upload_id = '10051:c3e56879876842d4b3600f20c1f79876-0:0' + response_xml = read_xml_asset(UPDATE_DATA_XML) + with requests_mock.mock() as rm, \ + unittest.mock.patch.object(Fileuploads, "upload", return_value=mock_upload_id): + rm.patch(self.baseurl + '/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/data?uploadSessionId=' + mock_upload_id, + status_code=202, headers={"requestid": "test_id"}, text=response_xml) + new_job = self.server.datasources.update_data(datasource_id, request_id="test_id", + actions=[], payload=asset('World Indicators.hyper')) + + # We only check the `id`; remaining fields are already tested in `test_update_data_datasource_object` + self.assertEqual('5c0ba560-c959-424e-b08a-f32ef0bfb737', new_job.id) + + def test_update_data_datasource_invalid_payload_file(self): + """If `payload` points to a non-existing file, we report an error""" + self.server.version = "3.13" + self.baseurl = self.server.datasources.baseurl + datasource_id = '9dbd2263-16b5-46e1-9c43-a76bb8ab65fb' + with self.assertRaises(IOError) as cm: + self.server.datasources.update_data(datasource_id, request_id="test_id", + actions=[], payload='no/such/file.missing') + exception = cm.exception + self.assertEqual(str(exception), "File path does not lead to an existing file.") + def test_delete(self): with requests_mock.mock() as m: 
m.delete(self.baseurl + '/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb', status_code=204)