Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

add generic and typed watch to KubernetesObjectApi #1831

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions 128 src/object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ApisApi, HttpError, V1APIResource, V1APIResourceList, V1DeleteOptions,
import { KubeConfig } from './config';
import ObjectSerializer from './serializer';
import { KubernetesListObject, KubernetesObject } from './types';
import { RequestResult, Watch } from './watch';

/** Union type of body types returned by KubernetesObjectApi. */
type KubernetesObjectResponseBody =
Expand Down Expand Up @@ -44,6 +45,60 @@ enum KubernetesPatchStrategies {
StrategicMergePatch = 'application/strategic-merge-patch+json',
}

/**
* Describes the type of an watch event.
* Object is:
* - If Type is Added or Modified: the new state of the object.
* - If Type is Deleted: the state of the object immediately before deletion.
* - If Type is Bookmark: the object (instance of a type being watched) where
* only ResourceVersion field is set. On successful restart of watch from a
* bookmark resourceVersion, client is guaranteed to not get repeat event
* nor miss any events.
* - If Type is Error: *api.Status is recommended; other types may make sense
* depending on context.
*/
export enum KubernetesEventType {
ADDED = 'ADDED',
MODIFIED = 'MODIFIED',
DELETED = 'DELETED',
BOOKMARK = 'BOOKMARK',
ERROR = 'ERROR',
}

export type WatchObject<T extends KubernetesObject | KubernetesObject> = {
type: KubernetesEventType;
object: T;
};

export type WatchCallback<T extends KubernetesObject | KubernetesObject> = (
phase: KubernetesEventType,
apiObj: T,
watchObj?: WatchObject<T>,
) => void;

export type WatchOptions = {
schrodit marked this conversation as resolved.
Show resolved Hide resolved
/**
* To mitigate the impact of short history window,
* the Kubernetes API provides a watch event named BOOKMARK.
* It is a special kind of event to mark that all changes
* up to a given resourceVersion the client is requesting
* have already been sent.
*
* See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
*/
allowWatchBookmarks?: boolean;
/**
* Start watch at the given resource version.
* Starting at a specific resource version means that only events
* starting from that versions are included in the watch stream.
*/
resourceVersion?: string;
};

export type WatchResult = {
abort: () => void;
};

/**
* Dynamically construct Kubernetes API request URIs so client does not have to know what type of object it is acting
* on.
Expand All @@ -60,6 +115,7 @@ export class KubernetesObjectApi extends ApisApi {
public static makeApiClient(kc: KubeConfig): KubernetesObjectApi {
const client = kc.makeApiClient(KubernetesObjectApi);
client.setDefaultNamespace(kc);
client.watcher = new Watch(kc);
return client;
}

Expand All @@ -69,6 +125,8 @@ export class KubernetesObjectApi extends ApisApi {
/** Cache resource API response. */
protected apiVersionResourceCache: Record<string, V1APIResourceList> = {};

protected watcher?: Watch;

/**
* Create any Kubernetes resource.
* @param spec Kubernetes resource spec.
Expand Down Expand Up @@ -474,6 +532,76 @@ export class KubernetesObjectApi extends ApisApi {
return this.requestPromise(localVarRequestOptions);
}

/**
* Watches the given resources and calls provided callback with the parsed json object
* upon event received over the watcher connection.
*
* @param resource defines the resources to watch. Namespace is optional.
* Undefined namespace means to watch all namespaces.
* @param options Optional options that are passed to the watch request.
* @param callback callback function that is called with the parsed json object upon event received.
* @param done callback is called either when connection is closed or when there
* is an error. In either case, watcher takes care of properly closing the
* underlaying connection so that it doesn't leak any resources.
*
* @returns WatchResult object that can be used to abort the watch.
*/
public async watch<T extends KubernetesObject | KubernetesObject>({
resource,
options = {},
callback,
done,
}: {
resource: {
apiVersion: string;
kind: string;
namespace?: string;
};
options?: WatchOptions;
callback: WatchCallback<T>;
done: (err: unknown) => void;
schrodit marked this conversation as resolved.
Show resolved Hide resolved
}): Promise<WatchResult> {
if (!this.watcher) {
throw new Error('Watcher not initialized');
}
const resourcePath = new URL(
await this.specUriPath(
{
apiVersion: resource.apiVersion,
kind: resource.kind,
metadata: {
namespace: resource.namespace,
},
},
'list',
),
).pathname;
const type = await this.getSerializationType(resource.apiVersion, resource.kind);
const cb: WatchCallback<T> = (phase: KubernetesEventType, apiObj: T, watchObj?: WatchObject<T>) => {
const obj = ObjectSerializer.deserialize(apiObj, type);
callback(
phase,
obj,
watchObj
? {
...watchObj,
object: obj,
}
: undefined,
);
};
const res: RequestResult = await this.watcher.watch(
resourcePath,
options,
// required to convert to less strict type.
cb as (phase: string, apiObj: any, watchObj?: any) => void,
done,
);
return {
abort: () => res.abort(),
};
}

/** Set default namespace from current context, if available. */
protected setDefaultNamespace(kc: KubeConfig): string {
if (kc.currentContext) {
Expand Down
Loading
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.