Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit fc8e520

Browse filesBrowse files
authored
fix: fix flaky tests and non blocking semaphore (#1365)
* fix: fix race condition in non blocking semaphore * update comment
1 parent 3c45191 commit fc8e520
Copy full SHA for fc8e520

File tree

Expand file treeCollapse file tree

3 files changed

+42
-66
lines changed
Filter options
Expand file treeCollapse file tree

3 files changed

+42
-66
lines changed

‎gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java

Copy file name to clipboardExpand all lines: gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java
+12-13Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
/** A {@link Semaphore64} that immediately returns with failure if permits are not available. */
3737
class NonBlockingSemaphore implements Semaphore64 {
38-
private AtomicLong availablePermits;
38+
private AtomicLong acquiredPermits;
3939
private AtomicLong limit;
4040

4141
private static void checkNotNegative(long l) {
@@ -44,17 +44,18 @@ private static void checkNotNegative(long l) {
4444

4545
NonBlockingSemaphore(long permits) {
4646
checkNotNegative(permits);
47-
this.availablePermits = new AtomicLong(permits);
47+
this.acquiredPermits = new AtomicLong(0);
4848
this.limit = new AtomicLong(permits);
4949
}
5050

5151
@Override
5252
public void release(long permits) {
5353
checkNotNegative(permits);
5454
while (true) {
55-
long old = availablePermits.get();
55+
long old = acquiredPermits.get();
5656
// TODO: throw exceptions when the permits overflow
57-
if (availablePermits.compareAndSet(old, Math.min(old + permits, limit.get()))) {
57+
long newAcquired = Math.max(0, old - permits);
58+
if (acquiredPermits.compareAndSet(old, newAcquired)) {
5859
return;
5960
}
6061
}
@@ -64,11 +65,11 @@ public void release(long permits) {
6465
public boolean acquire(long permits) {
6566
checkNotNegative(permits);
6667
while (true) {
67-
long old = availablePermits.get();
68-
if (old < permits) {
68+
long old = acquiredPermits.get();
69+
if (old + permits > limit.get()) {
6970
return false;
7071
}
71-
if (availablePermits.compareAndSet(old, old - permits)) {
72+
if (acquiredPermits.compareAndSet(old, old + permits)) {
7273
return true;
7374
}
7475
}
@@ -79,13 +80,13 @@ public boolean acquirePartial(long permits) {
7980
checkNotNegative(permits);
8081
// To allow individual oversized requests to be sent, clamp the requested permits to the maximum
8182
// limit. This will allow individual large requests to be sent. Please note that this behavior
82-
// will result in availablePermits going negative.
83+
// will result in acquiredPermits going over limit.
8384
while (true) {
84-
long old = availablePermits.get();
85-
if (old < Math.min(limit.get(), permits)) {
85+
long old = acquiredPermits.get();
86+
if (old + permits > limit.get() && old > 0) {
8687
return false;
8788
}
88-
if (availablePermits.compareAndSet(old, old - permits)) {
89+
if (acquiredPermits.compareAndSet(old, old + permits)) {
8990
return true;
9091
}
9192
}
@@ -94,7 +95,6 @@ public boolean acquirePartial(long permits) {
9495
@Override
9596
public void increasePermitLimit(long permits) {
9697
checkNotNegative(permits);
97-
availablePermits.addAndGet(permits);
9898
limit.addAndGet(permits);
9999
}
100100

@@ -106,7 +106,6 @@ public void reducePermitLimit(long reduction) {
106106
long oldLimit = limit.get();
107107
Preconditions.checkState(oldLimit - reduction > 0, "permit limit underflow");
108108
if (limit.compareAndSet(oldLimit, oldLimit - reduction)) {
109-
availablePermits.addAndGet(-reduction);
110109
return;
111110
}
112111
}

‎gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java

Copy file name to clipboardExpand all lines: gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java
+6-21Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636

3737
import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
3838
import com.google.api.gax.batching.FlowController.MaxOutstandingRequestBytesReachedException;
39-
import java.util.ArrayList;
40-
import java.util.List;
4139
import java.util.concurrent.TimeUnit;
4240
import org.junit.Test;
4341
import org.junit.runner.RunWith;
@@ -71,27 +69,14 @@ public void testCreateEvent() {
7169
}
7270

7371
@Test
74-
public void testGetLastEvent() throws InterruptedException {
75-
final FlowControlEventStats stats = new FlowControlEventStats();
76-
final long currentTime = System.currentTimeMillis();
72+
public void testGetLastEvent() {
73+
FlowControlEventStats stats = new FlowControlEventStats();
74+
long currentTime = System.currentTimeMillis();
7775

78-
List<Thread> threads = new ArrayList<>();
7976
for (int i = 1; i <= 10; i++) {
80-
final int timeElapsed = i;
81-
Thread t =
82-
new Thread() {
83-
@Override
84-
public void run() {
85-
stats.recordFlowControlEvent(
86-
FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed));
87-
}
88-
};
89-
threads.add(t);
90-
t.start();
91-
}
92-
93-
for (Thread t : threads) {
94-
t.join(10);
77+
int timeElapsed = i;
78+
stats.recordFlowControlEvent(
79+
FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed));
9580
}
9681

9782
assertEquals(currentTime + 10, stats.getLastFlowControlEvent().getTimestampMs());

‎gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java

Copy file name to clipboardExpand all lines: gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
+24-32Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@
4444
import java.util.ArrayList;
4545
import java.util.List;
4646
import java.util.Random;
47+
import java.util.concurrent.ExecutionException;
4748
import java.util.concurrent.ExecutorService;
4849
import java.util.concurrent.Executors;
4950
import java.util.concurrent.Future;
5051
import java.util.concurrent.TimeUnit;
5152
import java.util.concurrent.TimeoutException;
5253
import java.util.concurrent.atomic.AtomicInteger;
53-
import org.junit.Ignore;
5454
import org.junit.Test;
5555
import org.junit.runner.RunWith;
5656
import org.junit.runners.JUnit4;
@@ -500,7 +500,6 @@ private void testRejectedReserveRelease(
500500
}
501501

502502
flowController.release(1, 1);
503-
504503
flowController.reserve(maxElementCount, maxNumBytes);
505504
flowController.release(maxElementCount, maxNumBytes);
506505
}
@@ -523,11 +522,11 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
523522
final AtomicInteger totalDecreased = new AtomicInteger(0);
524523
final AtomicInteger releasedCounter = new AtomicInteger(0);
525524

526-
List<Thread> reserveThreads =
525+
List<Future> reserveThreads =
527526
testConcurrentUpdates(
528-
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
529-
for (Thread t : reserveThreads) {
530-
t.join(200);
527+
flowController, 100, 100, 10, totalIncreased, totalDecreased, releasedCounter);
528+
for (Future t : reserveThreads) {
529+
t.get(200, TimeUnit.MILLISECONDS);
531530
}
532531
assertEquals(reserveThreads.size(), releasedCounter.get());
533532
assertTrue(totalIncreased.get() > 0);
@@ -539,9 +538,6 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
539538
testBlockingReserveRelease(flowController, 0, expectedValue);
540539
}
541540

542-
// This test is very flaky. Remove @Ignore once https://github.com/googleapis/gax-java/issues/1359
543-
// is fixed.
544-
@Ignore
545541
@Test
546542
public void testConcurrentUpdateThresholds_nonBlocking() throws Exception {
547543
int initialValue = 5000;
@@ -559,11 +555,11 @@ public void testConcurrentUpdateThresholds_nonBlocking() throws Exception {
559555
AtomicInteger totalIncreased = new AtomicInteger(0);
560556
AtomicInteger totalDecreased = new AtomicInteger(0);
561557
AtomicInteger releasedCounter = new AtomicInteger(0);
562-
List<Thread> reserveThreads =
558+
List<Future> reserveThreads =
563559
testConcurrentUpdates(
564560
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
565-
for (Thread t : reserveThreads) {
566-
t.join(200);
561+
for (Future t : reserveThreads) {
562+
t.get(200, TimeUnit.MILLISECONDS);
567563
}
568564
assertEquals(reserveThreads.size(), releasedCounter.get());
569565
assertTrue(totalIncreased.get() > 0);
@@ -698,8 +694,7 @@ public void run() {
698694
};
699695
// blocked by element. Reserve all 5 elements first, reserve in the runnable will be blocked
700696
flowController.reserve(5, 1);
701-
ExecutorService executor = Executors.newCachedThreadPool();
702-
Future<?> finished1 = executor.submit(runnable);
697+
Future<?> finished1 = Executors.newSingleThreadExecutor().submit(runnable);
703698
try {
704699
finished1.get(50, TimeUnit.MILLISECONDS);
705700
fail("reserve should block");
@@ -722,7 +717,7 @@ public void run() {
722717

723718
// Similar to blocked by element, test blocking by bytes.
724719
flowController.reserve(1, 5);
725-
Future<?> finished2 = executor.submit(runnable);
720+
Future<?> finished2 = Executors.newSingleThreadExecutor().submit(runnable);
726721
try {
727722
finished2.get(50, TimeUnit.MILLISECONDS);
728723
fail("reserve should block");
@@ -739,15 +734,15 @@ public void run() {
739734
.isAtLeast(currentTime);
740735
}
741736

742-
private List<Thread> testConcurrentUpdates(
737+
private List<Future> testConcurrentUpdates(
743738
final FlowController flowController,
744739
final int increaseStepRange,
745740
final int decreaseStepRange,
746741
final int reserve,
747742
final AtomicInteger totalIncreased,
748743
final AtomicInteger totalDecreased,
749744
final AtomicInteger releasedCounter)
750-
throws InterruptedException {
745+
throws InterruptedException, TimeoutException, ExecutionException {
751746
final Random random = new Random();
752747
Runnable increaseRunnable =
753748
new Runnable() {
@@ -779,22 +774,19 @@ public void run() {
779774
}
780775
}
781776
};
782-
List<Thread> updateThreads = new ArrayList<>();
783-
List<Thread> reserveReleaseThreads = new ArrayList<>();
784-
for (int i = 0; i < 20; i++) {
785-
Thread increase = new Thread(increaseRunnable);
786-
Thread decrease = new Thread(decreaseRunnable);
787-
Thread reserveRelease = new Thread(reserveReleaseRunnable);
788-
updateThreads.add(increase);
789-
updateThreads.add(decrease);
790-
reserveReleaseThreads.add(reserveRelease);
791-
increase.start();
792-
decrease.start();
793-
reserveRelease.start();
777+
List<Future> updateFuture = new ArrayList<>();
778+
List<Future> reserveReleaseFuture = new ArrayList<>();
779+
ExecutorService executors = Executors.newFixedThreadPool(10);
780+
ExecutorService reserveExecutor = Executors.newFixedThreadPool(10);
781+
for (int i = 0; i < 5; i++) {
782+
updateFuture.add(executors.submit(increaseRunnable));
783+
updateFuture.add(executors.submit(decreaseRunnable));
784+
reserveReleaseFuture.add(reserveExecutor.submit(reserveReleaseRunnable));
794785
}
795-
for (Thread t : updateThreads) {
796-
t.join(10);
786+
for (Future t : updateFuture) {
787+
t.get(50, TimeUnit.MILLISECONDS);
797788
}
798-
return reserveReleaseThreads;
789+
executors.shutdown();
790+
return reserveReleaseFuture;
799791
}
800792
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.