diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index f4e330ef1..1587afb91 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -73,6 +73,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final MessageDispatcher messageDispatcher; private final FlowControlSettings flowControlSettings; + private final boolean useLegacyFlowControl; private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); @@ -98,6 +99,7 @@ public StreamingSubscriberConnection( SubscriberStub stub, int channelAffinity, FlowControlSettings flowControlSettings, + boolean useLegacyFlowControl, FlowController flowController, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, @@ -119,6 +121,7 @@ public StreamingSubscriberConnection( systemExecutor, clock); this.flowControlSettings = flowControlSettings; + this.useLegacyFlowControl = useLegacyFlowControl; } @Override @@ -217,9 +220,13 @@ private void initialize() { .setStreamAckDeadlineSeconds(60) .setClientId(clientId) .setMaxOutstandingMessages( - valueOrZero(flowControlSettings.getMaxOutstandingElementCount())) + this.useLegacyFlowControl + ? 0 + : valueOrZero(flowControlSettings.getMaxOutstandingElementCount())) .setMaxOutstandingBytes( - valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes())) + this.useLegacyFlowControl + ? 0 + : valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes())) .build()); /** diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 35c50fdb6..6d5946276 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -103,6 +103,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private final String subscriptionName; private final FlowControlSettings flowControlSettings; + private final boolean useLegacyFlowControl; private final Duration maxAckExtensionPeriod; private final Duration maxDurationPerAckExtension; // The ExecutorProvider used to generate executors for processing messages. @@ -126,6 +127,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private Subscriber(Builder builder) { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; + useLegacyFlowControl = builder.useLegacyFlowControl; subscriptionName = builder.subscriptionName; maxAckExtensionPeriod = builder.maxAckExtensionPeriod; @@ -336,6 +338,7 @@ private void startStreamingConnections() { subStub, i, flowControlSettings, + useLegacyFlowControl, flowController, executor, alarmsExecutor, @@ -420,6 +423,7 @@ public static final class Builder { private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; private Duration maxDurationPerAckExtension = Duration.ofMillis(0); + private boolean useLegacyFlowControl = false; private FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(1000L) @@ -504,6 +508,15 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { return this; } + /** + * Disables enforcing flow control settings at the Cloud PubSub server and uses the less + * accurate method of only enforcing flow control at the client side. + */ + public Builder setUseLegacyFlowControl(boolean value) { + this.useLegacyFlowControl = value; + return this; + } + /** * Set the maximum period a message ack deadline will be extended. Defaults to one hour. *