Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2799861

Browse filesBrowse files
Support cancellation of NodeServices invocations
1 parent f358d8e commit 2799861
Copy full SHA for 2799861

File tree

Expand file treeCollapse file tree

7 files changed

+86
-26
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

7 files changed

+86
-26
lines changed
Open diff view settings
Collapse file

‎src/Microsoft.AspNetCore.NodeServices/HostingModels/HttpNodeInstance.cs‎

Copy file name to clipboardExpand all lines: src/Microsoft.AspNetCore.NodeServices/HostingModels/HttpNodeInstance.cs
+9-6Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Net.Http;
55
using System.Text;
66
using System.Text.RegularExpressions;
7+
using System.Threading;
78
using System.Threading.Tasks;
89
using Microsoft.Extensions.Logging;
910
using Newtonsoft.Json;
@@ -57,15 +58,17 @@ private static string MakeCommandLineOptions(int port)
5758
return $"--port {port}";
5859
}
5960

60-
protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo)
61+
protected override async Task<T> InvokeExportAsync<T>(
62+
NodeInvocationInfo invocationInfo, CancellationToken cancellationToken)
6163
{
6264
var payloadJson = JsonConvert.SerializeObject(invocationInfo, jsonSerializerSettings);
6365
var payload = new StringContent(payloadJson, Encoding.UTF8, "application/json");
64-
var response = await _client.PostAsync("http://localhost:" + _portNumber, payload);
66+
var response = await _client.PostAsync("http://localhost:" + _portNumber, payload, cancellationToken);
6567

6668
if (!response.IsSuccessStatusCode)
6769
{
68-
var responseErrorString = await response.Content.ReadAsStringAsync();
70+
// Unfortunately there's no true way to cancel ReadAsStringAsync calls, hence AbandonIfCancelled
71+
var responseErrorString = await response.Content.ReadAsStringAsync().OrThrowOnCancellation(cancellationToken);
6972
throw new Exception("Call to Node module failed with error: " + responseErrorString);
7073
}
7174

@@ -81,11 +84,11 @@ protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocat
8184
typeof(T).FullName);
8285
}
8386

84-
var responseString = await response.Content.ReadAsStringAsync();
87+
var responseString = await response.Content.ReadAsStringAsync().OrThrowOnCancellation(cancellationToken);
8588
return (T)(object)responseString;
8689

8790
case "application/json":
88-
var responseJson = await response.Content.ReadAsStringAsync();
91+
var responseJson = await response.Content.ReadAsStringAsync().OrThrowOnCancellation(cancellationToken);
8992
return JsonConvert.DeserializeObject<T>(responseJson, jsonSerializerSettings);
9093

9194
case "application/octet-stream":
@@ -97,7 +100,7 @@ protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocat
97100
typeof(T).FullName + ". Instead you must use the generic type System.IO.Stream.");
98101
}
99102

100-
return (T)(object)(await response.Content.ReadAsStreamAsync());
103+
return (T)(object)(await response.Content.ReadAsStreamAsync().OrThrowOnCancellation(cancellationToken));
101104

102105
default:
103106
throw new InvalidOperationException("Unexpected response content type: " + responseContentType.MediaType);
Collapse file
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34

45
namespace Microsoft.AspNetCore.NodeServices.HostingModels
56
{
67
public interface INodeInstance : IDisposable
78
{
8-
Task<T> InvokeExportAsync<T>(string moduleName, string exportNameOrNull, params object[] args);
9+
Task<T> InvokeExportAsync<T>(CancellationToken cancellationToken, string moduleName, string exportNameOrNull, params object[] args);
910
}
1011
}
Collapse file

‎src/Microsoft.AspNetCore.NodeServices/HostingModels/OutOfProcessNodeInstance.cs‎

Copy file name to clipboardExpand all lines: src/Microsoft.AspNetCore.NodeServices/HostingModels/OutOfProcessNodeInstance.cs
+11-5Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Diagnostics;
44
using System.IO;
55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Microsoft.Extensions.Logging;
89

@@ -67,7 +68,8 @@ public OutOfProcessNodeInstance(
6768
ConnectToInputOutputStreams();
6869
}
6970

70-
public async Task<T> InvokeExportAsync<T>(string moduleName, string exportNameOrNull, params object[] args)
71+
public async Task<T> InvokeExportAsync<T>(
72+
CancellationToken cancellationToken, string moduleName, string exportNameOrNull, params object[] args)
7173
{
7274
if (_nodeProcess.HasExited || _nodeProcessNeedsRestart)
7375
{
@@ -79,15 +81,17 @@ public async Task<T> InvokeExportAsync<T>(string moduleName, string exportNameOr
7981
throw new NodeInvocationException(message, null, nodeInstanceUnavailable: true);
8082
}
8183

82-
// Wait until the connection is established. This will throw if the connection fails to initialize.
83-
await _connectionIsReadySource.Task;
84+
// Wait until the connection is established. This will throw if the connection fails to initialize,
85+
// or if cancellation is requested first. Note that we can't really cancel the "establishing connection"
86+
// task because that's shared with all callers, but we can stop waiting for it if this call is cancelled.
87+
await _connectionIsReadySource.Task.OrThrowOnCancellation(cancellationToken);
8488

8589
return await InvokeExportAsync<T>(new NodeInvocationInfo
8690
{
8791
ModuleName = moduleName,
8892
ExportedFunctionName = exportNameOrNull,
8993
Args = args
90-
});
94+
}, cancellationToken);
9195
}
9296

9397
public void Dispose()
@@ -96,7 +100,9 @@ public void Dispose()
96100
GC.SuppressFinalize(this);
97101
}
98102

99-
protected abstract Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo);
103+
protected abstract Task<T> InvokeExportAsync<T>(
104+
NodeInvocationInfo invocationInfo,
105+
CancellationToken cancellationToken);
100106

101107
// This method is virtual, as it provides a way to override the NODE_PATH or the path to node.exe
102108
protected virtual ProcessStartInfo PrepareNodeProcessStartInfo(
Collapse file

‎src/Microsoft.AspNetCore.NodeServices/HostingModels/SocketNodeInstance.cs‎

Copy file name to clipboardExpand all lines: src/Microsoft.AspNetCore.NodeServices/HostingModels/SocketNodeInstance.cs
+15-10Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public SocketNodeInstance(string projectPath, string[] watchFileExtensions, stri
5757
_socketAddress = socketAddress;
5858
}
5959

60-
protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo)
60+
protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo, CancellationToken cancellationToken)
6161
{
6262
if (_connectionHasFailed)
6363
{
@@ -70,7 +70,12 @@ protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocat
7070

7171
if (_virtualConnectionClient == null)
7272
{
73-
await EnsureVirtualConnectionClientCreated();
73+
// Although we could pass the cancellationToken into EnsureVirtualConnectionClientCreated and
74+
// have it signal cancellations upstream, that would be a bad thing to do, because all callers
75+
// wait for the same connection task. There's no reason why the first caller should have the
76+
// special ability to cancel the connection process in a way that would affect subsequent
77+
// callers. So, each caller just independently stops awaiting connection if that call is cancelled.
78+
await EnsureVirtualConnectionClientCreated().OrThrowOnCancellation(cancellationToken);
7479
}
7580

7681
// For each invocation, we open a new virtual connection. This gives an API equivalent to opening a new
@@ -83,7 +88,7 @@ protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocat
8388
virtualConnection = _virtualConnectionClient.OpenVirtualConnection();
8489

8590
// Send request
86-
await WriteJsonLineAsync(virtualConnection, invocationInfo);
91+
await WriteJsonLineAsync(virtualConnection, invocationInfo, cancellationToken);
8792

8893
// Determine what kind of response format is expected
8994
if (typeof(T) == typeof(Stream))
@@ -96,7 +101,7 @@ protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocat
96101
else
97102
{
98103
// Parse and return non-streamed JSON response
99-
var response = await ReadJsonAsync<RpcJsonResponse<T>>(virtualConnection);
104+
var response = await ReadJsonAsync<RpcJsonResponse<T>>(virtualConnection, cancellationToken);
100105
if (response.ErrorMessage != null)
101106
{
102107
throw new NodeInvocationException(response.ErrorMessage, response.ErrorDetails);
@@ -163,27 +168,27 @@ protected override void Dispose(bool disposing)
163168
base.Dispose(disposing);
164169
}
165170

166-
private static async Task WriteJsonLineAsync(Stream stream, object serializableObject)
171+
private static async Task WriteJsonLineAsync(Stream stream, object serializableObject, CancellationToken cancellationToken)
167172
{
168173
var json = JsonConvert.SerializeObject(serializableObject, jsonSerializerSettings);
169174
var bytes = Encoding.UTF8.GetBytes(json + '\n');
170-
await stream.WriteAsync(bytes, 0, bytes.Length);
175+
await stream.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
171176
}
172177

173-
private static async Task<T> ReadJsonAsync<T>(Stream stream)
178+
private static async Task<T> ReadJsonAsync<T>(Stream stream, CancellationToken cancellationToken)
174179
{
175-
var json = Encoding.UTF8.GetString(await ReadAllBytesAsync(stream));
180+
var json = Encoding.UTF8.GetString(await ReadAllBytesAsync(stream, cancellationToken));
176181
return JsonConvert.DeserializeObject<T>(json, jsonSerializerSettings);
177182
}
178183

179-
private static async Task<byte[]> ReadAllBytesAsync(Stream input)
184+
private static async Task<byte[]> ReadAllBytesAsync(Stream input, CancellationToken cancellationToken)
180185
{
181186
byte[] buffer = new byte[16 * 1024];
182187

183188
using (var ms = new MemoryStream())
184189
{
185190
int read;
186-
while ((read = await input.ReadAsync(buffer, 0, buffer.Length)) > 0)
191+
while ((read = await input.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
187192
{
188193
ms.Write(buffer, 0, read);
189194
}
Collapse file

‎src/Microsoft.AspNetCore.NodeServices/INodeServices.cs‎

Copy file name to clipboardExpand all lines: src/Microsoft.AspNetCore.NodeServices/INodeServices.cs
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34

45
namespace Microsoft.AspNetCore.NodeServices
56
{
67
public interface INodeServices : IDisposable
78
{
89
Task<T> InvokeAsync<T>(string moduleName, params object[] args);
10+
Task<T> InvokeAsync<T>(CancellationToken cancellationToken, string moduleName, params object[] args);
911

1012
Task<T> InvokeExportAsync<T>(string moduleName, string exportedFunctionName, params object[] args);
13+
Task<T> InvokeExportAsync<T>(CancellationToken cancellationToken, string moduleName, string exportedFunctionName, params object[] args);
14+
1115

1216
[Obsolete("Use InvokeAsync instead")]
1317
Task<T> Invoke<T>(string moduleName, params object[] args);
Collapse file

‎src/Microsoft.AspNetCore.NodeServices/NodeServicesImpl.cs‎

Copy file name to clipboardExpand all lines: src/Microsoft.AspNetCore.NodeServices/NodeServicesImpl.cs
+15-4Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using Microsoft.AspNetCore.NodeServices.HostingModels;
45

@@ -34,19 +35,29 @@ public Task<T> InvokeAsync<T>(string moduleName, params object[] args)
3435
return InvokeExportAsync<T>(moduleName, null, args);
3536
}
3637

38+
public Task<T> InvokeAsync<T>(CancellationToken cancellationToken, string moduleName, params object[] args)
39+
{
40+
return InvokeExportAsync<T>(cancellationToken, moduleName, null, args);
41+
}
42+
3743
public Task<T> InvokeExportAsync<T>(string moduleName, string exportedFunctionName, params object[] args)
3844
{
39-
return InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, allowRetry: true);
45+
return InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, /* allowRetry */ true, CancellationToken.None);
46+
}
47+
48+
public Task<T> InvokeExportAsync<T>(CancellationToken cancellationToken, string moduleName, string exportedFunctionName, params object[] args)
49+
{
50+
return InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, /* allowRetry */ true, cancellationToken);
4051
}
4152

42-
public async Task<T> InvokeExportWithPossibleRetryAsync<T>(string moduleName, string exportedFunctionName, object[] args, bool allowRetry)
53+
public async Task<T> InvokeExportWithPossibleRetryAsync<T>(string moduleName, string exportedFunctionName, object[] args, bool allowRetry, CancellationToken cancellationToken)
4354
{
4455
ThrowAnyOutstandingDelayedDisposalException();
4556
var nodeInstance = GetOrCreateCurrentNodeInstance();
4657

4758
try
4859
{
49-
return await nodeInstance.InvokeExportAsync<T>(moduleName, exportedFunctionName, args);
60+
return await nodeInstance.InvokeExportAsync<T>(cancellationToken, moduleName, exportedFunctionName, args);
5061
}
5162
catch (NodeInvocationException ex)
5263
{
@@ -69,7 +80,7 @@ public async Task<T> InvokeExportWithPossibleRetryAsync<T>(string moduleName, st
6980
// One the next call, don't allow retries, because we could get into an infinite retry loop, or a long retry
7081
// loop that masks an underlying problem. A newly-created Node instance should be able to accept invocations,
7182
// or something more serious must be wrong.
72-
return await InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, allowRetry: false);
83+
return await InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, /* allowRetry */ false, cancellationToken);
7384
}
7485
else
7586
{
Collapse file
+30Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace Microsoft.AspNetCore.NodeServices
5+
{
6+
internal static class TaskExtensions
7+
{
8+
public static Task OrThrowOnCancellation(this Task task, CancellationToken cancellationToken)
9+
{
10+
return task.IsCompleted
11+
? task // If the task is already completed, no need to wrap it in a further layer of task
12+
: task.ContinueWith(
13+
_ => {}, // If the task completes, allow execution to continue
14+
cancellationToken,
15+
TaskContinuationOptions.ExecuteSynchronously,
16+
TaskScheduler.Default);
17+
}
18+
19+
public static Task<T> OrThrowOnCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
20+
{
21+
return task.IsCompleted
22+
? task // If the task is already completed, no need to wrap it in a further layer of task
23+
: task.ContinueWith(
24+
t => t.Result, // If the task completes, pass through its result
25+
cancellationToken,
26+
TaskContinuationOptions.ExecuteSynchronously,
27+
TaskScheduler.Default);
28+
}
29+
}
30+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.