# Pub/Sub Lite with Cloud Dataflow

Open in Cloud Shell

Samples showing how to use Pub/Sub Lite with Cloud Dataflow.

## Pub/Sub Lite to Cloud Storage sample

`PubsubliteToGcs.java`

This sample shows how to create an Apache Beam streaming pipeline that reads messages from Pub/Sub Lite, groups the messages using a fixed-size windowing function, and writes them to Cloud Storage.
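
At a high level, the pipeline looks like the sketch below. This is a minimal sketch, not the sample itself: it assumes Beam's Pub/Sub Lite connector (`org.apache.beam.sdk.io.gcp.pubsublite`), and the class name and options interface are illustrative stand-ins for the real implementation in `PubsubliteToGcs.java`. Note that windowed writes with `TextIO` need an explicit shard count, hence `withNumShards(1)`.

```java
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class PubsubliteToGcsSketch {

  // The flags below mirror the ones documented under "Running the example".
  public interface Options extends StreamingOptions {
    @Description("Pub/Sub Lite subscription to read messages from.")
    @Validation.Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Cloud Storage path prefix for the output files.")
    @Validation.Required
    String getOutput();

    void setOutput(String value);

    @Description("Window size in minutes; defaults to 1.")
    @Default.Integer(1)
    int getWindowSize();

    void setWindowSize(int value);
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read Pub/Sub Lite messages; each element is a SequencedMessage proto.
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        // Decode each message payload as a UTF-8 string.
        .apply(
            "Extract Payload",
            MapElements.into(TypeDescriptors.strings())
                .via((SequencedMessage message) -> message.getMessage().getData().toStringUtf8()))
        // Group the unbounded stream into fixed windows of windowSize minutes.
        .apply(
            "Apply Fixed Windows",
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // Write one text file per window under the --output prefix.
        .apply(
            "Write To Cloud Storage",
            TextIO.write().to(options.getOutput()).withWindowedWrites().withNumShards(1));
    pipeline.run().waitUntilFinish();
  }
}
```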

Resources needed for this example:

1. A Pub/Sub Lite topic and a subscription attached to it.
2. A Cloud Storage bucket.

## Setting up

1. Enable the APIs: Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Pub/Sub Lite.

   When you enable Cloud Dataflow, which uses Compute Engine, a default Compute Engine service account with the Editor role (`roles/editor`) is created.

2. You can skip this step if you are trying this example in a Google Cloud environment like Cloud Shell.

   Otherwise, create a user-managed service account and grant it the following roles on your project:

   - `roles/dataflow.admin`
   - `roles/pubsublite.viewer`
   - `roles/pubsublite.subscriber`
   - `roles/logging.viewer`

   Then create a service account key and point `GOOGLE_APPLICATION_CREDENTIALS` to your downloaded key file:

   ```sh
   export GOOGLE_APPLICATION_CREDENTIALS=path/to/your/key/file
   ```

3. Create a Cloud Storage bucket. Your bucket name needs to be globally unique.

   ```sh
   export PROJECT_ID=$(gcloud config get-value project)
   export BUCKET=your-gcs-bucket

   gsutil mb gs://$BUCKET
   ```

4. Create a Pub/Sub Lite topic and subscription. Set `LITE_LOCATION` to a Pub/Sub Lite location.

   ```sh
   export TOPIC=your-lite-topic
   export SUBSCRIPTION=your-lite-subscription
   export LITE_LOCATION=your-lite-location

   gcloud pubsub lite-topics create $TOPIC \
       --zone=$LITE_LOCATION \
       --partitions=1 \
       --per-partition-bytes=30GiB
   gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
       --zone=$LITE_LOCATION \
       --topic=$TOPIC
   ```

5. Set `DATAFLOW_REGION` to a Dataflow region close to your Pub/Sub Lite location.

   ```sh
   export DATAFLOW_REGION=your-dataflow-region
   ```

## Running the example

`PubsubliteToGcs.java`

The following example runs a streaming pipeline. Choose `DirectRunner` to test it locally or `DataflowRunner` to run it on Dataflow.

- `--subscription`: the Pub/Sub Lite subscription to read messages from
- `--output`: the full file path for the output files
- `--windowSize` [optional]: the window size in minutes; defaults to 1
- `--runner` [optional]: `DataflowRunner` or `DirectRunner`
- `--project` [optional]: your project ID; not needed with `DirectRunner`
- `--region` [optional]: the Dataflow region; not needed with `DirectRunner`
- `--tempLocation`: a Cloud Storage location for temporary files; not needed with `DirectRunner`

Gradle:

```sh
gradle execute -Dexec.args="\
    --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
    --output=gs://$BUCKET/samples/output \
    --windowSize=1 \
    --runner=DataflowRunner \
    --project=$PROJECT_ID \
    --region=$DATAFLOW_REGION \
    --tempLocation=gs://$BUCKET/temp"
```

Maven:

```sh
mvn compile exec:java \
  -Dexec.mainClass=examples.PubsubliteToGcs \
  -Dexec.args="\
    --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
    --output=gs://$BUCKET/samples/output \
    --windowSize=1 \
    --runner=DataflowRunner \
    --project=$PROJECT_ID \
    --region=$DATAFLOW_REGION \
    --tempLocation=gs://$BUCKET/temp"
```

Publish some messages to your Lite topic. Then check for files in your Cloud Storage bucket:

```sh
gsutil ls "gs://$BUCKET/samples/output*"
```
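
One way to publish test messages is with the Pub/Sub Lite client library for Java. The sketch below is illustrative only (the class name, loop, message contents, and placeholder topic path are assumptions; the sample itself does not include a publisher):

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

public class PublishTestMessages {
  public static void main(String[] args) throws Exception {
    // Placeholder path: use your own project, Lite location, and topic.
    // Depending on the client version, the path may need the project
    // number rather than the project ID.
    TopicPath topicPath =
        TopicPath.parse(
            "projects/your-project/locations/your-lite-location/topics/your-lite-topic");

    Publisher publisher =
        Publisher.create(PublisherSettings.newBuilder().setTopicPath(topicPath).build());
    publisher.startAsync().awaitRunning();
    try {
      for (int i = 0; i < 10; i++) {
        PubsubMessage message =
            PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("message-" + i)).build();
        // publish() returns a future that resolves to the message ID.
        ApiFuture<String> future = publisher.publish(message);
        future.get();
      }
    } finally {
      publisher.stopAsync().awaitTerminated();
    }
  }
}
```

With the pipeline running, each published message should appear in an output file once its window closes.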

## (Optional) Creating a custom Dataflow template

With a `metadata.json` file, you can create a Dataflow Flex template. Custom Flex templates can be shared and run with different input parameters.

1. Create a fat JAR. You should see `target/pubsublite-streaming-bundled-1.0.jar` as an output.

   ```sh
   mvn clean package -DskipTests=true
   ls -lh
   ```

2. Provide names and locations for your template file and template container image.

   ```sh
   export TEMPLATE_PATH="gs://$BUCKET/samples/pubsublite-to-gcs.json"
   export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/pubsublite-to-gcs:latest"
   ```

3. Build a custom Flex template.

   ```sh
   gcloud dataflow flex-template build $TEMPLATE_PATH \
     --image-gcr-path "$TEMPLATE_IMAGE" \
     --sdk-language "JAVA" \
     --flex-template-base-image JAVA11 \
     --metadata-file "metadata.json" \
     --jar "target/pubsublite-streaming-bundled-1.0.jar" \
     --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
   ```

4. Run a job with the custom Flex template using `gcloud` or in the Cloud Console.

   Note: Pub/Sub Lite allows only one subscriber to pull messages from a given partition. If your Pub/Sub Lite topic has only one partition and you use a subscription attached to that topic in more than one Dataflow job, only one of the jobs will receive messages.

   ```sh
   gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location "$TEMPLATE_PATH" \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region "$DATAFLOW_REGION"
   ```

## Cleaning up

1. Stop the pipeline. If you use `DirectRunner`, press `Ctrl+C` to cancel. If you use `DataflowRunner`, click on the job you want to stop, then choose "Cancel".

2. Delete the Lite topic and subscription.

   ```sh
   gcloud pubsub lite-topics delete $TOPIC --zone=$LITE_LOCATION
   gcloud pubsub lite-subscriptions delete $SUBSCRIPTION --zone=$LITE_LOCATION
   ```

3. Delete the Cloud Storage objects:

   ```sh
   gsutil -m rm -rf "gs://$BUCKET/samples/output*"
   ```

4. Delete the template image in Container Registry and the Flex template file if you have created them:

   ```sh
   gcloud container images delete $TEMPLATE_IMAGE
   gsutil rm $TEMPLATE_PATH
   ```

5. Delete the Cloud Storage bucket:

   ```sh
   gsutil rb "gs://$BUCKET"
   ```