Pub / Sub doesn't work with multiprocessing #3890

@theacodes

Description


This code fails with a pickle error:

def multiprocessing_callback(message):
    print('Received message: {}'.format(message))
    message.ack()


def receive_messages_with_multiprocessing(
        project, topic_name, subscription_name):
    """Receives messages from a pull subscription using multiprocessing."""
    import concurrent.futures
    import functools
    import multiprocessing
    import time

    from google.cloud import pubsub_v1
    import google.cloud.pubsub_v1.subscriber.policy.thread

    # Create a process pool and a queue for sending messages. This will
    # be used by the subscriber policy to execute callbacks.
    executor = concurrent.futures.ProcessPoolExecutor()
    manager = multiprocessing.Manager()
    queue = manager.Queue()

    policy_factory = functools.partial(
        google.cloud.pubsub_v1.subscriber.policy.thread.Policy,
        executor=executor,
        queue=queue)
    subscriber = pubsub_v1.SubscriberClient(policy_class=policy_factory)
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    subscriber.subscribe(subscription_path, callback=multiprocessing_callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)
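
For context, a ProcessPoolExecutor has to pickle everything it hands to a worker process, so any argument that holds an unpicklable resource makes the submit fail. The report does not say which object triggers the error, so the following is only a minimal sketch of that general failure mode, not a diagnosis of the Pub/Sub internals. `FakeMessage` and `can_pickle` are hypothetical names made up for this illustration, and the assumption that the real object carries something like a thread lock is unconfirmed.

```python
import pickle
import threading


class FakeMessage:
    """Hypothetical stand-in for an object that holds a thread lock.

    This is NOT the Pub/Sub Message class; it only illustrates the
    kind of attribute that pickle refuses to serialize.
    """

    def __init__(self, data):
        self.data = data
        self._lock = threading.Lock()  # thread locks cannot be pickled


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


if __name__ == "__main__":
    # Plain data crosses a process boundary without trouble.
    print("bytes picklable:", can_pickle(b"payload"))
    # An object wrapping a lock does not.
    print("FakeMessage picklable:", can_pickle(FakeMessage(b"payload")))
```

A check like `can_pickle` can be run against the callback and its arguments to narrow down which object the process pool is unable to serialize.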
Metadata

Labels

api: pubsub (Issues related to the Pub/Sub API.)
priority: p1 (Important issue which blocks shipping the next release. Will be fixed prior to next release.)
release blocking (Required feature/issue must be fixed prior to next release.)
triaged for GA
type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)
