diff --git a/apps/demos/src/app/features/experiments/state/composition/parent.component.ts b/apps/demos/src/app/features/experiments/state/composition/parent.component.ts index 75877ae644..111f0b18a6 100644 --- a/apps/demos/src/app/features/experiments/state/composition/parent.component.ts +++ b/apps/demos/src/app/features/experiments/state/composition/parent.component.ts @@ -26,10 +26,7 @@ export class RxStateParentCompositionComponent implements OnDestroy { shareReplay(1) ); - constructor(private source: SourceService) { - // this.hotComposition1$ = this.composition1$.pipe(publishReplay(1)) as ConnectableObservable - // this.subscription = this.hotComposition1$.connect(); - } + constructor(private source: SourceService) {} ngOnDestroy(): void { this.subscription.unsubscribe(); diff --git a/apps/demos/src/app/features/experiments/state/subscription-less-interaction/parent.component.ts b/apps/demos/src/app/features/experiments/state/subscription-less-interaction/parent.component.ts index ba1662a573..6fe6cfb775 100644 --- a/apps/demos/src/app/features/experiments/state/subscription-less-interaction/parent.component.ts +++ b/apps/demos/src/app/features/experiments/state/subscription-less-interaction/parent.component.ts @@ -42,18 +42,6 @@ export class RxStateParentSubscriptionLessComponent implements OnDestroy { }); } - /* - (this.state$ as any).connect(); - this.stateSources$.next(this.source1$.pipe(tap(console.log))); - - state$ = this.stateSources$.pipe( - map(o => isObservable(o) ? o : of(o)), - mergeAll(), - scan((state: ComponentState, slices: Partial) => ({...state, ...slices}), {}), - publishReplay(1) - ); - */ - ngOnDestroy(): void { this.subscription.unsubscribe(); } diff --git a/apps/demos/src/app/rx-angular-pocs/cdk/render-aware/render-aware.ts b/apps/demos/src/app/rx-angular-pocs/cdk/render-aware/render-aware.ts index 7a22c4afb4..a5958a45f2 100644 --- a/apps/demos/src/app/rx-angular-pocs/cdk/render-aware/render-aware.ts +++ b/apps/demos/src/app/rx-angular-pocs/cdk/render-aware/render-aware.ts @@ -1,13 +1,25 @@ import { ChangeDetectorRef } from '@angular/core'; -import { RxCustomStrategyCredentials, RxStrategyCredentials, strategyHandling } from '@rx-angular/cdk/render-strategies'; -import { ConnectableObservable, EMPTY, isObservable, Observable, of, ReplaySubject, Subject } from 'rxjs'; +import { + RxCustomStrategyCredentials, + RxStrategyCredentials, + strategyHandling, +} from '@rx-angular/cdk/render-strategies'; +import { + Connectable, + connectable, + EMPTY, + isObservable, + Observable, + of, + ReplaySubject, + Subject, +} from 'rxjs'; import { catchError, distinctUntilChanged, map, - merge as mergeWith, + mergeWith, mergeAll, - publishReplay, switchAll, } from 'rxjs/operators'; @@ -39,34 +51,33 @@ export function createRenderAware(cfg: { getCdRef: (k: RxNotification) => ChangeDetectorRef; getContext: (k?: RxNotification) => any; }): RenderAware { - const strategyName$ = new ReplaySubject>(1); const strategyHandling$ = strategyHandling( cfg.defaultStrategyName, cfg.strategies ); const templateTriggerSubject = new Subject>>(); - const templateTrigger$ = templateTriggerSubject.pipe( - mergeAll() - ); + const templateTrigger$ = templateTriggerSubject.pipe(mergeAll()); const observablesFromTemplate$ = new ReplaySubject>(1); - const renderingEffect$ = + const renderingEffect$ = connectable( observablesFromTemplate$.pipe( - map(o => isObservable(o) ? o : of(o)), + map((o) => (isObservable(o) ? o : of(o))), distinctUntilChanged(), switchAll(), distinctUntilChanged(), rxMaterialize(), mergeWith(templateTrigger$ || EMPTY), - /*observeTemplateByNotificationKind(cfg.templateObserver), - applyStrategy(strategy$, cfg.getContext, cfg.getCdRef),*/ - catchError(e => { + catchError((e) => { console.error(e); return EMPTY; - }), - publishReplay() - ); + }) + ), + { + connector: () => new ReplaySubject(), + resetOnDisconnect: false, + } + ); return { strategy$: strategyHandling$.strategy$, @@ -80,8 +91,8 @@ export function createRenderAware(cfg: { templateTriggerSubject.next(trigger$); }, subscribe: () => { - return (renderingEffect$ as ConnectableObservable).connect(); + return (renderingEffect$ as Connectable).connect(); }, - rendered$: renderingEffect$ as any + rendered$: renderingEffect$ as any, }; } diff --git a/apps/demos/src/app/rx-angular-pocs/cdk/utils/rxjs/observable/stateful-observable.ts b/apps/demos/src/app/rx-angular-pocs/cdk/utils/rxjs/observable/stateful-observable.ts index 581a4ca6ad..710f725e80 100644 --- a/apps/demos/src/app/rx-angular-pocs/cdk/utils/rxjs/observable/stateful-observable.ts +++ b/apps/demos/src/app/rx-angular-pocs/cdk/utils/rxjs/observable/stateful-observable.ts @@ -1,10 +1,12 @@ import { BehaviorSubject, - ConnectableObservable, + connectable, + Connectable, EMPTY, merge, Observable, queueScheduler, + ReplaySubject, Subject, Subscribable, Subscription, @@ -14,8 +16,6 @@ import { distinctUntilChanged, mergeAll, observeOn, - publish, - publishReplay, scan, tap, withLatestFrom, @@ -41,28 +41,35 @@ export function createAccumulationObservable( stateSlices = new Subject>(), accumulatorObservable = new BehaviorSubject(defaultAccumulator) ): Accumulator { - const signal$ = merge( - stateObservables.pipe( - distinctUntilChanged(), - mergeAll(), - observeOn(queueScheduler) + const signal$ = connectable( + merge( + stateObservables.pipe( + distinctUntilChanged(), + mergeAll(), + observeOn(queueScheduler) + ), + stateSlices.pipe(observeOn(queueScheduler)) + ).pipe( + withLatestFrom(accumulatorObservable.pipe(observeOn(queueScheduler))), + scan( + (state, [slice, stateAccumulator]) => stateAccumulator(state, slice), + {} as T + ), + tap({ + next: (newState) => (compositionObservable.state = newState), + error: (error) => console.error(error), + }), + catchError((e) => EMPTY) ), - stateSlices.pipe(observeOn(queueScheduler)) - ).pipe( - withLatestFrom(accumulatorObservable.pipe(observeOn(queueScheduler))), - scan( - (state, [slice, stateAccumulator]) => stateAccumulator(state, slice), - {} as T - ), - tap( - (newState) => (compositionObservable.state = newState), - (error) => console.error(error) - ), - // @Notice We catch the error here as it get lost in between `publish` and `publishReplay`. We return empty to - catchError((e) => EMPTY), - publish() + { + connector: () => new Subject(), + resetOnDisconnect: false, + } ); - const state$: Observable = signal$.pipe(publishReplay(1)); + const state$: Observable = connectable(signal$, { + connector: () => new ReplaySubject(1), + resetOnDisconnect: false, + }); const compositionObservable: Accumulator = { state: {} as T, signal$, @@ -92,10 +99,8 @@ export function createAccumulationObservable( } function subscribe(): Subscription { - const sub = (compositionObservable.signal$ as ConnectableObservable).connect(); - sub.add( - (compositionObservable.state$ as ConnectableObservable).connect() - ); + const sub = (compositionObservable.signal$ as Connectable).connect(); + sub.add((compositionObservable.state$ as Connectable).connect()); sub.add(() => { accumulatorObservable.complete(); stateObservables.complete(); diff --git a/libs/state/effects/src/lib/effects.service.ts b/libs/state/effects/src/lib/effects.service.ts index f477d7100f..62f476242d 100644 --- a/libs/state/effects/src/lib/effects.service.ts +++ b/libs/state/effects/src/lib/effects.service.ts @@ -74,7 +74,7 @@ export class RxEffects implements OnDestroy, OnDestroy$ { private static nextId = 0; readonly _hooks$ = new Subject(); private readonly observables$ = new Subject>(); - // we have to use publish here to make it hot (composition happens without subscriber) + // we have to use share here to make it hot (composition happens without subscriber) private readonly effects$ = this.observables$.pipe(mergeAll(), share()); private readonly subscription = this.effects$.subscribe(); onDestroy$: Observable = this._hooks$.pipe(toHook('destroy')); diff --git a/libs/state/selections/src/lib/accumulation-observable.ts b/libs/state/selections/src/lib/accumulation-observable.ts index 0243734f24..79e9d968a0 100644 --- a/libs/state/selections/src/lib/accumulation-observable.ts +++ b/libs/state/selections/src/lib/accumulation-observable.ts @@ -1,10 +1,12 @@ import { BehaviorSubject, - ConnectableObservable, + Connectable, + connectable, EMPTY, merge, Observable, queueScheduler, + ReplaySubject, Subject, Subscription, } from 'rxjs'; @@ -13,8 +15,6 @@ import { distinctUntilChanged, mergeAll, observeOn, - publish, - publishReplay, scan, tap, withLatestFrom, @@ -30,28 +30,35 @@ export function createAccumulationObservable( stateSlices = new Subject>(), accumulatorObservable = new BehaviorSubject(defaultAccumulator) ): Accumulator { - const signal$ = merge( - stateObservables.pipe( - distinctUntilChanged(), - mergeAll(), - observeOn(queueScheduler) + const signal$ = connectable( + merge( + stateObservables.pipe( + distinctUntilChanged(), + mergeAll(), + observeOn(queueScheduler) + ), + stateSlices.pipe(observeOn(queueScheduler)) + ).pipe( + withLatestFrom(accumulatorObservable.pipe(observeOn(queueScheduler))), + scan( + (state, [slice, stateAccumulator]) => stateAccumulator(state, slice), + {} as T + ), + tap({ + next: (newState) => (compositionObservable.state = newState), + error: (error) => console.error(error), + }), + catchError((e) => EMPTY) ), - stateSlices.pipe(observeOn(queueScheduler)) - ).pipe( - withLatestFrom(accumulatorObservable.pipe(observeOn(queueScheduler))), - scan( - (state, [slice, stateAccumulator]) => stateAccumulator(state, slice), - {} as T - ), - tap( - (newState) => (compositionObservable.state = newState), - (error) => console.error(error) - ), - // @Notice We catch the error here as it get lost in between `publish` and `publishReplay`. We return empty to - catchError((e) => EMPTY), - publish() + { + connector: () => new Subject(), + resetOnDisconnect: false, + } ); - const state$: Observable = signal$.pipe(publishReplay(1)); + const state$: Observable = connectable(signal$, { + connector: () => new ReplaySubject(1), + resetOnDisconnect: false, + }); const compositionObservable: Accumulator = { state: {} as T, signal$, @@ -81,12 +88,8 @@ export function createAccumulationObservable( } function subscribe(): Subscription { - const sub = ( - compositionObservable.signal$ as ConnectableObservable - ).connect(); - sub.add( - (compositionObservable.state$ as ConnectableObservable).connect() - ); + const sub = (compositionObservable.signal$ as Connectable).connect(); + sub.add((compositionObservable.state$ as Connectable).connect()); sub.add(() => { accumulatorObservable.complete(); stateObservables.complete();