forked from ringcentral/pubnub-jtools
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSubscribeWorker.java
More file actions
114 lines (104 loc) · 4.54 KB
/
SubscribeWorker.java
File metadata and controls
114 lines (104 loc) · 4.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.pubnub.api;
import java.net.SocketTimeoutException;
import java.util.Hashtable;
import java.util.Vector;
import static com.pubnub.api.PubnubError.*;
/**
 * Worker that executes subscribe HTTP requests and reports the outcome to the
 * request's response handler.
 *
 * <p>A request flagged as "DAR" (logged below as disconnectAndResubscribe) is
 * attempted up to {@code maxRetries} times; any other request gets a single
 * attempt. Retries are separated by {@code retryInterval} milliseconds.
 */
class SubscribeWorker extends AbstractSubscribeWorker {

    /**
     * Most recent exception raised by a fetch attempt of the request currently
     * being processed. Cleared at the start of every {@link #process} call so
     * that one request never reports an error left over from another.
     */
    private Exception excp = null;

    /**
     * Creates a worker; every argument is forwarded unchanged to the
     * {@code AbstractSubscribeWorker} constructor.
     */
    SubscribeWorker(Vector _requestQueue, int connectionTimeout,
                    int requestTimeout, int maxRetries, int retryInterval, int windowInterval, Hashtable headers) {
        super(_requestQueue, connectionTimeout, requestTimeout,
              maxRetries, retryInterval, windowInterval, headers);
    }

    /**
     * Fetches the URL of {@code hreq} and dispatches the result to
     * {@code hreq.getResponseHandler()}.
     *
     * <ul>
     *   <li>Successful response: {@code handleResponse}.</li>
     *   <li>Read timeout: {@code handleBackFromDar} for a DAR request,
     *       otherwise {@code handleError} with the subscribe-timeout error.</li>
     *   <li>Attempts exhausted: {@code handleTimeout} for a DAR request,
     *       otherwise {@code handleError} with the last PubNub error, or a
     *       generic HTTP error when none is available.</li>
     *   <li>Nothing is dispatched once this worker's {@code _die} flag is set,
     *       or when the request was placed by a worker whose {@code _die} flag
     *       is set.</li>
     * </ul>
     *
     * @param hreq the request to execute; this worker is recorded on it via
     *             {@code setWorker(this)}
     */
    void process(HttpRequest hreq) {
        HttpResponse hresp = null;
        excp = null;

        // Starting at maxRetries leaves exactly one pass through the loop for
        // non-DAR requests; DAR requests start at 1 and get the full budget.
        int currentRetryAttempt = hreq.isDar() ? 1 : maxRetries;
        log.verbose("disconnectAndResubscribe is " + hreq.isDar());

        if (placedByDeadWorker(hreq)) {
            return;
        }
        hreq.setWorker(this);

        if (!hreq.isSubzero() && windowInterval != 0) {
            sleepIgnoringInterrupt(windowInterval);
        }

        boolean sleep = false;
        while (!_die && currentRetryAttempt <= maxRetries) {
            if (sleep) {
                sleepIgnoringInterrupt(retryInterval);
            }
            sleep = true;

            // Drop the previous attempt's response: if this attempt throws, an
            // earlier unsuccessful response must not be delivered as a success.
            hresp = null;
            try {
                log.debug(hreq.getUrl());
                hresp = httpclient.fetch(hreq.getUrl(), hreq.getHeaders());
                if (hresp != null && HttpUtil.checkResponseSuccess(hresp.getStatusCode())) {
                    break;
                }
                // The fetch completed without throwing but did not succeed.
                // Count it against the retry budget; otherwise nothing advances
                // the counter and the loop would retry without bound.
                log.verbose("Retry Attempt : " + attemptLabel(currentRetryAttempt)
                        + " Unsuccessful response in Fetch");
                hresp = null;
                currentRetryAttempt++;
            } catch (SocketTimeoutException e) {
                log.verbose("No Traffic , Read Timeout Exception in Fetch : " + e.toString());
                if (_die) {
                    log.verbose("Asked to Die, Don't do back from DAR processing");
                    break;
                }
                if (hreq.isDar()) {
                    hreq.getResponseHandler().handleBackFromDar(hreq);
                    return;
                }
                hreq.getResponseHandler().handleError(hreq, getErrorObject(PNERROBJ_SUBSCRIBE_TIMEOUT, 1));
                return;
            } catch (PubnubException e) {
                excp = e;
                switch (e.getPubnubError().errorCode) {
                case PNERR_FORBIDDEN:
                case PNERR_UNAUTHORIZED:
                    log.verbose("Authentication Failure : " + e.toString());
                    // Push the counter past the limit so the loop stops
                    // immediately instead of retrying.
                    currentRetryAttempt = maxRetries + 1;
                    break;
                default:
                    log.verbose("Retry Attempt : " + attemptLabel(currentRetryAttempt)
                            + " Exception in Fetch : " + e.toString());
                    currentRetryAttempt++;
                    break;
                }
            } catch (Exception e) {
                excp = e;
                log.verbose("Retry Attempt : " + attemptLabel(currentRetryAttempt)
                        + " Exception in Fetch : " + e.toString());
                currentRetryAttempt++;
            }
        }

        if (_die) {
            return;
        }
        if (hresp == null) {
            reportFetchFailure(hreq);
            return;
        }
        log.debug(hresp.getResponse());
        hreq.getResponseHandler().handleResponse(hreq, hresp.getResponse());
    }

    /** Shuts down the underlying HTTP client, if there is one. */
    public void shutdown() {
        if (httpclient != null) {
            httpclient.shutdown();
        }
    }

    /**
     * Returns true when the request was previously placed by a worker whose
     * {@code _die} flag is set; such requests are ignored.
     */
    private boolean placedByDeadWorker(HttpRequest hreq) {
        if (hreq.getWorker() == null) {
            return false;
        }
        log.verbose("Request placed by worker " + hreq.getWorker().getThread().getName());
        if (hreq.getWorker()._die) {
            log.verbose("The thread which placed the request has died, so ignore the request : " + hreq.getWorker().getThread().getName());
            return true;
        }
        return false;
    }

    /**
     * Sleeps for {@code millis} milliseconds. An interrupt only cuts the wait
     * short: it is deliberately swallowed and the interrupt flag is not
     * restored, matching the worker's long-standing best-effort behaviour.
     * NOTE(review): confirm no caller relies on the interrupt status surviving.
     */
    private void sleepIgnoringInterrupt(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
            // Intentionally ignored; processing continues straight away.
        }
    }

    /** Label used in retry log lines: "last" for the final permitted attempt, else the attempt number. */
    private String attemptLabel(int attempt) {
        return (attempt == maxRetries) ? "last" : String.valueOf(attempt);
    }

    /** Tells the response handler that no successful response was obtained for {@code hreq}. */
    private void reportFetchFailure(HttpRequest hreq) {
        log.debug("Error in fetching url : " + hreq.getUrl());
        if (hreq.isDar()) {
            log.verbose("Exhausted number of retries");
            hreq.getResponseHandler().handleTimeout(hreq);
            return;
        }
        if (excp instanceof PubnubException && ((PubnubException) excp).getPubnubError() != null) {
            // Prefer the specific PubNub error recorded during the last failing attempt.
            hreq.getResponseHandler().handleError(hreq, ((PubnubException) excp).getPubnubError());
        } else {
            hreq.getResponseHandler().handleError(hreq, getErrorObject(PNERROBJ_HTTP_ERROR, 1));
        }
    }
}