@@ -4,6 +4,7 @@ import { ApisApi, HttpError, V1APIResource, V1APIResourceList, V1DeleteOptions,
4
4
import { KubeConfig } from './config' ;
5
5
import ObjectSerializer from './serializer' ;
6
6
import { KubernetesListObject , KubernetesObject } from './types' ;
7
+ import { RequestResult , Watch } from './watch' ;
7
8
8
9
/** Union type of body types returned by KubernetesObjectApi. */
9
10
type KubernetesObjectResponseBody =
@@ -44,6 +45,60 @@ enum KubernetesPatchStrategies {
44
45
StrategicMergePatch = 'application/strategic-merge-patch+json' ,
45
46
}
46
47
48
+ /**
49
+ * Describes the type of an watch event.
50
+ * Object is:
51
+ * - If Type is Added or Modified: the new state of the object.
52
+ * - If Type is Deleted: the state of the object immediately before deletion.
53
+ * - If Type is Bookmark: the object (instance of a type being watched) where
54
+ * only ResourceVersion field is set. On successful restart of watch from a
55
+ * bookmark resourceVersion, client is guaranteed to not get repeat event
56
+ * nor miss any events.
57
+ * - If Type is Error: *api.Status is recommended; other types may make sense
58
+ * depending on context.
59
+ */
60
+ export enum KubernetesEventType {
61
+ ADDED = 'ADDED' ,
62
+ MODIFIED = 'MODIFIED' ,
63
+ DELETED = 'DELETED' ,
64
+ BOOKMARK = 'BOOKMARK' ,
65
+ ERROR = 'ERROR' ,
66
+ }
67
+
68
+ export type WatchObject < T extends KubernetesObject | KubernetesObject > = {
69
+ type : KubernetesEventType ;
70
+ object : T ;
71
+ } ;
72
+
73
+ export type WatchCallback < T extends KubernetesObject | KubernetesObject > = (
74
+ phase : KubernetesEventType ,
75
+ apiObj : T ,
76
+ watchObj ?: WatchObject < T > ,
77
+ ) => void ;
78
+
79
+ export type WatchOptions = {
80
+ /**
81
+ * To mitigate the impact of short history window,
82
+ * the Kubernetes API provides a watch event named BOOKMARK.
83
+ * It is a special kind of event to mark that all changes
84
+ * up to a given resourceVersion the client is requesting
85
+ * have already been sent.
86
+ *
87
+ * See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
88
+ */
89
+ allowWatchBookmarks ?: boolean ;
90
+ /**
91
+ * Start watch at the given resource version.
92
+ * Starting at a specific resource version means that only events
93
+ * starting from that versions are included in the watch stream.
94
+ */
95
+ resourceVersion ?: string ;
96
+ } ;
97
+
98
+ export type WatchResult = {
99
+ abort : ( ) => void ;
100
+ } ;
101
+
47
102
/**
48
103
* Dynamically construct Kubernetes API request URIs so client does not have to know what type of object it is acting
49
104
* on.
@@ -60,6 +115,7 @@ export class KubernetesObjectApi extends ApisApi {
60
115
public static makeApiClient ( kc : KubeConfig ) : KubernetesObjectApi {
61
116
const client = kc . makeApiClient ( KubernetesObjectApi ) ;
62
117
client . setDefaultNamespace ( kc ) ;
118
+ client . watcher = new Watch ( kc ) ;
63
119
return client ;
64
120
}
65
121
@@ -69,6 +125,8 @@ export class KubernetesObjectApi extends ApisApi {
69
125
/** Cache resource API response. */
70
126
protected apiVersionResourceCache : Record < string , V1APIResourceList > = { } ;
71
127
128
+ protected watcher ?: Watch ;
129
+
72
130
/**
73
131
* Create any Kubernetes resource.
74
132
* @param spec Kubernetes resource spec.
@@ -474,6 +532,76 @@ export class KubernetesObjectApi extends ApisApi {
474
532
return this . requestPromise ( localVarRequestOptions ) ;
475
533
}
476
534
535
+ /**
536
+ * Watches the given resources and calls provided callback with the parsed json object
537
+ * upon event received over the watcher connection.
538
+ *
539
+ * @param resource defines the resources to watch. Namespace is optional.
540
+ * Undefined namespace means to watch all namespaces.
541
+ * @param options Optional options that are passed to the watch request.
542
+ * @param callback callback function that is called with the parsed json object upon event received.
543
+ * @param done callback is called either when connection is closed or when there
544
+ * is an error. In either case, watcher takes care of properly closing the
545
+ * underlaying connection so that it doesn't leak any resources.
546
+ *
547
+ * @returns WatchResult object that can be used to abort the watch.
548
+ */
549
+ public async watch < T extends KubernetesObject | KubernetesObject > ( {
550
+ resource,
551
+ options = { } ,
552
+ callback,
553
+ done,
554
+ } : {
555
+ resource : {
556
+ apiVersion : string ;
557
+ kind : string ;
558
+ namespace ?: string ;
559
+ } ;
560
+ options ?: WatchOptions ;
561
+ callback : WatchCallback < T > ;
562
+ done : ( err : unknown ) => void ;
563
+ } ) : Promise < WatchResult > {
564
+ if ( ! this . watcher ) {
565
+ throw new Error ( 'Watcher not initialized' ) ;
566
+ }
567
+ const resourcePath = new URL (
568
+ await this . specUriPath (
569
+ {
570
+ apiVersion : resource . apiVersion ,
571
+ kind : resource . kind ,
572
+ metadata : {
573
+ namespace : resource . namespace ,
574
+ } ,
575
+ } ,
576
+ 'list' ,
577
+ ) ,
578
+ ) . pathname ;
579
+ const type = await this . getSerializationType ( resource . apiVersion , resource . kind ) ;
580
+ const cb : WatchCallback < T > = ( phase : KubernetesEventType , apiObj : T , watchObj ?: WatchObject < T > ) => {
581
+ const obj = ObjectSerializer . deserialize ( apiObj , type ) ;
582
+ callback (
583
+ phase ,
584
+ obj ,
585
+ watchObj
586
+ ? {
587
+ ...watchObj ,
588
+ object : obj ,
589
+ }
590
+ : undefined ,
591
+ ) ;
592
+ } ;
593
+ const res : RequestResult = await this . watcher . watch (
594
+ resourcePath ,
595
+ options ,
596
+ // required to convert to less strict type.
597
+ cb as ( phase : string , apiObj : any , watchObj ?: any ) => void ,
598
+ done ,
599
+ ) ;
600
+ return {
601
+ abort : ( ) => res . abort ( ) ,
602
+ } ;
603
+ }
604
+
477
605
/** Set default namespace from current context, if available. */
478
606
protected setDefaultNamespace ( kc : KubeConfig ) : string {
479
607
if ( kc . currentContext ) {
0 commit comments