Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on May 8, 2026. It is now read-only.
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final MessageDispatcher messageDispatcher;

private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
Expand All @@ -98,6 +99,7 @@ public StreamingSubscriberConnection(
SubscriberStub stub,
int channelAffinity,
FlowControlSettings flowControlSettings,
boolean useLegacyFlowControl,
FlowController flowController,
ScheduledExecutorService executor,
ScheduledExecutorService systemExecutor,
Expand All @@ -119,6 +121,7 @@ public StreamingSubscriberConnection(
systemExecutor,
clock);
this.flowControlSettings = flowControlSettings;
this.useLegacyFlowControl = useLegacyFlowControl;
}

@Override
Expand Down Expand Up @@ -217,9 +220,13 @@ private void initialize() {
.setStreamAckDeadlineSeconds(60)
.setClientId(clientId)
.setMaxOutstandingMessages(
valueOrZero(flowControlSettings.getMaxOutstandingElementCount()))
this.useLegacyFlowControl
? 0
: valueOrZero(flowControlSettings.getMaxOutstandingElementCount()))
.setMaxOutstandingBytes(
valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()))
this.useLegacyFlowControl
? 0
: valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()))
.build());

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac

private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;
private final Duration maxAckExtensionPeriod;
private final Duration maxDurationPerAckExtension;
// The ExecutorProvider used to generate executors for processing messages.
Expand All @@ -126,6 +127,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private Subscriber(Builder builder) {
receiver = builder.receiver;
flowControlSettings = builder.flowControlSettings;
useLegacyFlowControl = builder.useLegacyFlowControl;
subscriptionName = builder.subscriptionName;

maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
Expand Down Expand Up @@ -336,6 +338,7 @@ private void startStreamingConnections() {
subStub,
i,
flowControlSettings,
useLegacyFlowControl,
flowController,
executor,
alarmsExecutor,
Expand Down Expand Up @@ -420,6 +423,7 @@ public static final class Builder {
private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private Duration maxDurationPerAckExtension = Duration.ofMillis(0);

private boolean useLegacyFlowControl = false;
private FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1000L)
Expand Down Expand Up @@ -504,6 +508,15 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
return this;
}

/**
* Disables enforcing flow control settings at the Cloud PubSub server and uses the less
* accurate method of only enforcing flow control at the client side.
*/
public Builder setUseLegacyFlowControl(boolean value) {
this.useLegacyFlowControl = value;
return this;
}

/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.