Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

refactor: drop publish, publishReplay, and ConnectableObservable #1638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
Loading
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ export class RxStateParentCompositionComponent implements OnDestroy {
shareReplay(1)
);

constructor(private source: SourceService) {
// this.hotComposition1$ = this.composition1$.pipe(publishReplay(1)) as ConnectableObservable<any>
// this.subscription = this.hotComposition1$.connect();
}
constructor(private source: SourceService) {}

ngOnDestroy(): void {
this.subscription.unsubscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,6 @@ export class RxStateParentSubscriptionLessComponent implements OnDestroy {
});
}

/*
(this.state$ as any).connect();
this.stateSources$.next(this.source1$.pipe(tap(console.log)));

state$ = this.stateSources$.pipe(
map(o => isObservable(o) ? o : of(o)),
mergeAll(),
scan((state: ComponentState, slices: Partial<ComponentState>) => ({...state, ...slices}), {}),
publishReplay(1)
);
*/

ngOnDestroy(): void {
this.subscription.unsubscribe();
}
Expand Down
47 changes: 29 additions & 18 deletions 47 apps/demos/src/app/rx-angular-pocs/cdk/render-aware/render-aware.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
import { ChangeDetectorRef } from '@angular/core';
import { RxCustomStrategyCredentials, RxStrategyCredentials, strategyHandling } from '@rx-angular/cdk/render-strategies';
import { ConnectableObservable, EMPTY, isObservable, Observable, of, ReplaySubject, Subject } from 'rxjs';
import {
RxCustomStrategyCredentials,
RxStrategyCredentials,
strategyHandling,
} from '@rx-angular/cdk/render-strategies';
import {
Connectable,
connectable,
EMPTY,
isObservable,
Observable,
of,
ReplaySubject,
Subject,
} from 'rxjs';
import {
catchError,
distinctUntilChanged,
map,
merge as mergeWith,
mergeWith,
mergeAll,
publishReplay,
switchAll,
} from 'rxjs/operators';

Expand Down Expand Up @@ -39,34 +51,33 @@ export function createRenderAware<U>(cfg: {
getCdRef: (k: RxNotification<U>) => ChangeDetectorRef;
getContext: (k?: RxNotification<U>) => any;
}): RenderAware<U | undefined | null> {

const strategyName$ = new ReplaySubject<Observable<string>>(1);
const strategyHandling$ = strategyHandling(
cfg.defaultStrategyName,
cfg.strategies
);
const templateTriggerSubject = new Subject<Observable<RxNotification<U>>>();
const templateTrigger$ = templateTriggerSubject.pipe(
mergeAll()
);
const templateTrigger$ = templateTriggerSubject.pipe(mergeAll());

const observablesFromTemplate$ = new ReplaySubject<Observable<U>>(1);
const renderingEffect$ =
const renderingEffect$ = connectable(
observablesFromTemplate$.pipe(
map(o => isObservable(o) ? o : of(o)),
map((o) => (isObservable(o) ? o : of(o))),
distinctUntilChanged(),
switchAll(),
distinctUntilChanged(),
rxMaterialize(),
mergeWith(templateTrigger$ || EMPTY),
/*observeTemplateByNotificationKind(cfg.templateObserver),
applyStrategy(strategy$, cfg.getContext, cfg.getCdRef),*/
catchError(e => {
catchError((e) => {
console.error(e);
return EMPTY;
}),
publishReplay()
);
})
),
{
connector: () => new ReplaySubject(),
hoebbelsB marked this conversation as resolved.
Show resolved Hide resolved
resetOnDisconnect: false,
}
);

return {
strategy$: strategyHandling$.strategy$,
Expand All @@ -80,8 +91,8 @@ export function createRenderAware<U>(cfg: {
templateTriggerSubject.next(trigger$);
},
subscribe: () => {
return (renderingEffect$ as ConnectableObservable<any>).connect();
return (renderingEffect$ as Connectable<any>).connect();
},
rendered$: renderingEffect$ as any
rendered$: renderingEffect$ as any,
};
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import {
BehaviorSubject,
ConnectableObservable,
connectable,
Connectable,
EMPTY,
merge,
Observable,
queueScheduler,
ReplaySubject,
Subject,
Subscribable,
Subscription,
Expand All @@ -14,8 +16,6 @@ import {
distinctUntilChanged,
mergeAll,
observeOn,
publish,
publishReplay,
scan,
tap,
withLatestFrom,
Expand All @@ -41,28 +41,35 @@ export function createAccumulationObservable<T extends object>(
stateSlices = new Subject<Partial<T>>(),
accumulatorObservable = new BehaviorSubject(defaultAccumulator)
): Accumulator<T> {
const signal$ = merge(
stateObservables.pipe(
distinctUntilChanged(),
mergeAll(),
observeOn(queueScheduler)
const signal$ = connectable(
merge(
stateObservables.pipe(
distinctUntilChanged(),
mergeAll(),
observeOn(queueScheduler)
),
stateSlices.pipe(observeOn(queueScheduler))
).pipe(
withLatestFrom(accumulatorObservable.pipe(observeOn(queueScheduler))),
scan(
(state, [slice, stateAccumulator]) => stateAccumulator(state, slice),
{} as T
),
tap({
next: (newState) => (compositionObservable.state = newState),
error: (error) => console.error(error),
}),
catchError((e) => EMPTY)
),
stateSlices.pipe(observeOn(queueScheduler))
).pipe(
withLatestFrom(accumulatorObservable.pipe(observeOn(queueScheduler))),
scan(
(state, [slice, stateAccumulator]) => stateAccumulator(state, slice),
{} as T
),
tap(
(newState) => (compositionObservable.state = newState),
(error) => console.error(error)
),
// @Notice We catch the error here as it get lost in between `publish` and `publishReplay`. We return empty to
catchError((e) => EMPTY),
publish()
{
connector: () => new Subject<T>(),
resetOnDisconnect: false,
}
);
const state$: Observable<T> = signal$.pipe(publishReplay(1));
const state$: Observable<T> = connectable(signal$, {
connector: () => new ReplaySubject<T>(1),
resetOnDisconnect: false,
});
const compositionObservable: Accumulator<T> = {
state: {} as T,
signal$,
Expand Down Expand Up @@ -92,10 +99,8 @@ export function createAccumulationObservable<T extends object>(
}

function subscribe(): Subscription {
const sub = (compositionObservable.signal$ as ConnectableObservable<T>).connect();
sub.add(
(compositionObservable.state$ as ConnectableObservable<T>).connect()
);
const sub = (compositionObservable.signal$ as Connectable<T>).connect();
sub.add((compositionObservable.state$ as Connectable<T>).connect());
sub.add(() => {
accumulatorObservable.complete();
stateObservables.complete();
Expand Down
2 changes: 1 addition & 1 deletion 2 libs/state/effects/src/lib/effects.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class RxEffects implements OnDestroy, OnDestroy$ {
private static nextId = 0;
readonly _hooks$ = new Subject<DestroyProp>();
private readonly observables$ = new Subject<Observable<unknown>>();
// we have to use publish here to make it hot (composition happens without subscriber)
// we have to use share here to make it hot (composition happens without subscriber)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this comment even true?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably it is meant in a different way

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it is not true? share will multicast the stream (hot), so the computation is only made once even if there are multiple subscribers. Here is an example.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

composition happens without subscriber is not true for share, only for connectable after calling the connect method :)

private readonly effects$ = this.observables$.pipe(mergeAll(), share());
private readonly subscription = this.effects$.subscribe();
onDestroy$: Observable<boolean> = this._hooks$.pipe(toHook('destroy'));
Expand Down
61 changes: 32 additions & 29 deletions 61 libs/state/selections/src/lib/accumulation-observable.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import {
BehaviorSubject,
ConnectableObservable,
Connectable,
connectable,
EMPTY,
merge,
Observable,
queueScheduler,
ReplaySubject,
Subject,
Subscription,
} from 'rxjs';
Expand All @@ -13,8 +15,6 @@ import {
distinctUntilChanged,
mergeAll,
observeOn,
publish,
publishReplay,
scan,
tap,
withLatestFrom,
Expand All @@ -30,28 +30,35 @@ export function createAccumulationObservable<T extends object>(
stateSlices = new Subject<Partial<T>>(),
accumulatorObservable = new BehaviorSubject(defaultAccumulator)
): Accumulator<T> {
const signal$ = merge(
stateObservables.pipe(
distinctUntilChanged(),
mergeAll(),
observeOn(queueScheduler)
const signal$ = connectable(
merge(
stateObservables.pipe(
distinctUntilChanged(),
mergeAll(),
observeOn(queueScheduler)
),
stateSlices.pipe(observeOn(queueScheduler))
).pipe(
withLatestFrom(accumulatorObservable.pipe(observeOn(queueScheduler))),
scan(
(state, [slice, stateAccumulator]) => stateAccumulator(state, slice),
{} as T
),
tap({
next: (newState) => (compositionObservable.state = newState),
error: (error) => console.error(error),
}),
catchError((e) => EMPTY)
),
stateSlices.pipe(observeOn(queueScheduler))
).pipe(
withLatestFrom(accumulatorObservable.pipe(observeOn(queueScheduler))),
scan(
(state, [slice, stateAccumulator]) => stateAccumulator(state, slice),
{} as T
),
tap(
(newState) => (compositionObservable.state = newState),
(error) => console.error(error)
),
// @Notice We catch the error here as it get lost in between `publish` and `publishReplay`. We return empty to
catchError((e) => EMPTY),
publish()
{
connector: () => new Subject<T>(),
resetOnDisconnect: false,
hoebbelsB marked this conversation as resolved.
Show resolved Hide resolved
}
);
const state$: Observable<T> = signal$.pipe(publishReplay(1));
const state$: Observable<T> = connectable(signal$, {
connector: () => new ReplaySubject<T>(1),
resetOnDisconnect: false,
});
const compositionObservable: Accumulator<T> = {
state: {} as T,
signal$,
Expand Down Expand Up @@ -81,12 +88,8 @@ export function createAccumulationObservable<T extends object>(
}

function subscribe(): Subscription {
const sub = (
compositionObservable.signal$ as ConnectableObservable<T>
).connect();
sub.add(
(compositionObservable.state$ as ConnectableObservable<T>).connect()
);
const sub = (compositionObservable.signal$ as Connectable<T>).connect();
sub.add((compositionObservable.state$ as Connectable<T>).connect());
sub.add(() => {
accumulatorObservable.complete();
stateObservables.complete();
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.