Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a69d851

Browse filesBrowse files
authored
Merge pull request #2429 from brendandburns/revit
Cherry-pick in two watch changes from the 0.x branch, clean up tests, fix list -> resourceVersion handling
2 parents 3bd21ad + 2a961a0 commit a69d851
Copy full SHA for a69d851

File tree

Expand file treeCollapse file tree

2 files changed

+122
-33
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+122
-33
lines changed

‎src/cache.ts

Copy file name to clipboardExpand all lines: src/cache.ts
+26-7Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,21 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
3434
private readonly watch: Watch;
3535
private readonly listFn: ListPromise<T>;
3636
private readonly labelSelector?: string;
37+
private readonly fieldSelector?: string;
3738

3839
public constructor(
3940
path: string,
4041
watch: Watch,
4142
listFn: ListPromise<T>,
4243
autoStart: boolean = true,
4344
labelSelector?: string,
45+
fieldSelector?: string,
4446
) {
4547
this.path = path;
4648
this.watch = watch;
4749
this.listFn = listFn;
4850
this.labelSelector = labelSelector;
51+
this.fieldSelector = fieldSelector;
4952

5053
this.callbackCache[ADD] = [];
5154
this.callbackCache[UPDATE] = [];
@@ -140,7 +143,10 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
140143

141144
private async doneHandler(err: any): Promise<void> {
142145
this._stop();
143-
if (err && err.statusCode === 410) {
146+
if (
147+
err &&
148+
((err as { statusCode?: number }).statusCode === 410 || (err as { code?: number }).code === 410)
149+
) {
144150
this.resourceVersion = '';
145151
} else if (err) {
146152
this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err));
@@ -162,17 +168,21 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
162168
}
163169
this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
164170
this.addOrUpdateItems(list.items);
165-
this.resourceVersion = list.metadata!.resourceVersion || '';
171+
this.resourceVersion = list.metadata ? list.metadata!.resourceVersion || '' : '';
166172
}
167173
const queryParams = {
168174
resourceVersion: this.resourceVersion,
169175
} as {
170176
resourceVersion: string | undefined;
171177
labelSelector: string | undefined;
178+
fieldSelector: string | undefined;
172179
};
173180
if (this.labelSelector !== undefined) {
174181
queryParams.labelSelector = ObjectSerializer.serialize(this.labelSelector, 'string');
175182
}
183+
if (this.fieldSelector !== undefined) {
184+
queryParams.fieldSelector = ObjectSerializer.serialize(this.fieldSelector, 'string');
185+
}
176186
this.request = await this.watch.watch(
177187
this.path,
178188
queryParams,
@@ -182,6 +192,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
182192
}
183193

184194
private addOrUpdateItems(items: T[]): void {
195+
if (items === undefined || items === null) {
196+
return;
197+
}
185198
items.forEach((obj: T) => {
186199
addOrUpdateObject(
187200
this.objects,
@@ -192,13 +205,18 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
192205
});
193206
}
194207

195-
private watchHandler(phase: string, obj: T, watchObj?: any): void {
208+
private async watchHandler(
209+
phase: string,
210+
obj: T,
211+
watchObj?: { type: string; object: KubernetesObject },
212+
): Promise<void> {
196213
switch (phase) {
197214
case 'ERROR':
198215
if ((obj as { code?: number }).code === 410) {
199216
this.resourceVersion = '';
200217
}
201-
break;
218+
// We don't restart here, because it should be handled by the watch exiting if necessary
219+
return;
202220
case 'ADDED':
203221
case 'MODIFIED':
204222
addOrUpdateObject(
@@ -215,15 +233,16 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
215233
// nothing to do, here for documentation, mostly.
216234
break;
217235
}
218-
if (watchObj && watchObj.metadata) {
219-
this.resourceVersion = watchObj.metadata.resourceVersion;
220-
}
236+
this.resourceVersion = obj.metadata ? obj.metadata!.resourceVersion || '' : '';
221237
}
222238
}
223239

224240
// exported for testing
225241
export function cacheMapFromList<T extends KubernetesObject>(newObjects: T[]): CacheMap<T> {
226242
const objects: CacheMap<T> = new Map();
243+
if (newObjects === undefined || newObjects === null) {
244+
return objects;
245+
}
227246
// build up the new list
228247
for (const obj of newObjects) {
229248
let namespaceObjects = objects.get(obj.metadata!.namespace || '');

‎src/cache_test.ts

Copy file name to clipboardExpand all lines: src/cache_test.ts
+96-26Lines changed: 96 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ describe('ListWatchCache', () => {
235235
watchHandler('ADDED', {
236236
metadata: {
237237
name: 'name3',
238+
resourceVersion: 'blah',
238239
} as V1ObjectMeta,
239240
} as V1Namespace);
240241

@@ -245,40 +246,28 @@ describe('ListWatchCache', () => {
245246
} as V1ObjectMeta,
246247
} as V1Namespace);
247248

248-
watchHandler(
249-
'DELETED',
250-
{
251-
metadata: {
252-
name: 'name2',
253-
resourceVersion: 'blah',
254-
} as V1ObjectMeta,
255-
} as V1Namespace,
256-
{
257-
metadata: {
258-
resourceVersion: '54321',
259-
},
260-
},
261-
);
249+
watchHandler('DELETED', {
250+
metadata: {
251+
name: 'name2',
252+
resourceVersion: '54321',
253+
} as V1ObjectMeta,
254+
} as V1Namespace);
262255

263256
const [addResult, updateResult, deleteResult] = await Promise.all([
264257
addPromise,
265258
updatePromise,
266259
deletePromise,
267260
]);
268-
deepStrictEqual(addResult.metadata, { name: 'name3' });
261+
deepStrictEqual(addResult.metadata, { name: 'name3', resourceVersion: 'blah' });
269262
deepStrictEqual(updateResult.metadata, { name: 'name3', resourceVersion: 'baz' });
270-
deepStrictEqual(deleteResult.metadata, { name: 'name2', resourceVersion: 'blah' });
263+
deepStrictEqual(deleteResult.metadata, { name: 'name2', resourceVersion: '54321' });
271264
strictEqual(informer.latestResourceVersion(), '54321');
272265

273-
watchHandler(
274-
'BOOKMARK',
275-
{},
276-
{
277-
metadata: {
278-
resourceVersion: '5454',
279-
},
266+
watchHandler('BOOKMARK', {
267+
metadata: {
268+
resourceVersion: '5454',
280269
},
281-
);
270+
});
282271
strictEqual(informer.latestResourceVersion(), '5454');
283272
});
284273

@@ -1205,9 +1194,10 @@ describe('ListWatchCache', () => {
12051194
{
12061195
metadata: {
12071196
name: 'name3',
1197+
resourceVersion: '23456',
12081198
} as V1ObjectMeta,
12091199
} as V1Namespace,
1210-
{ metadata: { resourceVersion: '23456' } },
1200+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
12111201
);
12121202

12131203
await informer.stop();
@@ -1259,9 +1249,87 @@ describe('ListWatchCache', () => {
12591249
{
12601250
metadata: {
12611251
name: 'name3',
1252+
resourceVersion: '23456',
12621253
} as V1ObjectMeta,
12631254
} as V1Namespace,
1264-
{ metadata: { resourceVersion: '23456' } },
1255+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
1256+
);
1257+
1258+
await informer.stop();
1259+
1260+
let errorEmitted = false;
1261+
informer.on('error', () => (errorEmitted = true));
1262+
1263+
promise = new Promise((resolve) => {
1264+
mock.when(
1265+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1266+
).thenCall(() => {
1267+
resolve({});
1268+
});
1269+
});
1270+
1271+
informer.start();
1272+
await promise;
1273+
1274+
const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();
1275+
1276+
const object = {
1277+
kind: 'Status',
1278+
apiVersion: 'v1',
1279+
metadata: {},
1280+
status: 'Failure',
1281+
message: 'too old resource version: 12345 (1234)',
1282+
reason: 'Expired',
1283+
code: 410,
1284+
};
1285+
await watchHandler('ERROR', object, { type: 'ERROR', object });
1286+
await doneHandler(null);
1287+
1288+
mock.verify(
1289+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1290+
).thrice();
1291+
strictEqual(errorEmitted, false);
1292+
strictEqual(listCalls, 2);
1293+
});
1294+
1295+
it('should list if the watch errors from the last version', async () => {
1296+
const fakeWatch = mock.mock(Watch);
1297+
1298+
let listCalls = 0;
1299+
const listFn: ListPromise<V1Namespace> = function (): Promise<V1NamespaceList> {
1300+
return new Promise<V1NamespaceList>((resolve, reject) => {
1301+
listCalls++;
1302+
resolve({
1303+
metadata: {
1304+
resourceVersion: '12345',
1305+
} as V1ListMeta,
1306+
items: [],
1307+
} as V1NamespaceList);
1308+
});
1309+
};
1310+
let promise = new Promise((resolve) => {
1311+
mock.when(
1312+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1313+
).thenCall(() => {
1314+
resolve({});
1315+
});
1316+
});
1317+
1318+
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
1319+
1320+
informer.start();
1321+
await promise;
1322+
1323+
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();
1324+
watchHandler(
1325+
'ADDED',
1326+
{
1327+
metadata: {
1328+
name: 'name3',
1329+
resourceVersion: '23456',
1330+
} as V1ObjectMeta,
1331+
} as V1Namespace,
1332+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
12651333
);
12661334

12671335
await informer.stop();
@@ -1335,6 +1403,8 @@ describe('ListWatchCache', () => {
13351403
);
13361404

13371405
await informer.stop();
1406+
strictEqual(listCalls, 1);
1407+
listCalls = 0;
13381408

13391409
let errorEmitted = false;
13401410
informer.on('error', () => (errorEmitted = true));

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.