Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit cfd1541

Browse filesBrowse files
committed
refactor: drop publish, publishReplay, and ConnectableObservable
1 parent 1d0a032 commit cfd1541
Copy full SHA for cfd1541

File tree

6 files changed

+52
-56
lines changed
Filter options

6 files changed

+52
-56
lines changed

‎apps/demos/src/app/features/experiments/state/composition/parent.component.ts

Copy file name to clipboardExpand all lines: apps/demos/src/app/features/experiments/state/composition/parent.component.ts
+1-4Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,7 @@ export class RxStateParentCompositionComponent implements OnDestroy {
2626
shareReplay(1)
2727
);
2828

29-
constructor(private source: SourceService) {
30-
// this.hotComposition1$ = this.composition1$.pipe(publishReplay(1)) as ConnectableObservable<any>
31-
// this.subscription = this.hotComposition1$.connect();
32-
}
29+
constructor(private source: SourceService) {}
3330

3431
ngOnDestroy(): void {
3532
this.subscription.unsubscribe();

‎apps/demos/src/app/features/experiments/state/subscription-less-interaction/parent.component.ts

Copy file name to clipboardExpand all lines: apps/demos/src/app/features/experiments/state/subscription-less-interaction/parent.component.ts
-12Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,6 @@ export class RxStateParentSubscriptionLessComponent implements OnDestroy {
4242
});
4343
}
4444

45-
/*
46-
(this.state$ as any).connect();
47-
this.stateSources$.next(this.source1$.pipe(tap(console.log)));
48-
49-
state$ = this.stateSources$.pipe(
50-
map(o => isObservable(o) ? o : of(o)),
51-
mergeAll(),
52-
scan((state: ComponentState, slices: Partial<ComponentState>) => ({...state, ...slices}), {}),
53-
publishReplay(1)
54-
);
55-
*/
56-
5745
ngOnDestroy(): void {
5846
this.subscription.unsubscribe();
5947
}

‎apps/demos/src/app/rx-angular-pocs/cdk/render-aware/render-aware.ts

Copy file name to clipboardExpand all lines: apps/demos/src/app/rx-angular-pocs/cdk/render-aware/render-aware.ts
+28-17Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,25 @@
11
import { ChangeDetectorRef } from '@angular/core';
2-
import { RxCustomStrategyCredentials, RxStrategyCredentials, strategyHandling } from '@rx-angular/cdk/render-strategies';
3-
import { ConnectableObservable, EMPTY, isObservable, Observable, of, ReplaySubject, Subject } from 'rxjs';
2+
import {
3+
RxCustomStrategyCredentials,
4+
RxStrategyCredentials,
5+
strategyHandling,
6+
} from '@rx-angular/cdk/render-strategies';
7+
import {
8+
Connectable,
9+
connectable,
10+
EMPTY,
11+
isObservable,
12+
Observable,
13+
of,
14+
ReplaySubject,
15+
Subject,
16+
} from 'rxjs';
417
import {
518
catchError,
619
distinctUntilChanged,
720
map,
821
merge as mergeWith,
922
mergeAll,
10-
publishReplay,
1123
switchAll,
1224
} from 'rxjs/operators';
1325

@@ -39,34 +51,33 @@ export function createRenderAware<U>(cfg: {
3951
getCdRef: (k: RxNotification<U>) => ChangeDetectorRef;
4052
getContext: (k?: RxNotification<U>) => any;
4153
}): RenderAware<U | undefined | null> {
42-
4354
const strategyName$ = new ReplaySubject<Observable<string>>(1);
4455
const strategyHandling$ = strategyHandling(
4556
cfg.defaultStrategyName,
4657
cfg.strategies
4758
);
4859
const templateTriggerSubject = new Subject<Observable<RxNotification<U>>>();
49-
const templateTrigger$ = templateTriggerSubject.pipe(
50-
mergeAll()
51-
);
60+
const templateTrigger$ = templateTriggerSubject.pipe(mergeAll());
5261

5362
const observablesFromTemplate$ = new ReplaySubject<Observable<U>>(1);
54-
const renderingEffect$ =
63+
const renderingEffect$ = connectable(
5564
observablesFromTemplate$.pipe(
56-
map(o => isObservable(o) ? o : of(o)),
65+
map((o) => (isObservable(o) ? o : of(o))),
5766
distinctUntilChanged(),
5867
switchAll(),
5968
distinctUntilChanged(),
6069
rxMaterialize(),
6170
mergeWith(templateTrigger$ || EMPTY),
62-
/*observeTemplateByNotificationKind(cfg.templateObserver),
63-
applyStrategy(strategy$, cfg.getContext, cfg.getCdRef),*/
64-
catchError(e => {
71+
catchError((e) => {
6572
console.error(e);
6673
return EMPTY;
67-
}),
68-
publishReplay()
69-
);
74+
})
75+
),
76+
{
77+
connector: () => new ReplaySubject(),
78+
resetOnDisconnect: false,
79+
}
80+
);
7081

7182
return {
7283
strategy$: strategyHandling$.strategy$,
@@ -80,8 +91,8 @@ export function createRenderAware<U>(cfg: {
8091
templateTriggerSubject.next(trigger$);
8192
},
8293
subscribe: () => {
83-
return (renderingEffect$ as ConnectableObservable<any>).connect();
94+
return (renderingEffect$ as Connectable<any>).connect();
8495
},
85-
rendered$: renderingEffect$ as any
96+
rendered$: renderingEffect$ as any,
8697
};
8798
}

‎apps/demos/src/app/rx-angular-pocs/cdk/utils/rxjs/observable/stateful-observable.ts

Copy file name to clipboardExpand all lines: apps/demos/src/app/rx-angular-pocs/cdk/utils/rxjs/observable/stateful-observable.ts
+11-10Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import {
22
BehaviorSubject,
3-
ConnectableObservable,
3+
connectable,
4+
Connectable,
45
EMPTY,
56
merge,
67
Observable,
78
queueScheduler,
9+
ReplaySubject,
810
Subject,
911
Subscribable,
1012
Subscription,
@@ -14,9 +16,8 @@ import {
1416
distinctUntilChanged,
1517
mergeAll,
1618
observeOn,
17-
publish,
18-
publishReplay,
1919
scan,
20+
share,
2021
tap,
2122
withLatestFrom,
2223
} from 'rxjs/operators';
@@ -58,11 +59,13 @@ export function createAccumulationObservable<T extends object>(
5859
(newState) => (compositionObservable.state = newState),
5960
(error) => console.error(error)
6061
),
61-
// @Notice We catch the error here as it get lost in between `publish` and `publishReplay`. We return empty to
6262
catchError((e) => EMPTY),
63-
publish()
63+
share()
6464
);
65-
const state$: Observable<T> = signal$.pipe(publishReplay(1));
65+
const state$: Observable<T> = connectable(signal$, {
66+
connector: () => new ReplaySubject<T>(1),
67+
resetOnDisconnect: false,
68+
});
6669
const compositionObservable: Accumulator<T> = {
6770
state: {} as T,
6871
signal$,
@@ -92,10 +95,8 @@ export function createAccumulationObservable<T extends object>(
9295
}
9396

9497
function subscribe(): Subscription {
95-
const sub = (compositionObservable.signal$ as ConnectableObservable<T>).connect();
96-
sub.add(
97-
(compositionObservable.state$ as ConnectableObservable<T>).connect()
98-
);
98+
const sub = (compositionObservable.signal$ as Connectable<T>).connect();
99+
sub.add((compositionObservable.state$ as Connectable<T>).connect());
99100
sub.add(() => {
100101
accumulatorObservable.complete();
101102
stateObservables.complete();

‎libs/state/effects/src/lib/effects.service.ts

Copy file name to clipboardExpand all lines: libs/state/effects/src/lib/effects.service.ts
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ export class RxEffects implements OnDestroy, OnDestroy$ {
7474
private static nextId = 0;
7575
readonly _hooks$ = new Subject<DestroyProp>();
7676
private readonly observables$ = new Subject<Observable<unknown>>();
77-
// we have to use publish here to make it hot (composition happens without subscriber)
77+
// we have to use share here to make it hot (composition happens without subscriber)
7878
private readonly effects$ = this.observables$.pipe(mergeAll(), share());
7979
private readonly subscription = this.effects$.subscribe();
8080
onDestroy$: Observable<boolean> = this._hooks$.pipe(toHook('destroy'));

‎libs/state/selections/src/lib/accumulation-observable.ts

Copy file name to clipboardExpand all lines: libs/state/selections/src/lib/accumulation-observable.ts
+11-12Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import {
22
BehaviorSubject,
3-
ConnectableObservable,
3+
Connectable,
4+
connectable,
45
EMPTY,
56
merge,
67
Observable,
78
queueScheduler,
9+
ReplaySubject,
810
Subject,
911
Subscription,
1012
} from 'rxjs';
@@ -13,9 +15,8 @@ import {
1315
distinctUntilChanged,
1416
mergeAll,
1517
observeOn,
16-
publish,
17-
publishReplay,
1818
scan,
19+
share,
1920
tap,
2021
withLatestFrom,
2122
} from 'rxjs/operators';
@@ -47,11 +48,13 @@ export function createAccumulationObservable<T extends object>(
4748
(newState) => (compositionObservable.state = newState),
4849
(error) => console.error(error)
4950
),
50-
// @Notice We catch the error here as it get lost in between `publish` and `publishReplay`. We return empty to
5151
catchError((e) => EMPTY),
52-
publish()
52+
share()
5353
);
54-
const state$: Observable<T> = signal$.pipe(publishReplay(1));
54+
const state$: Observable<T> = connectable(signal$, {
55+
connector: () => new ReplaySubject<T>(1),
56+
resetOnDisconnect: false,
57+
});
5558
const compositionObservable: Accumulator<T> = {
5659
state: {} as T,
5760
signal$,
@@ -81,12 +84,8 @@ export function createAccumulationObservable<T extends object>(
8184
}
8285

8386
function subscribe(): Subscription {
84-
const sub = (
85-
compositionObservable.signal$ as ConnectableObservable<T>
86-
).connect();
87-
sub.add(
88-
(compositionObservable.state$ as ConnectableObservable<T>).connect()
89-
);
87+
const sub = (compositionObservable.signal$ as Connectable<T>).connect();
88+
sub.add((compositionObservable.state$ as Connectable<T>).connect());
9089
sub.add(() => {
9190
accumulatorObservable.complete();
9291
stateObservables.complete();

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.