Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 33011f1

Browse filesBrowse files
Merge pull request #109050 from MadhavJivrajani/client-go-retry
rest: Ensure response body is fully read and closed before retry Kubernetes-commit: 97bf2986cdeae0e7da70659d70375e0770b14a5e
2 parents 8a672f0 + 01ab7fb commit 33011f1
Copy full SHA for 33011f1

File tree

Expand file treeCollapse file tree

5 files changed

+154
-59
lines changed
Filter options
Expand file treeCollapse file tree

5 files changed

+154
-59
lines changed

‎go.mod

Copy file name to clipboardExpand all lines: go.mod
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ require (
3434
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
3535
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
3636
google.golang.org/protobuf v1.27.1
37-
k8s.io/api v0.0.0-20220331140502-02c2207317b5
37+
k8s.io/api v0.0.0-20220402025220-2de699698342
3838
k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444
3939
k8s.io/klog/v2 v2.60.1
4040
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42
@@ -44,6 +44,6 @@ require (
4444
)
4545

4646
replace (
47-
k8s.io/api => k8s.io/api v0.0.0-20220331140502-02c2207317b5
47+
k8s.io/api => k8s.io/api v0.0.0-20220402025220-2de699698342
4848
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444
4949
)

‎go.sum

Copy file name to clipboardExpand all lines: go.sum
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -628,8 +628,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
628628
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
629629
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
630630
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
631-
k8s.io/api v0.0.0-20220331140502-02c2207317b5 h1:shLc1jkM9dNz8zPSm2YeE/XOpp1UP36AZOt5DjspI+0=
632-
k8s.io/api v0.0.0-20220331140502-02c2207317b5/go.mod h1:69QWTzqWVlGn0rU+x3dmk3WAsUQHmeQwIBWMbK1ZEyE=
631+
k8s.io/api v0.0.0-20220402025220-2de699698342 h1:xFpsdy7RmF2niTyB76yUyPubqMa5m9/L9sswkdyhONo=
632+
k8s.io/api v0.0.0-20220402025220-2de699698342/go.mod h1:69QWTzqWVlGn0rU+x3dmk3WAsUQHmeQwIBWMbK1ZEyE=
633633
k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444 h1:whQmS3GtF822OUer+LPJnMFKn6kPfuJOCM/3xUuATIY=
634634
k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444/go.mod h1:82Bi4sCzVBdpYjyI4jY6aHX+YCUchUIrZrXKedjd2UM=
635635
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=

‎rest/request.go

Copy file name to clipboardExpand all lines: rest/request.go
+11-14Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -614,15 +614,14 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
614614
}
615615
url := r.URL().String()
616616
for {
617-
req, err := r.newHTTPRequest(ctx)
618-
if err != nil {
619-
return nil, err
620-
}
621-
622617
if err := r.retry.Before(ctx, r); err != nil {
623618
return nil, r.retry.WrapPreviousError(err)
624619
}
625620

621+
req, err := r.newHTTPRequest(ctx)
622+
if err != nil {
623+
return nil, err
624+
}
626625
resp, err := client.Do(req)
627626
updateURLMetrics(ctx, r, resp, err)
628627
r.retry.After(ctx, r, resp, err)
@@ -722,18 +721,17 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
722721

723722
url := r.URL().String()
724723
for {
724+
if err := r.retry.Before(ctx, r); err != nil {
725+
return nil, err
726+
}
727+
725728
req, err := r.newHTTPRequest(ctx)
726729
if err != nil {
727730
return nil, err
728731
}
729732
if r.body != nil {
730733
req.Body = ioutil.NopCloser(r.body)
731734
}
732-
733-
if err := r.retry.Before(ctx, r); err != nil {
734-
return nil, err
735-
}
736-
737735
resp, err := client.Do(req)
738736
updateURLMetrics(ctx, r, resp, err)
739737
r.retry.After(ctx, r, resp, err)
@@ -859,14 +857,13 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
859857

860858
// Right now we make about ten retry attempts if we get a Retry-After response.
861859
for {
860+
if err := r.retry.Before(ctx, r); err != nil {
861+
return r.retry.WrapPreviousError(err)
862+
}
862863
req, err := r.newHTTPRequest(ctx)
863864
if err != nil {
864865
return err
865866
}
866-
867-
if err := r.retry.Before(ctx, r); err != nil {
868-
return r.retry.WrapPreviousError(err)
869-
}
870867
resp, err := client.Do(req)
871868
updateURLMetrics(ctx, r, resp, err)
872869
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.

‎rest/request_test.go

Copy file name to clipboardExpand all lines: rest/request_test.go
+118-9Lines changed: 118 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,7 @@ func TestRequestWatch(t *testing.T) {
938938
},
939939
Err: true,
940940
ErrFn: func(err error) bool {
941-
return apierrors.IsInternalError(err)
941+
return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF")
942942
},
943943
},
944944
{
@@ -954,7 +954,10 @@ func TestRequestWatch(t *testing.T) {
954954
serverReturns: []responseErr{
955955
{response: nil, err: io.EOF},
956956
},
957-
Empty: true,
957+
Err: true,
958+
ErrFn: func(err error) bool {
959+
return !apierrors.IsInternalError(err)
960+
},
958961
},
959962
{
960963
name: "max retries 2, server always returns a response with Retry-After header",
@@ -1130,7 +1133,7 @@ func TestRequestStream(t *testing.T) {
11301133
},
11311134
Err: true,
11321135
ErrFn: func(err error) bool {
1133-
return apierrors.IsInternalError(err)
1136+
return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF")
11341137
},
11351138
},
11361139
{
@@ -1371,8 +1374,6 @@ func (b *testBackoffManager) Sleep(d time.Duration) {
13711374
}
13721375

13731376
func TestCheckRetryClosesBody(t *testing.T) {
1374-
// unblock CI until http://issue.k8s.io/108906 is resolved in 1.24
1375-
t.Skip("http://issue.k8s.io/108906")
13761377
count := 0
13771378
ch := make(chan struct{})
13781379
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@@ -2435,6 +2436,7 @@ func TestRequestWithRetry(t *testing.T) {
24352436
body io.Reader
24362437
serverReturns responseErr
24372438
errExpected error
2439+
errContains string
24382440
transformFuncInvokedExpected int
24392441
roundTripInvokedExpected int
24402442
}{
@@ -2451,7 +2453,7 @@ func TestRequestWithRetry(t *testing.T) {
24512453
body: &readSeeker{err: io.EOF},
24522454
serverReturns: responseErr{response: retryAfterResponse(), err: nil},
24532455
errExpected: nil,
2454-
transformFuncInvokedExpected: 1,
2456+
transformFuncInvokedExpected: 0,
24552457
roundTripInvokedExpected: 1,
24562458
},
24572459
{
@@ -2474,7 +2476,7 @@ func TestRequestWithRetry(t *testing.T) {
24742476
name: "server returns retryable err, request body Seek returns error, retry aborted",
24752477
body: &readSeeker{err: io.EOF},
24762478
serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
2477-
errExpected: io.ErrUnexpectedEOF,
2479+
errContains: "failed to reset the request body while retrying a request: EOF",
24782480
transformFuncInvokedExpected: 0,
24792481
roundTripInvokedExpected: 1,
24802482
},
@@ -2517,8 +2519,15 @@ func TestRequestWithRetry(t *testing.T) {
25172519
if test.transformFuncInvokedExpected != transformFuncInvoked {
25182520
t.Errorf("Expected transform func to be invoked %d times, but got: %d", test.transformFuncInvokedExpected, transformFuncInvoked)
25192521
}
2520-
if test.errExpected != unWrap(err) {
2521-
t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err))
2522+
switch {
2523+
case test.errExpected != nil:
2524+
if test.errExpected != unWrap(err) {
2525+
t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err))
2526+
}
2527+
case len(test.errContains) > 0:
2528+
if !strings.Contains(err.Error(), test.errContains) {
2529+
t.Errorf("Expected error message to caontain: %q, but got: %q", test.errContains, err.Error())
2530+
}
25222531
}
25232532
})
25242533
}
@@ -3531,3 +3540,103 @@ func TestTransportConcurrency(t *testing.T) {
35313540
})
35323541
}
35333542
}
3543+
3544+
// TODO: see if we can consolidate the other trackers into one.
3545+
type requestBodyTracker struct {
3546+
io.ReadSeeker
3547+
f func(string)
3548+
}
3549+
3550+
func (t *requestBodyTracker) Read(p []byte) (int, error) {
3551+
t.f("Request.Body.Read")
3552+
return t.ReadSeeker.Read(p)
3553+
}
3554+
3555+
func (t *requestBodyTracker) Seek(offset int64, whence int) (int64, error) {
3556+
t.f("Request.Body.Seek")
3557+
return t.ReadSeeker.Seek(offset, whence)
3558+
}
3559+
3560+
type responseBodyTracker struct {
3561+
io.ReadCloser
3562+
f func(string)
3563+
}
3564+
3565+
func (t *responseBodyTracker) Read(p []byte) (int, error) {
3566+
t.f("Response.Body.Read")
3567+
return t.ReadCloser.Read(p)
3568+
}
3569+
3570+
func (t *responseBodyTracker) Close() error {
3571+
t.f("Response.Body.Close")
3572+
return t.ReadCloser.Close()
3573+
}
3574+
3575+
type recorder struct {
3576+
order []string
3577+
}
3578+
3579+
func (r *recorder) record(call string) {
3580+
r.order = append(r.order, call)
3581+
}
3582+
3583+
func TestRequestBodyResetOrder(t *testing.T) {
3584+
recorder := &recorder{}
3585+
respBodyTracker := &responseBodyTracker{
3586+
ReadCloser: nil, // the server will fill it
3587+
f: recorder.record,
3588+
}
3589+
3590+
var attempts int
3591+
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
3592+
defer func() {
3593+
attempts++
3594+
}()
3595+
3596+
// read the request body.
3597+
ioutil.ReadAll(req.Body)
3598+
3599+
// first attempt, we send a retry-after
3600+
if attempts == 0 {
3601+
resp := retryAfterResponse()
3602+
respBodyTracker.ReadCloser = ioutil.NopCloser(bytes.NewReader([]byte{}))
3603+
resp.Body = respBodyTracker
3604+
return resp, nil
3605+
}
3606+
3607+
return &http.Response{StatusCode: http.StatusOK}, nil
3608+
})
3609+
3610+
reqBodyTracker := &requestBodyTracker{
3611+
ReadSeeker: bytes.NewReader([]byte{}), // empty body ensures one Read operation at most.
3612+
f: recorder.record,
3613+
}
3614+
req := &Request{
3615+
verb: "POST",
3616+
body: reqBodyTracker,
3617+
c: &RESTClient{
3618+
content: defaultContentConfig(),
3619+
Client: client,
3620+
},
3621+
backoff: &noSleepBackOff{},
3622+
retry: &withRetry{maxRetries: 1},
3623+
}
3624+
3625+
req.Do(context.Background())
3626+
3627+
expected := []string{
3628+
// 1st attempt: the server handler reads the request body
3629+
"Request.Body.Read",
3630+
// the server sends a retry-after, client reads the
3631+
// response body, and closes it
3632+
"Response.Body.Read",
3633+
"Response.Body.Close",
3634+
// client retry logic seeks to the beginning of the request body
3635+
"Request.Body.Seek",
3636+
// 2nd attempt: the server reads the request body
3637+
"Request.Body.Read",
3638+
}
3639+
if !reflect.DeepEqual(expected, recorder.order) {
3640+
t.Errorf("Expected invocation request and response body operations for retry do not match: %s", cmp.Diff(expected, recorder.order))
3641+
}
3642+
}

‎rest/with_retry.go

Copy file name to clipboardExpand all lines: rest/with_retry.go
+21-32Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,12 @@ type WithRetry interface {
7878
IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool
7979

8080
// Before should be invoked prior to each attempt, including
81-
// the first one. if an error is returned, the request
82-
// should be aborted immediately.
81+
// the first one. If an error is returned, the request should
82+
// be aborted immediately.
83+
//
84+
// Before may also be additionally responsible for preparing
85+
// the request for the next retry, namely in terms of resetting
86+
// the request body in case it has been read.
8387
Before(ctx context.Context, r *Request) error
8488

8589
// After should be invoked immediately after an attempt is made.
@@ -194,46 +198,18 @@ func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *
194198
r.retryAfter.Wait = time.Duration(seconds) * time.Second
195199
r.retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err)
196200

197-
if err := r.prepareForNextRetry(ctx, restReq); err != nil {
198-
klog.V(4).Infof("Could not retry request - %v", err)
199-
return false
200-
}
201-
202201
return true
203202
}
204203

205-
// prepareForNextRetry is responsible for carrying out operations that need
206-
// to be completed before the next retry is initiated:
207-
// - if the request context is already canceled there is no need to
208-
// retry, the function will return ctx.Err().
209-
// - we need to seek to the beginning of the request body before we
210-
// initiate the next retry, the function should return an error if
211-
// it fails to do so.
212-
func (r *withRetry) prepareForNextRetry(ctx context.Context, request *Request) error {
213-
if ctx.Err() != nil {
214-
return ctx.Err()
215-
}
216-
217-
// Ensure the response body is fully read and closed before
218-
// we reconnect, so that we reuse the same TCP connection.
219-
if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
220-
if _, err := seeker.Seek(0, 0); err != nil {
221-
return fmt.Errorf("can't Seek() back to beginning of body for %T", request)
222-
}
223-
}
224-
225-
klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
226-
return nil
227-
}
228-
229204
func (r *withRetry) Before(ctx context.Context, request *Request) error {
205+
// If the request context is already canceled there
206+
// is no need to retry.
230207
if ctx.Err() != nil {
231208
r.trackPreviousError(ctx.Err())
232209
return ctx.Err()
233210
}
234211

235212
url := request.URL()
236-
237213
// r.retryAfter represents the retry after parameters calculated
238214
// from the (response, err) tuple from the last attempt, so 'Before'
239215
// can apply these retry after parameters prior to the next attempt.
@@ -245,6 +221,18 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
245221
return nil
246222
}
247223

224+
// At this point we've made atleast one attempt, post which the response
225+
// body should have been fully read and closed in order for it to be safe
226+
// to reset the request body before we reconnect, in order for us to reuse
227+
// the same TCP connection.
228+
if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
229+
if _, err := seeker.Seek(0, io.SeekStart); err != nil {
230+
err = fmt.Errorf("failed to reset the request body while retrying a request: %v", err)
231+
r.trackPreviousError(err)
232+
return err
233+
}
234+
}
235+
248236
// if we are here, we have made attempt(s) al least once before.
249237
if request.backoff != nil {
250238
// TODO(tkashem) with default set to use exponential backoff
@@ -263,6 +251,7 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
263251
return err
264252
}
265253

254+
klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
266255
return nil
267256
}
268257

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.