Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on May 8, 2026. It is now read-only.
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,17 @@ private void processBatch(List<OutstandingMessage> batch) {
// This should be a blocking flow controller and never throw an exception.
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler);
processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
}
}

private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
return PubsubMessage.newBuilder(receivedMessage.getMessage())
.putAttributes(
"googclient_deliveryattempt", Integer.toString(receivedMessage.getDeliveryAttempt()))
Comment thread
hannahrogers-google marked this conversation as resolved.
.build();
}

private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
final AckReplyConsumer consumer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -205,6 +206,11 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver)
return new Builder(subscription, receiver);
}

/** Returns the delivery attempt count for a received {@link PubsubMessage} */
public static int getDeliveryAttempt(PubsubMessage message) {
return Integer.parseInt(message.getAttributesOrDefault("googclient_deliveryattempt", "0"));
}

/** Subscription which the subscriber is subscribed to. */
public String getSubscriptionNameString() {
return subscriptionName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@
import org.threeten.bp.Duration;

public class MessageDispatcherTest {
private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data");
private static final int DELIVERY_INFO_COUNT = 3;
private static final ReceivedMessage TEST_MESSAGE =
ReceivedMessage.newBuilder()
.setAckId("ackid")
.setMessage(PubsubMessage.newBuilder().setData(ByteString.EMPTY).build())
.setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build())
.setDeliveryAttempt(DELIVERY_INFO_COUNT)
.build();
private static final Runnable NOOP_RUNNABLE =
new Runnable() {
Expand Down Expand Up @@ -78,6 +81,9 @@ public void setUp() {
new MessageReceiver() {
@Override
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
assertThat(message.getData()).isEqualTo(MESSAGE_DATA);
assertThat(message.getAttributesOrThrow("googclient_deliveryattempt"))
.isEqualTo(Integer.toString(DELIVERY_INFO_COUNT));
consumers.add(consumer);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ public void tearDown() throws Exception {
testChannel.shutdown();
}

@Test
public void testDeliveryAttemptHelper() {
int deliveryAttempt = 3;
PubsubMessage message =
PubsubMessage.newBuilder()
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build();
assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt);

PubsubMessage emptyMessage = PubsubMessage.newBuilder().build();
assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), 0);
}

@Test
public void testOpenedChannels() throws Exception {
int expectedChannelCount = 1;
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.