From 1047da59c6a4f434d493a798bd147d3dd481c5a9 Mon Sep 17 00:00:00 2001 From: Vitaliy Date: Wed, 2 Aug 2017 11:09:38 +0300 Subject: [PATCH 01/21] Fixed incorrect result output Fixed incorrect result output for exampleSubscribeOn() --- tests/java/itrx/chapter4/scheduling/SubscribeOnExample.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/java/itrx/chapter4/scheduling/SubscribeOnExample.java b/tests/java/itrx/chapter4/scheduling/SubscribeOnExample.java index ebae61d..b2ea853 100644 --- a/tests/java/itrx/chapter4/scheduling/SubscribeOnExample.java +++ b/tests/java/itrx/chapter4/scheduling/SubscribeOnExample.java @@ -74,10 +74,10 @@ public void exampleSubscribeOn() { System.out.println("Finished main: " + Thread.currentThread().getId()); // Main: 1 - // Created on 1 + // Created on 11 // Received 1 on 11 // Received 2 on 11 - // Finished main: 11 + // Finished main: 1 } public void exampleIntervalThread() { From 135d65f7c657233128fdfb0eb4bf09081a9d2db4 Mon Sep 17 00:00:00 2001 From: Vitaliy Date: Wed, 2 Aug 2017 11:56:22 +0300 Subject: [PATCH 02/21] Fixed result output Fixed result output for method exampleRefcount(), also little bit improved. --- .../hotandcold/ConnectableObservableExample.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/java/itrx/chapter3/hotandcold/ConnectableObservableExample.java b/tests/java/itrx/chapter3/hotandcold/ConnectableObservableExample.java index 37212b4..4c522eb 100644 --- a/tests/java/itrx/chapter3/hotandcold/ConnectableObservableExample.java +++ b/tests/java/itrx/chapter3/hotandcold/ConnectableObservableExample.java @@ -114,7 +114,7 @@ public void exampleRefcount() throws InterruptedException { Thread.sleep(500); Subscription s2 = cold.subscribe(i -> System.out.println("Second: " + i)); Thread.sleep(500); - System.out.println("Unsubscribe first"); + System.out.println("Unsubscribe second"); s2.unsubscribe(); Thread.sleep(500); System.out.println("Unsubscribe first"); @@ -123,6 +123,9 @@ public void exampleRefcount() throws InterruptedException { System.out.println("First connection again"); Thread.sleep(500); s1 = cold.subscribe(i -> System.out.println("First: " + i)); + Thread.sleep(900); + System.out.println("Unsubscribe first again"); + s1.unsubscribe(); // First: 0 // First: 1 @@ -130,8 +133,9 @@ public void exampleRefcount() throws InterruptedException { // Second: 2 // First: 3 // Second: 3 - // Unsubscribe first // First: 4 + // Second: 4 + // Unsubscribe second // First: 5 // First: 6 // Unsubscribe first @@ -140,7 +144,7 @@ public void exampleRefcount() throws InterruptedException { // First: 1 // First: 2 // First: 3 - // First: 4 + // Unsubscribe first again } From a4cc48d6ba1551f77a1a38d19daf23f21fe528e4 Mon Sep 17 00:00:00 2001 From: Vitaliy Date: Wed, 2 Aug 2017 12:21:25 +0300 Subject: [PATCH 03/21] Fixed result output Fixed result output in method exampleMutlicast(), made some improvements. --- .../itrx/chapter3/hotandcold/MulticastExample.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/java/itrx/chapter3/hotandcold/MulticastExample.java b/tests/java/itrx/chapter3/hotandcold/MulticastExample.java index eca36ef..60a512e 100644 --- a/tests/java/itrx/chapter3/hotandcold/MulticastExample.java +++ b/tests/java/itrx/chapter3/hotandcold/MulticastExample.java @@ -46,7 +46,7 @@ public void exampleMutlicast() throws InterruptedException { Thread.sleep(500); Subscription s2 = cold.subscribe(i -> System.out.println("Second: " + i)); Thread.sleep(500); - System.out.println("Unsubscribe first"); + System.out.println("Unsubscribe second"); s2.unsubscribe(); Thread.sleep(500); System.out.println("Unsubscribe first"); @@ -55,6 +55,9 @@ public void exampleMutlicast() throws InterruptedException { System.out.println("First connection again"); Thread.sleep(500); s1 = cold.subscribe(i -> System.out.println("First: " + i)); + Thread.sleep(900); + System.out.println("Unsubscribe first again"); + s1.unsubscribe(); // First: 0 // First: 1 @@ -62,8 +65,9 @@ public void exampleMutlicast() throws InterruptedException { // Second: 2 // First: 3 // Second: 3 - // Unsubscribe first // First: 4 + // Second: 4 + // Unsubscribe second // First: 5 // First: 6 // Unsubscribe first @@ -72,7 +76,7 @@ public void exampleMutlicast() throws InterruptedException { // First: 1 // First: 2 // First: 3 - // First: 4 + // Unsubscribe first again } From d02e6fa9654c38571c430d5cca75ed58ef6eb3ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Justin=20In=C3=A1cio?= Date: Sun, 8 Oct 2017 15:48:03 +0200 Subject: [PATCH 04/21] - Corrected spelling mistake - "Output" was written as "outout" --- Part 3 - Taming the sequence/6. Hot and Cold observables.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Part 3 - Taming the sequence/6. Hot and Cold observables.md b/Part 3 - Taming the sequence/6. Hot and Cold observables.md index 4f87b03..272f9bb 100644 --- a/Part 3 - Taming the sequence/6. Hot and Cold observables.md +++ b/Part 3 - Taming the sequence/6. Hot and Cold observables.md @@ -306,7 +306,7 @@ obs.subscribe(i -> System.out.println("First: " + i)); Thread.sleep(300); obs.subscribe(i -> System.out.println("Second: " + i)); ``` -[Outout](/tests/java/itrx/chapter3/hotandcold/CacheExample.java) +[Output](/tests/java/itrx/chapter3/hotandcold/CacheExample.java) ``` First: 0 First: 1 @@ -336,7 +336,7 @@ Subscription subscription = obs.subscribe(); Thread.sleep(150); subscription.unsubscribe(); ``` -[Outout](/tests/java/itrx/chapter3/hotandcold/CacheExample.java) +[Output](/tests/java/itrx/chapter3/hotandcold/CacheExample.java) ``` Subscribed 0 From a81d6919e77c7e6411d96df92a75fde1cb981631 Mon Sep 17 00:00:00 2001 From: Vinh Nguyen Date: Wed, 8 Nov 2017 12:08:02 +0700 Subject: [PATCH 05/21] Fix wrong results on exampleModifyReference() --- tests/java/itrx/chapter3/sideeffects/AsObservableExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/java/itrx/chapter3/sideeffects/AsObservableExample.java b/tests/java/itrx/chapter3/sideeffects/AsObservableExample.java index 76a8fad..88dc72a 100644 --- a/tests/java/itrx/chapter3/sideeffects/AsObservableExample.java +++ b/tests/java/itrx/chapter3/sideeffects/AsObservableExample.java @@ -85,7 +85,7 @@ public void exampleModifyReference() { service.play(); // Before: Greet - // After: Greet + // After: Later // After: Hello // After: and // After: goodbye From 92648790e2527605f3305ac0ac7cf795c058b99f Mon Sep 17 00:00:00 2001 From: Jack Keys Date: Tue, 13 Mar 2018 19:08:15 -0600 Subject: [PATCH 06/21] Fix typos in Sequence Basics (Part 2.1) --- Part 2 - Sequence Basics/1. Creating a sequence.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Part 2 - Sequence Basics/1. Creating a sequence.md b/Part 2 - Sequence Basics/1. Creating a sequence.md index a71693e..ea070c8 100644 --- a/Part 2 - Sequence Basics/1. Creating a sequence.md +++ b/Part 2 - Sequence Basics/1. Creating a sequence.md @@ -10,7 +10,7 @@ In previous examples we used `Subject`s and manually pushed values into them to ### Observable.just -The `just` method creates an `Observable` that will emit a predifined sequence of values, supplied on creation, and then terminate. +The `just` method creates an `Observable` that will emit a predefined sequence of values, supplied on creation, and then terminate. ```java Observable values = Observable.just("one", "two", "three"); @@ -112,7 +112,7 @@ now.subscribe(System.out::println); ### Observable.create -`create` is a very powerful function for creating observables. Let have a look at the signature. +`create` is a very powerful function for creating observables. Let's have a look at the signature. ```java static Observable create(Observable.OnSubscribe f) @@ -226,7 +226,7 @@ The example above waits 2 seconds, then starts counting every 1 second. ## Transitioning into Observable -There are well established tools for dealing with sequences, collections and asychronous events, which may not be directly compatible with Rx. Here we will discuss ways to turn their output into input for your Rx code. +There are well established tools for dealing with sequences, collections and asynchronous events, which may not be directly compatible with Rx. Here we will discuss ways to turn their output into input for your Rx code. If you are using an asynchronous tool that uses event handlers, like JavaFX, you can use `Observable.create` to turn the streams into an observable From 46e54ad73eee4afbcd98fb127ce5fe103ac2facd Mon Sep 17 00:00:00 2001 From: Jack Keys Date: Thu, 15 Mar 2018 15:38:23 -0600 Subject: [PATCH 07/21] Fix a broken link in Aggregation (Part 2.4) --- Part 2 - Sequence Basics/4. Aggregation.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Part 2 - Sequence Basics/4. Aggregation.md b/Part 2 - Sequence Basics/4. Aggregation.md index 081324f..829a3e2 100644 --- a/Part 2 - Sequence Basics/4. Aggregation.md +++ b/Part 2 - Sequence Basics/4. Aggregation.md @@ -559,7 +559,7 @@ Observable.range(0, 3) 2 ``` -Nesting observables to consume them doesn't make much sense. Towards the end of the pipeline, you'd rather flatten and simplify your observables, rather than nest them. Nesting is useful when you need to make a non-nested observable be of the same type as a nested observable that you have from elsewhere. Once they are of the same type, you can combine them, as we will see in the chapter about [combining sequences](/Part 3 - Taming the sequence/4. Combining sequences.md). +Nesting observables to consume them doesn't make much sense. Towards the end of the pipeline, you'd rather flatten and simplify your observables, rather than nest them. Nesting is useful when you need to make a non-nested observable be of the same type as a nested observable that you have from elsewhere. Once they are of the same type, you can combine them, as we will see in the chapter about [combining sequences](/Part%203%20-%20Taming%20the%20sequence/4.%20Combining%20sequences.md). From bde57649fc776aaae574bb916ce26ffc2e245d36 Mon Sep 17 00:00:00 2001 From: Jack Keys Date: Fri, 16 Mar 2018 12:03:45 -0600 Subject: [PATCH 08/21] Correct class name in example in Part 3.1 --- Part 3 - Taming the sequence/1. Side effects.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Part 3 - Taming the sequence/1. Side effects.md b/Part 3 - Taming the sequence/1. Side effects.md index a4bcdeb..c0a4eda 100644 --- a/Part 3 - Taming the sequence/1. Side effects.md +++ b/Part 3 - Taming the sequence/1. Side effects.md @@ -236,7 +236,7 @@ Subscription over Rx is designed in the style of functional programming, but it exists within an object-oriented environment. We also have to protect against object-oriented dangers. Consider this naive implementation for a service that returns an observable. ```java -public class BrakeableService { +public class BreakableService { public BehaviorSubject items = BehaviorSubject.create("Greet"); public void play() { items.onNext("Hello"); @@ -249,7 +249,7 @@ public class BrakeableService { The code above does not prevent a naughty consumer from changing your `items` with one of their own. After that happens, subscriptions done before the change will no longer receive items, because you are not calling `onNext` on the right `Subject` any more. We obviously need to hide access to our `Subject` ```java -public class BrakeableService { +public class BreakableService { private final BehaviorSubject items = BehaviorSubject.create("Greet"); public BehaviorSubject getValues() { From f9e5f7c91848255781f243c9f89340cbccfc15b7 Mon Sep 17 00:00:00 2001 From: Jack Keys Date: Fri, 16 Mar 2018 12:04:48 -0600 Subject: [PATCH 09/21] Fix typo in Part 3.1 --- Part 3 - Taming the sequence/1. Side effects.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Part 3 - Taming the sequence/1. Side effects.md b/Part 3 - Taming the sequence/1. Side effects.md index a4bcdeb..f909d8b 100644 --- a/Part 3 - Taming the sequence/1. Side effects.md +++ b/Part 3 - Taming the sequence/1. Side effects.md @@ -236,7 +236,7 @@ Subscription over Rx is designed in the style of functional programming, but it exists within an object-oriented environment. We also have to protect against object-oriented dangers. Consider this naive implementation for a service that returns an observable. ```java -public class BrakeableService { +public class BreakableService { public BehaviorSubject items = BehaviorSubject.create("Greet"); public void play() { items.onNext("Hello"); @@ -249,7 +249,7 @@ public class BrakeableService { The code above does not prevent a naughty consumer from changing your `items` with one of their own. After that happens, subscriptions done before the change will no longer receive items, because you are not calling `onNext` on the right `Subject` any more. We obviously need to hide access to our `Subject` ```java -public class BrakeableService { +public class BreakableService { private final BehaviorSubject items = BehaviorSubject.create("Greet"); public BehaviorSubject getValues() { @@ -320,7 +320,7 @@ data.subscribe(d -> System.out.println(d.id + ": " + d.name)); 2: Garbage ``` -The first subscriber is the first to be called for each item. Its action is to modify the data. Once the first subscriber is done, the same reference is also passed to the second subscriber, only now the data is changed in a way that was not declared in the producer. A developer needs to have a deep understanding of Rx, Java and their environment in order to reason about the sequence of modifications, and then argue that such code would run according to a plan. It is simpler to avoid mutable state altogether. Observables should be seen as a sequence notifications about resolved events. +The first subscriber is the first to be called for each item. Its action is to modify the data. Once the first subscriber is done, the same reference is also passed to the second subscriber, only now the data is changed in a way that was not declared in the producer. A developer needs to have a deep understanding of Rx, Java and their environment in order to reason about the sequence of modifications, and then argue that such code would run according to a plan. It is simpler to avoid mutable state altogether. Observables should be seen as a sequence of notifications about resolved events. From 31ce87bf3557458af3c83633d15379a550a7b779 Mon Sep 17 00:00:00 2001 From: Jack Keys Date: Fri, 16 Mar 2018 12:07:03 -0600 Subject: [PATCH 10/21] Fix broken link in Part 3.2 --- Part 3 - Taming the sequence/2. Leaving the monad.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Part 3 - Taming the sequence/2. Leaving the monad.md b/Part 3 - Taming the sequence/2. Leaving the monad.md index 78746ce..c053594 100644 --- a/Part 3 - Taming the sequence/2. Leaving the monad.md +++ b/Part 3 - Taming the sequence/2. Leaving the monad.md @@ -1,6 +1,6 @@ # Leaving the monad -A [monad] (https://en.wikipedia.org/wiki/Monad_%28functional_programming%29) is an abstract concept from functional programming that is unfamiliar to most programmers. It is beyond the scope of this guide teaching monads. In www.introtorx.com we find a short definition: +A [monad](https://en.wikipedia.org/wiki/Monad_%28functional_programming%29) is an abstract concept from functional programming that is unfamiliar to most programmers. It is beyond the scope of this guide teaching monads. In www.introtorx.com we find a short definition: > Monads are a kind of abstract data type constructor that encapsulate program logic instead of data in the domain model. Monads are of interest to us, because the observable is a monad. Rx code declares what needs to be done but the actual processing happens not when Rx statements are executed, but rather when values are emitted. Readers may find it interesting to read more about monads in general. For this guide, when refering to monads the reader only needs to think about the observable. From e1371276ecad7d9c171e04239e9f73c0b4100aca Mon Sep 17 00:00:00 2001 From: Jack Keys Date: Fri, 16 Mar 2018 12:09:50 -0600 Subject: [PATCH 11/21] Correct a grammatical error in Part 3.2 --- Part 3 - Taming the sequence/2. Leaving the monad.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Part 3 - Taming the sequence/2. Leaving the monad.md b/Part 3 - Taming the sequence/2. Leaving the monad.md index 78746ce..54ddc05 100644 --- a/Part 3 - Taming the sequence/2. Leaving the monad.md +++ b/Part 3 - Taming the sequence/2. Leaving the monad.md @@ -1,6 +1,6 @@ # Leaving the monad -A [monad] (https://en.wikipedia.org/wiki/Monad_%28functional_programming%29) is an abstract concept from functional programming that is unfamiliar to most programmers. It is beyond the scope of this guide teaching monads. In www.introtorx.com we find a short definition: +A [monad] (https://en.wikipedia.org/wiki/Monad_%28functional_programming%29) is an abstract concept from functional programming that is unfamiliar to most programmers. Teaching monads is beyond the scope of this guide. In www.introtorx.com we find a short definition: > Monads are a kind of abstract data type constructor that encapsulate program logic instead of data in the domain model. Monads are of interest to us, because the observable is a monad. Rx code declares what needs to be done but the actual processing happens not when Rx statements are executed, but rather when values are emitted. Readers may find it interesting to read more about monads in general. For this guide, when refering to monads the reader only needs to think about the observable. From a0e17f56c0b7790a28498d84d967e5dcbadc6761 Mon Sep 17 00:00:00 2001 From: Jack Keys Date: Fri, 16 Mar 2018 12:22:09 -0600 Subject: [PATCH 12/21] Fix a broken internal link in Part 3.2 --- Part 3 - Taming the sequence/2. Leaving the monad.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Part 3 - Taming the sequence/2. Leaving the monad.md b/Part 3 - Taming the sequence/2. Leaving the monad.md index 78746ce..29ca4c0 100644 --- a/Part 3 - Taming the sequence/2. Leaving the monad.md +++ b/Part 3 - Taming the sequence/2. Leaving the monad.md @@ -318,7 +318,7 @@ subject.onCompleted(); ### Non-terminating sequences -Some blocking ways to access observables, such as `last()`, require the observable to terminate to unblock. Others, like `first()`, require it to emit at least one event to unblock. Using those methods on `Observable` isn't a big danger, as they only return a non-terminating observable. These same `methods` on `BlockingObservable` can result in a permanent block if the consumer hasn't taken the time to enforce some guarantees, such as timeouts (we will see how this is done in [Timeshifter sequences](/Part 3 - Taming the sequence/5. Time-shifted sequences.md)). +Some blocking ways to access observables, such as `last()`, require the observable to terminate to unblock. Others, like `first()`, require it to emit at least one event to unblock. Using those methods on `Observable` isn't a big danger, as they only return a non-terminating observable. These same `methods` on `BlockingObservable` can result in a permanent block if the consumer hasn't taken the time to enforce some guarantees, such as timeouts (we will see how this is done in [Timeshifter sequences](/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md)). #### Continue reading From d529f12cabc01ed907cc52cc5c46cf5c9af0c855 Mon Sep 17 00:00:00 2001 From: Zhao Gang Date: Sun, 6 May 2018 21:33:44 +0800 Subject: [PATCH 13/21] Fix a typo Actually the filter filters out odd ones, leaving the even ones. --- Part 2 - Sequence Basics/2. Reducing a sequence.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Part 2 - Sequence Basics/2. Reducing a sequence.md b/Part 2 - Sequence Basics/2. Reducing a sequence.md index e22c0b0..4febf8a 100644 --- a/Part 2 - Sequence Basics/2. Reducing a sequence.md +++ b/Part 2 - Sequence Basics/2. Reducing a sequence.md @@ -20,7 +20,7 @@ public final Observable filter(Func1 predicate) ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/filter.png) -We will use `filter` to create a sequence of numbers and filter out all the even ones, keeping only odd values. +We will use `filter` to create a sequence of numbers and filter out all the odd ones, keeping only even values. ```java Observable values = Observable.range(0,10); From 5982ab960b2f6fd96cf7c4cd02650eafc2023c4e Mon Sep 17 00:00:00 2001 From: Zhao Gang Date: Thu, 10 May 2018 21:15:41 +0800 Subject: [PATCH 14/21] Fix another typo The range function should be Observable.range(3,7) instead of Observable.range(3,4) --- Part 2 - Sequence Basics/5. Transformation of sequences.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Part 2 - Sequence Basics/5. Transformation of sequences.md b/Part 2 - Sequence Basics/5. Transformation of sequences.md index 95bccd8..335074c 100644 --- a/Part 2 - Sequence Basics/5. Transformation of sequences.md +++ b/Part 2 - Sequence Basics/5. Transformation of sequences.md @@ -57,7 +57,7 @@ Map: 6 Map: Completed ``` -This was something we could do without `map`, for example by using `Observable.range(3,4)`. In the following, we will do something more practical. The producer will emit numeric values as a string, like many UIs often do, and then use `map` to convert them to a more processable integer format. +This was something we could do without `map`, for example by using `Observable.range(3,7)`. In the following, we will do something more practical. The producer will emit numeric values as a string, like many UIs often do, and then use `map` to convert them to a more processable integer format. ```java Observable values = From 23cddd311199ab6ebf78519b813c3be20339540c Mon Sep 17 00:00:00 2001 From: TaeHyun Kim Date: Sun, 8 Jul 2018 21:26:12 +0900 Subject: [PATCH 15/21] Fix typo in part 1.2 --- Part 1 - Getting Started/2. Key types.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Part 1 - Getting Started/2. Key types.md b/Part 1 - Getting Started/2. Key types.md index 7519602..571b356 100644 --- a/Part 1 - Getting Started/2. Key types.md +++ b/Part 1 - Getting Started/2. Key types.md @@ -129,7 +129,7 @@ Late: 2 Late: 3 ``` -Our late subscriber now missed the first value, which fell off the buffer of size 2. Similarily, old values fall off the buffer as time passes, when the subject is created with `createWithTime` +Our late subscriber now missed the first value, which fell off the buffer of size 2. Similarly, old values fall off the buffer as time passes, when the subject is created with `createWithTime`. ```java ReplaySubject s = ReplaySubject.createWithTime(150, TimeUnit.MILLISECONDS, From fc53bdfb72365e53490efecaf9767385ff6121f3 Mon Sep 17 00:00:00 2001 From: Paul Polozhevets <45691162+past77@users.noreply.github.com> Date: Wed, 16 Mar 2022 17:50:58 +0200 Subject: [PATCH 16/21] Update 2. Key types.md --- Part 1 - Getting Started/2. Key types.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Part 1 - Getting Started/2. Key types.md b/Part 1 - Getting Started/2. Key types.md index 7519602..1d5802a 100644 --- a/Part 1 - Getting Started/2. Key types.md +++ b/Part 1 - Getting Started/2. Key types.md @@ -32,7 +32,7 @@ We already saw one abstract implementation of the [Observer](http://reactivex.io ```java interface Observer { - void onCompleted(); + void onComplete(); void onError(java.lang.Throwable e); void onNext(T t); } @@ -178,7 +178,7 @@ BehaviorSubject s = BehaviorSubject.create(); s.onNext(0); s.onNext(1); s.onNext(2); -s.onCompleted(); +s.onComplete(); s.subscribe( v -> System.out.println("Late: " + v), e -> System.out.println("Error"), @@ -211,7 +211,7 @@ s.subscribe(v -> System.out.println(v)); s.onNext(0); s.onNext(1); s.onNext(2); -s.onCompleted(); +s.onComplete(); ``` [Output](/tests/java/itrx/chapter1/AsyncSubjectExample.java) ``` @@ -228,7 +228,7 @@ As we already mentioned, there are contracts in Rx that are not obvious in the c Subject s = ReplaySubject.create(); s.subscribe(v -> System.out.println(v)); s.onNext(0); -s.onCompleted(); +s.onComplete(); s.onNext(1); s.onNext(2); ``` From b44018a2d448ddfff0b71c1e13a4ec35ba2c0ae6 Mon Sep 17 00:00:00 2001 From: logrusx Date: Mon, 9 Sep 2024 20:16:13 +0300 Subject: [PATCH 17/21] Improve style --- Part 2 - Sequence Basics/1. Creating a sequence.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Part 2 - Sequence Basics/1. Creating a sequence.md b/Part 2 - Sequence Basics/1. Creating a sequence.md index ea070c8..f6db679 100644 --- a/Part 2 - Sequence Basics/1. Creating a sequence.md +++ b/Part 2 - Sequence Basics/1. Creating a sequence.md @@ -181,9 +181,9 @@ Received: 3 ... ``` -This sequence will not terminate until we unsubscribe. +This sequence will not terminate for as long as there are subscribers. -We should note why the blocking read at the end is necessary. Without it, the program terminates without printing something. That's because our operations are non-blocking: we create an observable that will emit values _over time_, then we register the actions to execute if and when values arrive. None of that is blocking and the main thread proceeds to terminate. The timer that produces the ticks runs on its own thread, which does not prevent the JVM from terminating, killing the timer with it. +Note the blocking read at the end is necessary. Without it, the program terminates without printing something. That's because observable's operations are non-blocking: when created, the observable will emit values _over time_, then the actions registered execute if and when values arrive. None of that is blocking and the main thread proceeds to terminate. The timer that produces the ticks runs on its own thread, which does not prevent the JVM from terminating, killing the timer with it. ### Observable.timer From 7844d8449c9657a5d27393f3627325c104b61ab4 Mon Sep 17 00:00:00 2001 From: logrusx Date: Fri, 20 Sep 2024 13:53:12 +0300 Subject: [PATCH 18/21] Part 3, Section 2: Fixes of typos, grammar, structure and style --- .../2. Leaving the monad.md | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/Part 3 - Taming the sequence/2. Leaving the monad.md b/Part 3 - Taming the sequence/2. Leaving the monad.md index fea188c..8ed7504 100644 --- a/Part 3 - Taming the sequence/2. Leaving the monad.md +++ b/Part 3 - Taming the sequence/2. Leaving the monad.md @@ -1,27 +1,30 @@ # Leaving the monad -A [monad] (https://en.wikipedia.org/wiki/Monad_%28functional_programming%29) is an abstract concept from functional programming that is unfamiliar to most programmers. Teaching monads is beyond the scope of this guide. In www.introtorx.com we find a short definition: +A [monad] (https://en.wikipedia.org/wiki/Monad_%28functional_programming%29) is an abstract concept from functional programming that is unfamiliar to most programmers. Teaching monads is beyond the scope of this guide. A shorter definition can be found at www.introtorx.com: > Monads are a kind of abstract data type constructor that encapsulate program logic instead of data in the domain model. -Monads are of interest to us, because the observable is a monad. Rx code declares what needs to be done but the actual processing happens not when Rx statements are executed, but rather when values are emitted. Readers may find it interesting to read more about monads in general. For this guide, when refering to monads the reader only needs to think about the observable. +Monads are of interest to the reader, because the observable is a monad. Rx code declares what needs to be done but the actual processing happens not when Rx statements are executed, but rather when values are emitted. Readers may find it interesting to read more about monads in general. In this guide, when referring to monads the reader only needs to think about the observable. ## Why leave the monad -There are two main reasons one may want to leave the monad. The first reason is that a new Rx developer will still be more comfortable in more traditional paradigms. Doing parts of the computation in a different paradigm may enable you to get some parts working, while you're still figuring out how to do things in Rx. The second reason is that we usually interact with components and libraries that weren't designed with Rx in mind. When refactoring existing code into Rx, it may be useful to have Rx behave in a blocking way. +There are two main reasons one may want to leave the monad. The first reason is that a new Rx developer will still be more comfortable in more traditional paradigms. Doing parts of the computation in a different paradigm may help getting some parts working, while still figuring out how to do things in Rx. The second reason is that interaction with components and libraries that weren't designed with Rx in mind is necessary. When refactoring existing code into Rx, it may be useful to have Rx behave in a blocking way. ## BlockingObservable +### Blocking Operations + The first step to getting data out of an observable in a blocking manner is to transition to a [BlockingObservable](http://reactivex.io/RxJava/javadoc/rx/observables/BlockingObservable.html#). Any `Observable` can be converted to a `BlockingObservable` in one of two ways: -You can use the `Observable`'s `toBlocking` method + ++ `Observable`'s `toBlocking` method ```java public final BlockingObservable toBlocking() ``` -or the static factory of `BlockingObservable` ++ `BlockingObservable`'s static factory method `from` ```java public static BlockingObservable from(Observable o) ``` -`BlockingObservable` does not extend the `Observable` and it can't be used with our usual Rx operators. It has its own implementations of a small set functions, which allow you to extract data out of an `Observable` in a blocking manner. Many of those methods are the blocking counterparts to methods that we have already seen. +`BlockingObservable` does not extend the `Observable` and it can't be used with standard Rx operators. It has its own implementations of a small set of functions, which allow extracting data from an `Observable` in a blocking manner. Many of those methods are the blocking counterparts to methods already discussed. ### forEach @@ -47,9 +50,11 @@ Subscribed 4 ``` -The code here behaves like `subscribe` would. First you register an observer (no overload for `forEach` accepts `Observer`, but the semantics are the same). Execution then proceeds to print "Subscribed" and exits our snippet. As values are emitted (the first one with a 100ms delay), they are passed to our observer for processing. +The code here behaves like `subscribe` would. First an observer is registered (no overload for `forEach` accepts `Observer`, but the semantics are the same). Execution then proceeds to print "Subscribed" and exits the snippet. As values are emitted (the first one with a 100ms delay), they are passed to the observer for processing. + +`BlockingObservable` doesn't have a `subscribe` function, but it has `forEach`. -`BlockingObservable` doesn't have a `subscribe` function, but it has `forEach`. Let's see the same example with `BLockingObservable` +Consider the same example with `BLockingObservable` ```java Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); @@ -70,7 +75,7 @@ System.out.println("Subscribed"); Subscribed ``` -We see here that the call to `forEach` blocked until the observable completed. Another difference is that there can be no handlers for `onError` and `onCompleted`. `onCompleted` is a given if the execution completes, while exceptions will be thrown into the runtime to be caught: +The call to `forEach` is blocked until the observable completed. Another difference is that there can be no handlers for `onError` and `onCompleted`. If the execution completes,`onCompleted` is a given while exceptions will be thrown into the runtime to be caught: ```java Observable values = Observable.error(new Exception("Oops")); @@ -94,7 +99,7 @@ Subscribed ### first, last, single -`BlockingObservable` has methods for `first`, `last` and `single`, along with implementations for default values `firstOrDefault`, `lastOrDefault` and `singleOrDefault`. Having read about their [namesakes](/Part%202%20-%20Sequence%20Basics/4.%20Aggregation.md#first) in `Observable`, you already know what the returned value is. Once again, the difference is the blocking nature of the methods. They don't return an observable that will emit the value when it is available. Rather, they block until the value is available and return the value itself, without the surrounding observable. +`BlockingObservable` has methods for `first`, `last` and `single`, along with implementations for default values `firstOrDefault`, `lastOrDefault` and `singleOrDefault`. Their return types are just like the ones for their reactive [namesakes](/Part%202%20-%20Sequence%20Basics/4.%20Aggregation.md#first) in `Observable`. Once again, the difference is the blocking nature of the methods. They don't return an observable that will emit the value when it is available. Rather, they block until the value is available and return the value itself, without the surrounding observable. ```java Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); @@ -110,7 +115,7 @@ System.out.println(value); 3 ``` -As we can see, the call to `first` blocked until a value was available, and only then was a value returned. +The call to `first` blocks until a value is available, and only then a value is returned. Like with `forEach`, exceptions are thrown in the runtime to be caught @@ -135,7 +140,7 @@ Caught: java.lang.IllegalArgumentException: Sequence contains too many elements ### To Iterable -You can transform your observables to [iterables](https://docs.oracle.com/javase/8/docs/api/java/lang/Iterable.html) throught a variety of methods on `BlockingObservable`. Iterables are pull-based, unlike Rx, which is push-based. That means that when the consumer is ready to consume a value, one is requested with `next()` on the iterable's [Iterator](https://docs.oracle.com/javase/8/docs/api/java/util/Iterator.html). The call to `next()` will either return a value immediately or block until one is ready. +Observables can be transformed to [iterables](https://docs.oracle.com/javase/8/docs/api/java/lang/Iterable.html) throught a variety of methods on `BlockingObservable`. Iterables are pull-based, unlike Rx, which is push-based. That means that when the consumer is ready to consume a value, one is requested with `next()` on the iterable's [Iterator](https://docs.oracle.com/javase/8/docs/api/java/util/Iterator.html). The call to `next()` will either return a value immediately or block until one is ready. There are several ways to go from `BlockingObservable` to `Iterable` and each has a different behaviour. @@ -301,9 +306,9 @@ Emitted: 0 ### Deadlocks -So far we were able to ignore potential deadlocks. Rx's non-blocking nature makes it harder to create unnecessary deadlocks. However, in this chapter we returned to blocking methods, thus bringing deadlocks to the forefront again. +So far potential deadlocks were ignored. Rx's non-blocking nature makes it harder to create unnecessary deadlocks. However, this chapter discussed blocking methods, thus bringing deadlocks to the forefront again. -The example below would work as a non-blocking case. But because we used blocking operations, it will never unblock +The example below would work as a non-blocking case. But because blocking operations were used, it will never unblock ```java ReplaySubject subject = ReplaySubject.create(); @@ -318,7 +323,7 @@ subject.onCompleted(); ### Non-terminating sequences -Some blocking ways to access observables, such as `last()`, require the observable to terminate to unblock. Others, like `first()`, require it to emit at least one event to unblock. Using those methods on `Observable` isn't a big danger, as they only return a non-terminating observable. These same `methods` on `BlockingObservable` can result in a permanent block if the consumer hasn't taken the time to enforce some guarantees, such as timeouts (we will see how this is done in [Timeshifter sequences](/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md)). +Some blocking ways to access observables, such as `last()`, require the observable to terminate to unblock. Others, like `first()`, require it to emit at least one event to unblock. Using those methods on `Observable` isn't a big danger, as they only return a non-terminating observable. These same `methods` on `BlockingObservable` can result in a permanent block if the consumer hasn't taken the time to enforce some guarantees, such as timeouts, which is discussed in [Timeshifter sequences](/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md). #### Continue reading From b1512fa50bc78762cddf24c6d7721fb3904c033b Mon Sep 17 00:00:00 2001 From: logrusx Date: Mon, 30 Sep 2024 10:25:11 +0300 Subject: [PATCH 19/21] Part 3 Section 3: Fixes of typos, grammar, structure and style --- .../3. Advanced error handling.md | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/Part 3 - Taming the sequence/3. Advanced error handling.md b/Part 3 - Taming the sequence/3. Advanced error handling.md index 1688ccc..e135470 100644 --- a/Part 3 - Taming the sequence/3. Advanced error handling.md +++ b/Part 3 - Taming the sequence/3. Advanced error handling.md @@ -1,16 +1,16 @@ # Advanced error handling -We've already seen how we can handle an error in the observer. However, by that time, we are practically outside of the monad. There can be many kinds of errors and not every error is worth pushing all the way to the top. In standard Java, you can catch an exception at any level and decide if you want to handle it there or throw it further. Similarly in Rx, you can define behaviour based on errors without terminating the observable and forcing the observer to deal with everything. +In previous chapters, error handling in the observer was already discussed. However, by that time, it's practically outside the monad. There are many kinds of errors and not every error is worth pushing all the way to the top. In standard Java, exceptions can be caught at any level and a decision to handle it there or throw it further can be made. Similarly, in Rx, behaviour based on errors without terminating the observable and forcing the observer to deal with everything, can be defined. ## Resume ### onErrorReturn -The `onErrorReturn` operator allows you to ignore an error and emit one final value before terminating (successfully this time). +The `onErrorReturn` operator ignores an error and emits one final value before terminating (successfully this time). ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorReturn.png) -In the next example, we will convert an error into a normal value to be printed: +In the next example, an error is converted into a normal value: ```java Observable values = Observable.create(o -> { @@ -32,7 +32,7 @@ Error: adjective unknown ### onErrorResumeNext -The `onErrorResumeNext` allows you to resume a failed sequence with another sequence. The error will not appear in the resulting observable. +The `onErrorResumeNext` resumes a failed sequence with another sequence. The error does not appear in the resulting observable. ```java public final Observable onErrorResumeNext( @@ -43,7 +43,7 @@ public final Observable onErrorResumeNext( ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorResumeNext.png) -The first overload uses the same followup observable in every case. The second overload allows you to decide what the resume sequence should be based on the error that occurred. +The first overload uses the same followup observable in every case. The second overload allows for a decision what the resume sequence should be based on the error occurred. ```java Observable values = Observable.create(o -> { @@ -64,17 +64,17 @@ with onError: 2147483647 with onError: Completed ``` -There's nothing stopping the resumeSequence from failing as well. In fact, if you wanted to change the type of the error, you can return an observable that fails immediately. In standard Java, components may decide they can't handle an error and that they should re-throw it. In such cases, it is common wrap a new exception around the original error, thus providing additional context. You can do the same in Rx: +Nothing prevents the resume sequence from failing as well. In fact, error type can be changed by returning an observable that fails immediately. In standard Java, components may decide they can't handle an error and that they should re-throw it. In such cases, it is common to provide additional context by wrapping the original error into a new exception. Here's how it's done in Rx: ```java .onErrorResumeNext(e -> Observable.error(new UnsupportedOperationException(e))) ``` -Now the sequence still fails, but you've wrapped the original error in a new error. +Now the sequence still fails, but the original error is wrapped into a new error. ### onExceptionResumeNext -`onExceptionResumeNext` only has one difference to `onErrorResumeNext`: it only catches errors that are `Exception`s. +It only has one difference to `onErrorResumeNext`: it only catches errors that are `Exception`'s. ```java Observable values = Observable.create(o -> { @@ -99,7 +99,7 @@ public final Observable retry(long count) ``` ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png) -If the error doesn't go away, `retry()` will lock us in an infinite loop of retries. The second overload limits the number of retries. If errors persist and the sequence fails n times, `retry(n)` will fail too. Let's see this in an example +If the error doesn't go away, `retry()` will lock in an infinite loop of retries. The second overload limits the number of retries. If errors persist and the sequence fails n times, `retry(n)` will fail too. Let's see this in an example ```java Random random = new Random(); @@ -122,27 +122,27 @@ values java.lang.Exception ``` -Here we've specified that we want to retry once. Our observable fails after two values, then tries again, fails again. The second time it fails the exception is allowed pass through. +Here it's specified to retry only once. The observable fails after two values, then tries again, fails again. The second time it fails, the exception is allowed to pass through. -In this example, we have done something naughty: we have made our subscription stateful to demonstrate that the observable is restarted from the source: it produced different values the second time around. `retry` does not cache any elements like `replay`, nor would it make sense to do so. Retrying makes sense only if there are side effects, or if the observable is [hot](/Part%203%20-%20Taming%20the%20sequence/6.%20Hot%20and%20Cold%20observables.md). +In this example, the subscription is stateful to demonstrate that the observable is restarted from the source: it produced different values the second time around. `retry` does not cache any elements like `replay`, nor would it make sense to do so. Retrying makes sense only if there are side effects, or if the observable is [hot](/Part%203%20-%20Taming%20the%20sequence/6.%20Hot%20and%20Cold%20observables.md). ### retryWhen -`retry` will restart the subscription as soon as the failure happens. If we need more control over this, we can use `retryWhen`. +`retry` will restart the subscription as soon as the failure happens. If more control is needed, `retryWhen` can be used. ```java public final Observable retryWhen( - Func1,? extends Observable> notificationHandler) + Func1, ? extends Observable> notificationHandler) ``` -The argument to `retryWhen` is a function that takes an observable and returns another. The input observable emits all the errors that `retryWhen` encounters. The resulting observable signals when to retry: +The argument to `retryWhen` is a function that takes an observable and returns another observable. The input observable emits all the errors that `retryWhen` encounters. The resulting observable signals when to retry: * if it emits a value, `retryWhen` will retry, * if it terminates with error, `retryWhen` will emit the error and not retry. * if it terminates successfully, `retryWhen` will terminate successfully. Note that the type of the signaling observable and the actual values emitted don't matter. The values are discarded and the observable is only used for timing. -In the next example, we will construct a retrying policy where we wait 100ms before retrying. +In the next example, policy with 100ms delay before retrying will be used. ```java Observable source = Observable.create(o -> { @@ -169,11 +169,11 @@ TimeInterval [intervalInMilliseconds=103, value=1] TimeInterval [intervalInMilliseconds=0, value=2] ``` -Our source observable emits 2 values and immediately fails. When that happens, the observable of failures inside `retryWhen` emits the error. We delay that emission by 100ms and send it back to signal a retry. `take(2)` guarantees that our signaling observable will terminate after we receive two errors. `retryWhen` sees the termination and doesn't retry after the second failures. +The source observable emits 2 values and immediately fails. When that happens, the observable of failures inside `retryWhen` emits the error. That emission is delayed by 100 ms and sent back to signal a retry. `take(2)` guarantees that the signaling observable will terminate after two errors are received. `retryWhen` sees the termination and doesn't retry after the second failures. ## using -The `using` operator is for creating observables from resources that need to be managed. It guarantees that your resources will be managed regardless of when and how subscriptions are terminated. If you were to just use `create`, you would have to do the managing in the traditional Java paradigm and inject it into Rx. `using` is a more natural way of managing resources in Rx. +The `using` operator creates observables from resources that need to be managed. It guarantees that the resources will be managed regardless of when and how subscriptions are terminated. If `create` was used, managing would have to be done in the traditional Java paradigm and injected into Rx. `using` is a more natural way of managing resources in Rx. ```java public static final Observable using( @@ -184,7 +184,7 @@ public static final Observable using( When a new subscription begins, `resourceFactory` leases the necessary resource. `observableFactory` uses that resource to produce items. When the resource is no longer needed, it is disposed of with the `disposeAction`. The dispose action is executed regardless of the way the subscription terminates (successfully or with a failure). -In the next example, we pretend that a `string` is a resource that needs managing. +In the next example, a string is considered a resource that needs managing. ```java Observable values = Observable.using( @@ -223,15 +223,15 @@ e Disposed: MyResource ``` -When we subscribe to `values`, the resource factory function is called which returns `"MyResource"`. That string is used to produce an observable which emits all of the characters in the string. Once the subscription ends, the resource is disposed of. A `String` doesn't need any more managing than what the garbage collector will do. Resources may actually need such managing, e.g., database connections, opened files etc. +When `values` is subscribed to, the resource factory function is called which returns `"MyResource"`. That string is used to produce an observable which emits all the characters in the string. Once the subscription ends, the resource is disposed of. A `String` doesn't need any more managing than what the garbage collector will do. Resources may actually need such managing, e.g., database connections, opened files etc. -It is important to note here that we are responsible for terminating the observable, just like we were when using the `create` method. With `create`, terminating is a matter of semantics. With `using`, not terminating defeats the point of using it in the first place. Only upon termination the resources will be released. If we had not called `o.onCompleted()`, the sequence would be assumed to be still active and needing its resources. +It is important to note here that terminating the observable must be done manually, just like when using the `create` method. With `create`, terminating is a matter of semantics. With `using`, not terminating defeats the point of using it in the first place. Only upon termination the resources will be released. If `o.onCompleted()` was to be called, the sequence would be assumed to be still active and needing its resources. #### Continue reading -| Previous | Next | -| --- | --- | -| [Leaving the monad](/Part%203%20-%20Taming%20the%20sequence/2.%20Leaving%20the%20monad.md) | [Combining sequences](/Part%203%20-%20Taming%20the%20sequence/4.%20Combining%20sequences.md) | +| Previous | Next | +|--------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------| +| [Leaving the monad](/Part%203%20-%20Taming%20the%20sequence/2.%20Leaving%20the%20monad.md) | [Combining sequences](/Part%203%20-%20Taming%20the%20sequence/4.%20Combining%20sequences.md) | From 27285a3ecdf72fba65146ed2889f56e84a4c649a Mon Sep 17 00:00:00 2001 From: logrusx Date: Sun, 13 Oct 2024 21:38:54 +0300 Subject: [PATCH 20/21] Part 3, Chpter 4: Spelling, structure and style improvement --- .../4. Combining sequences.md | 79 ++++++++----------- 1 file changed, 34 insertions(+), 45 deletions(-) diff --git a/Part 3 - Taming the sequence/4. Combining sequences.md b/Part 3 - Taming the sequence/4. Combining sequences.md index 17b6eae..9d6942a 100644 --- a/Part 3 - Taming the sequence/4. Combining sequences.md +++ b/Part 3 - Taming the sequence/4. Combining sequences.md @@ -1,6 +1,6 @@ # Combining sequences -So far, we've seen most of the methods that allow us to create a sequence and transform it into what we want. However, most applications will have more than one source of input. We need a way a of combining sequences. We've already seen a few sequences that use more than one observable. In this chapter, we will see the most important operators that use multiple sequences to produce one. +So far, most of the methods that allow creation and transform of a sequence were discussed. However, most applications have more than one source of input. That's why a way of combining sequences is needed. In this chapter, the most important operators that use multiple sequences to produce one will be discussed. ## Concatenation @@ -8,7 +8,7 @@ The most straight-forward combination of sequences is to have one run after the ### concat -The `concat` operator concatenates sequences one after the other. There are many overloads to `concat`, which allow you to provide source observables in different numbers and formats. +The `concat` operator concatenates sequences one after the other. There are many overloads to `concat`, which accept source observables in different numbers and formats. ```java public static final Observable concat( @@ -46,7 +46,7 @@ Observable.concat(seq1, seq2) 12 ``` -If the number of sequences to be combined is dynamic, you can provide an observable that emits the sequences to be concatenated. In this example, we will use our familiar `groupBy` to create a sequence that emits words starting with the same letter together. +If the number of sequences to be combined is dynamic, observable that emits the sequences to be concatenated can be provided. In this example, the familiar `groupBy` will be used to create a sequence that emits words starting with the same letter together. ```java Observable words = Observable.just( @@ -99,7 +99,7 @@ public void exampleConcatWith() { ### repeat -`repeat` allows you to concatenate a sequence after itself, either an infinite or a finite number of times. `repeat` doesn't cache the values to repeat them. When the time comes, it will start a new subscription and dispose of the old one. +`repeat` concatenates a sequence after itself, either an infinite or a finite number of times. `repeat` doesn't cache the values to repeat them. When the time comes, it will start a new subscription and dispose of the old one. ```java public final Observable repeat() @@ -125,9 +125,9 @@ words.repeat(2) ### repeatWhen -If you need more control than `repeat` gives, you can control when the repetition starts with the `repeatWhen` operator. The _when_ is defined by an observable that you provide. When the original sequence completes, it waits for the handling observable to emit something (the value is irrelevant) and only then does it repeat. If the handling observable terminates, that means that the repetitions should stop. +If more control is needed than `repeat` gives, `repeatWhen` operator can be used. The _when_ is defined by a user supplied obeseveable. When the original sequence completes, it waits for the handling observable to emit something (the value is irrelevant) and only then does it repeat. If the handling observable terminates, that means that the repetitions should stop. -It may be useful for the signal to know when a repetition has been completed. `repeatWhen` provides a special observable that emits `void` when a repetition terminates. You can use that observable to construct your signal. +It may be useful for the signal to know when a repetition has been completed. `repeatWhen` provides a special observable that emits `void` when a repetition terminates. That observable can be used to construct the signal. ```java public final Observable repeatWhen( @@ -158,9 +158,9 @@ repeatWhen: 1 repeatWhen: Completed ``` -Here the repetition happens immediately: `ob` emits when a repetition has ended, so the returned observable also emits right after a completed repetition. That signal the new repetition to begin. +Here the repetition happens immediately: `ob` emits when a repetition has ended, so the returned observable also emits right after a completed repetition. That signals the new repetition to begin. -In the next example, we create sequence that repeats every two seconds, forever. +In the next example, a sequence repeats every two seconds, forever. ```java Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); @@ -174,7 +174,7 @@ values .subscribe(new PrintSubscriber("repeatWhen")); ``` -Note that the sequence repeats every 2 seconds regardless of when it completed. That's because we created an independent `interval` observable that sends a signal every 2 seconds. In the next chapter, [Time-shifted sequences](/Part 3 - Taming the sequence/5. Time-shifted sequences.md), we will see ways of dealing with sequences in time with more control. +Note that the sequence repeats every 2 seconds regardless of when it completed. That's because the returned independent `interval` observable that sends a signal every 2 seconds. In the next chapter, [Time-shifted sequences](/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md), dealing with sequences in time with more control will be discussed. Another thing to note is the `ob.subscribe()` statement, which appears to be useless. That is necessary because it forces `ob` to be created. In the current implementation of `repeatWhen`, if `ob` is not subscribed to, then repetitions never begin. @@ -192,7 +192,7 @@ public final Observable startWith(T t1, T t2, T t3) ``` ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/startWith.png) -Here an example +Example: ```java Observable values = Observable.range(0, 3); @@ -209,7 +209,7 @@ values.startWith(-1,-2) 2 ``` -`startWith` is a shorthand for using `concat` with a `just` and our source sequence. +`startWith` is a shorthand for using `concat` with `just` and a source sequence. ```java Observable.concat( @@ -222,7 +222,7 @@ values.startWith(-1,-2,-3) ## Concurrent sequences -Observables aren't always emitting values at predictable moments in time. We will now see some operators intended for combining sequences that emit values concurrently. +Observables aren't always emitting values at predictable moments in time. The following operators are intended for combining sequences that emit values concurrently. ### amb @@ -242,7 +242,7 @@ public static final Observable amb( ``` ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png) -In the following example, `amb` will mirror the second observable, because it waits less to start. +In the following example, `amb` mirrors the second observable, because it waits less to start. ```java Observable.amb( @@ -255,8 +255,8 @@ Observable.amb( Second ``` -It's usefulness may be not be obvious -> The amb feature can be useful if you have multiple cheap resources that can provide values, but latency is widely variable. For an example, you may have servers replicated around the world. Issuing a query is cheap for both the client to send and for the server to respond, however due to network conditions the latency is not predictable and varies considerably. Using the Amb operator, you can send the same request out to many servers and consume the result of the first that responds. _-Lee Cambell www.introtorx.com_ +It's usefulness may not be obvious +> The `amb` feature can be useful if you have multiple cheap resources that can provide values, but latency is widely variable. For an example, you may have servers replicated around the world. Issuing a query is cheap for both the client to send and for the server to respond, however due to network conditions the latency is not predictable and varies considerably. Using the Amb operator, you can send the same request out to many servers and consume the result of the first that responds. _-Lee Cambell www.introtorx.com_ An alternative style of doing `amb` is the `ambWith` operator. `ambWith` allows you to combine the observables one by one in a chain. This is more convenient when using `amb` in the middle of a chain or operators. @@ -349,7 +349,7 @@ public static final Observable mergeDelayError( ``` ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png) -In the next example, we merge two observables which emit every 100ms. One fails early while the other observable continues to complete. +In the next example, two observables which emit every 100ms are merged. One fails early while the other observable continues to complete. ```java Observable failAt200 = @@ -444,7 +444,7 @@ Observable.switchOnNext( 2 ``` -This example may be a bit confusing. What we've done is creating an observable that creates a new observable every 100ms. Every created observable emits its number in the sequence every 30ms. After 100ms, each of those observables has had enough time to emit its number 3 times. Then a new observable is created, which causes them to be replaced by the new one. +This example may be a bit confusing. That's because an observable that creates a new observable every 100ms is supplied to concat. Every created observable emits its number in the sequence every 30ms. After 100ms, each of those observables has had enough time to emit its number 3 times. Then a new observable is created, which causes them to be replaced by the new one. #### switchMap @@ -481,12 +481,20 @@ Observable.interval(100, TimeUnit.MILLISECONDS) ## Pairing sequences -So far, we've seen operators which, in one way or the other, flattened multiple sequences into one of the same type. The next operators put the source sequences side-by-side and use the values to create a composite value. +So far, operators which, in one way or the other, flattened multiple sequences into one of the same type were discussed. The following operators put the source sequences side-by-side and use the values to create a composite value. ### zip `zip` is a very basic function out of functional programming. It takes two or more sequences and matches their values one-to-one by index. A function is required to combine the values. Unlike what you might expect from other environments, in RxJava `zip` doesn't default to combining all the values in a tuple. +```java +public static Observable zip(Iterable> ws, FuncN zipFunction) +public static Observable zip(Observable[] ws, FuncN zipFunction) +public static Observable zip(Observable> ws, FuncN zipFunction) +public static Observable zip(Observable o1, Observable o2, Func2 zipFunction) +//up to T9 +``` + ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png) In the next example, we have two sources that emit items at different rates. @@ -526,30 +534,11 @@ Left emits 8 5 - 5 ``` -As we can see, `zip` matched values based on index. +Evidently, `zip` matched values based on index. `zip` has multiple overloads for zipping more than two sequences together. -```java -public static final Observable zip( - java.lang.Iterable> ws, - FuncN zipFunction) -public static final Observable zip( - Observable> ws, - FuncN zipFunction) -public static final Observable zip( - Observable o1, - Observable o2, - Func2 zipFunction) -public static final Observable zip( - Observable o1, - Observable o2, - Observable o3, - Func3 zipFunction) -/// etc -``` - -When zipping more than two sequences, the operator will wait until all of the sources have emitted the next value before it emits the next zipped value. In the next example, we add another source with its own frequency again. +When zipping more than two sequences, the operator will wait until all of the sources have emitted the next value before it emits the next zipped value. In the next example, a third source with its own frequency again. ```java Observable.zip( @@ -570,7 +559,7 @@ Observable.zip( 5 - 5 - 5 ``` -The zipped sequence terminates when any of the source sequences terminates successfully. Further values from the other sequences will be ignored. We can see that in the next example, where we zip sequences of different sizes and count the elements in the zipped sequence. +The zipped sequence terminates when any of the source sequences terminates successfully. Further values from the other sequences will be ignored. The following example demonstrates that. ```java Observable.zip( @@ -589,7 +578,7 @@ Observable.zip( The zipped sequence contains as many elements as the shortest source sequence. -There is also the `zipWith` operator, which is an alternative style of zipping 2 sequences. `zipWith` allows you to zip in a chain, but it can be inconvenient for zipping more that two sequences. +There is also the `zipWith` operator, which is an alternative style of zipping 2 sequences. `zipWith` allows to zip in a chain, but it can be inconvenient for zipping more that two sequences. ```java Observable.interval(100, TimeUnit.MILLISECONDS) @@ -609,7 +598,7 @@ Observable.interval(100, TimeUnit.MILLISECONDS) 5 - 5 ``` -The `zipWith` also has an overload that allows you to zip your observable sequence with an iterable. +The `zipWith` operator also has an overload that allows zipping an observable with an iterable. ```java Observable.range(0, 5) @@ -661,11 +650,11 @@ Right emits 3 - 2 ``` -As we can see, `combineLatest` first it waits for every sequence to have a value. After that, every value emitted by either observable results in a combined value being emitted. +Evidently, `combineLatest` first waits for every sequence to have a value. After that, every value emitted by either observable results in a combined value being emitted. -Just like every combinator we've seen in this chapter, there are overloads that allow to combine more than two sequences. +Just like every combiner seen so far in this chapter, there are overloads that allow to combine more than two sequences. -I like to think of `combineLatest` as one event occuring in the context of another. `combineLatest` is very useful when consuming input from GUIs, where multiple stateful GUI controls affect the same output. Imagine a text input field, a paragraph that echoes the text and a checkbox that signals to capitalise it or not. Everytime the text field or the checkbox changes, `combineLatest` will combine the text with the decision to capitalise it or not. The end result is ready to be written to the output. +It's convenient to think of `combineLatest` as one event occurring in the context of another. `combineLatest` is very useful when consuming input from GUIs, where multiple stateful GUI controls affect the same output. Imagine a text input field, a paragraph that echoes the text and a checkbox that signals to capitalise it or not. Every time the text field or the checkbox changes, `combineLatest` will combine the text with the decision to capitalise it or not. The end result is ready to be written to the output. From 1c9ed8f3e3fb234c7a1197156b5186a0c2c35ae6 Mon Sep 17 00:00:00 2001 From: logrusx Date: Wed, 13 Nov 2024 19:57:38 +0200 Subject: [PATCH 21/21] Part 3 Chapter 5: Style, speling and grammar improvements --- .../5. Time-shifted sequences.md | 121 +++++++++--------- 1 file changed, 60 insertions(+), 61 deletions(-) diff --git a/Part 3 - Taming the sequence/5. Time-shifted sequences.md b/Part 3 - Taming the sequence/5. Time-shifted sequences.md index 2a1c565..22b5064 100644 --- a/Part 3 - Taming the sequence/5. Time-shifted sequences.md +++ b/Part 3 - Taming the sequence/5. Time-shifted sequences.md @@ -1,14 +1,14 @@ # Time-shifted sequences -One of the key features in Rx is that you don't know when items will be emitted. Some observables will emit everything immediately and synchronously(e.g. `range`), some emit on regular intervals, and some are hard or even impossible to predict. For example, mouse events and UDP packets simply arrive when they arrive. We need tools to decide what to do with those events, not only based on what they are, but also based on when they arrived and at what frequency. +One of the key features in Rx is that it's unknown when items will be emitted. Some observables will emit everything immediately and synchronously(e.g. `range`), some emit on regular intervals, and some are hard or even impossible to predict. For example, mouse events and UDP packets simply arrive when they arrive. Tools are necessary to decide what to do with those events, not only based on what they are, but also based on when they arrived and at what frequency. ## Buffer -`buffer` allows you to collect values and get them in bulks, rather than one at a time. The are several different ways of buffering values. +`buffer` accumulates values and provides them in bulks, rather than one at a time. There are several different ways of buffering values. ### Complete, non-overlapping buffering -First we will examine variants of buffer where every value is buffered exactly once, with no losses and no duplicates. +The easiest form of buffering is the one where every value is buffered exactly ones with no loss and no duplication. #### buffer by count @@ -30,11 +30,11 @@ Observable.range(0, 10) #### buffer by time -The next overload allows you to buffer based on time. Time is divided into windows of equal length. Values are collected for the each window and at the end of each window the buffer is emited. +The next overload fills the buffer based on time. Time is divided into windows of equal length. Values are collected for each window and at the end of it the buffer is emitted. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer5.png) -In the next example, we produce values every 100ms and buffer them in windows of 250ms. +In the next example, the observable emits values every 100 ms and buffer them in windows of 250 ms. ```java Observable.interval(100, TimeUnit.MILLISECONDS).take(10) @@ -50,11 +50,11 @@ Observable.interval(100, TimeUnit.MILLISECONDS).take(10) [9] ``` -The size of a collection here depends on how many values were emitted in that timespan and not on a desired size. The collection can even be empty, if there where no events during the window. +The size of a collection depends on how many values were emitted in that time span and not on a desired size. The collection can even be empty, if there where no events during the window. #### buffer by count and time -You can use both a buffer size and a timespan to buffer values. The buffered values are emitted if either the buffer is full or if the time slot ends and a new one starts. +Both a buffer size and a time span can be used to buffer values. The buffered values are emitted if either the buffer is full or if the time slot ends and a new one starts. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer6.png) @@ -77,11 +77,11 @@ Observable.interval(100, TimeUnit.MILLISECONDS) [9] ``` -We see a lot of empty lists here. This is because the buffer is emitted both when it reaches size 2 and when a time window closes. As we can see from our previous example, only two values belong in those windows. Since the buffer was emptied when it reached size 2, it is empty when the window closes. +There are a lot of empty lists here. This is because the buffer is emitted both when it reaches size 2 and when a time window closes. As show in the previous example, only two values belong in those windows. Since the buffer was emptied when it reached size 2, it is empty when the window closes. #### buffer with signal -Instead of fixed points in time, you can also signal `buffer` with an observable to flush. Every time the signal emits onNext, the values in the buffer will be emitted. Buffering with a signal can be very useful if you want to buffer values until the moment that you are ready for them. +Instead of fixed points in time, `buffer` can also be signaled with an observable to flush. Every time the signal emits onNext, the values in the buffer are be emitted. Buffering with a signal can be very useful if buffering values is necessary until the moment the subscriber is ready for them. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer8.png) @@ -93,19 +93,19 @@ Observable.interval(100, TimeUnit.MILLISECONDS).take(10) .subscribe(System.out::println); ``` -There is a variant for the way above, where you provide the signaling observable through a function: `.buffer(() -> Observable.interval(250, TimeUnit.MILLISECONDS))`. The difference here is that the function that creates the observable is executed when a subscription happens. You can use to start your signal when the subscription starts. +There is an overload, where the signaling observable is provided through a lambda function: `.buffer(() -> Observable.interval(250, TimeUnit.MILLISECONDS))`. The difference here is that the function that creates the observable is executed when a subscription happens. It's also possible to start your signal when the subscription starts. ### Overlapping buffers -Every method for buffering that we've seen has an alternative that allows buffers to overloap or to leave out values. +Every method for buffering above has an alternative that allows buffers to overlap or to leave out values. #### buffer by count -When buffering based on the desired buffer size, you can also declare how far apart the beginings of each buffer are. +When buffering based on the desired buffer size, it's possible to declare how far apart the beginnings of each buffer are too. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer4.png) -As we can see in the marble diagram, we start a new buffer every 3 values but the buffer is limited to 2 values. Therefore, every third element is left out. You can also start the new buffer before the previous buffer closes. +As the marble diagram shows, a new buffer every 3 values is started but the buffer is limited to 2 values. Therefore, every third element is left out. It's also possible to start the new buffer before the previous buffer closes. * When `count` > `skip`, the buffers overlap * When `count` < `skip`, elements are left out * The case of `count` = `skip` is equivalent to the simpler case we saw in the previous subchapter. @@ -125,18 +125,18 @@ Output [9] ``` -As we can see, a new buffer starts every 3 elements, and that buffer contains the next 4 elements. +As the output shows, a new buffer starts every 3 elements, and that buffer contains the next 4 elements. #### buffer by time -We can do a very similar thing for the variant where buffering is based on a timespan. You decide how frequently to open new buffers and how long they should last. +A very similar thing can be done for the variant where buffering is based on a timespan. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer7.png) -Once again, this allows you either to make your buffers overlap or leave out elements. +Once again, buffers either overlap or leave out elements. * When `timespan` > `timeshift`, the buffers overlap * When `timespan` < `timeshift`, elements are left out -* The case of `timespan` = `timeshift` is equivalent to the simpler case we saw in the previous subchapter. +* The case of `timespan` = `timeshift` is equivalent to the simpler case in the previous subchapter. -In the next example we will create a new buffer every 200ms and have it collect for 350ms. That means that buffers overlap by 150ms. +In the next example a buffer is created every 200 ms and collects for 350 ms. That means that buffers overlap by 150 ms. ```java Observable.interval(100, TimeUnit.MILLISECONDS).take(10) @@ -155,7 +155,7 @@ Observable.interval(100, TimeUnit.MILLISECONDS).take(10) #### buffer by signal -The last and most powerful variant of `buffer` allows you to define the start and the end of buffers using signaling observables. +The last and most powerful overload of `buffer` starts and ends buffers using signaling observables. ```java public final Observable> buffer( Observable bufferOpenings, @@ -163,7 +163,7 @@ public final Observable> buffer( ``` ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer2.png) -This function takes two arguments. The first argument, `bufferOpenings`, is an observable. Every time this observable emits a value, a new buffer begins. Along with opening a new buffer, the value which it emitted is passed to the `bufferClosingSelector`, which is a function. This function uses the value to create a new observable, which will signal the end of the corresponding buffer when it emits its first onNext event. +This function takes two arguments. The first argument, `bufferOpenings`, is an observable. Every time this observable emits a value, a new buffer begins. Along with opening a new buffer, the value which it emitted is passed to the `bufferClosingSelector`, which is a function. This function uses the value to create a new observable, which signals the end of the corresponding buffer when it emits its first onNext event. Let's see this in code: ```java @@ -181,17 +181,17 @@ Observable.interval(100, TimeUnit.MILLISECONDS).take(10) [9] ``` -We've created an `Observable.interval`, which signals the opening of a new buffer every 250ms. Because observables created with `interval` do not immediately emit a value, and first buffer actually starts at 250ms and the values before were lost. For the closing of a buffer, we provided a lambda function that took every value emitted by `bufferOpenings`. The values generated by `interval` are the natural progression 0,1,2,3... but we don't actually use the value, because such an example would be too complicated. Instead, we just created an observable that waits 200ms and then emits a single value. That means that each buffer last exactly 200ms, similarily to buffering by time. +The supplied `Observable.interval` signals the opening of a new buffer every 250 ms. Because observables created with `interval` do not immediately emit a value, and first buffer actually starts at 250 ms and the values emitted during that time are lost. For the closing of a buffer a lamba function function that takes every value emitted by `bufferOpenings` is supplied. The values generated by `interval` are the natural progression 0,1,2,3... but they aren't used since such an example would be too complicated. Instead, an observable that waits 200 ms and then emits a single value is supplied. That means that each buffer lasts exactly 200 ms, similarity to buffering by time. ### takeLastBuffer -We have already seen the [takeLast](/Part%202%20-%20Sequence%20Basics/2.%20Reducing%20a%20sequence.md#skiplast-and-takelast) operator, which returns the last N number of items. Internally, `takeLast` needs to buffer items and re-emits them when the source sequence ends. The `takeLastBuffer` operator returns the last elements as one buffer. +The [takeLast](/Part%202%20-%20Sequence%20Basics/2.%20Reducing%20a%20sequence.md#skiplast-and-takelast) operator, which returns the last N number of items was already discussed in Part 2. Internally, `takeLast` needs to buffer items and re-emits them when the source sequence ends. The `takeLastBuffer` operator returns the last elements as one buffer. #### By count ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeLastBuffer.png) -`takeLastBuffer` by count will emit the last `N` elements in a list. +`takeLastBuffer` by count emits the last `N` elements in a list. ```java Observable.range(0, 5) @@ -207,7 +207,7 @@ Observable.range(0, 5) ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeLastBuffer.tn.png) -`takeLastBuffer` by time will emit, as a buffer, the items that were received during the specified timespan, which is measure from the end of the source sequence. +`takeLastBuffer` by time emits the items that were received during the specified timespan as a buffer, which is measured from the end of the source sequence. ```java Observable.interval(100, TimeUnit.MILLISECONDS) @@ -222,7 +222,7 @@ Observable.interval(100, TimeUnit.MILLISECONDS) #### By count and time -The buffer emitted by this overload of `takeLastBuffer` will contain items that were emitted over the specified timespan before the end. If this window contains more than the specified number of items, the buffer will contain the last `N` items. +The buffer emitted by this overload of `takeLastBuffer` contains items that were emitted over the specified timespan before the end. If this window contains more than the specified number of items, the buffer emits only the last `N` items. ```java Observable.interval(100, TimeUnit.MILLISECONDS) @@ -235,16 +235,16 @@ Observable.interval(100, TimeUnit.MILLISECONDS) [3, 4] ``` -As we saw in the previous example, the last 200ms include three values. With `.takeLastBuffer(2, 200, TimeUnit.MILLISECONDS)` we specified that we want values from the last 200ms, but no more than 2 values. For that reason, we only get the last two values. +As evident from the previous example, the last 200 ms include three values. `.takeLastBuffer(2, 200, TimeUnit.MILLISECONDS)` specifies values will be collected during the last 200 ms, limited to 2 values. For that reason, only the last two values are printed. ## Delay -`delay`, as the name suggests, will postpone the emission of values for a specified amount of time. The are two ways to do that. One is by storing values until you are ready to emit them. The other is to delay the subscription to observable. +`delay`, as the name suggests, postpones the emission of values for a specified amount of time. The are two ways to do that. One is to store values for as long as necessary. The other is to delay the subscription to observable. ### delay -The simplest overload of `delay` will delay every item by the same amount of time. You can think of it as delaying the beginning of the sequence, while maintaining the time intervals between successive elements. +The simplest overload of `delay` delays every item by the same amount of time. It's like delaying the beginning of the sequence, while maintaining the time intervals between successive elements. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png) @@ -265,11 +265,11 @@ TimeInterval [intervalInMilliseconds=100, value=3] TimeInterval [intervalInMilliseconds=101, value=4] ``` -We created 5 values spaced 100ms apart and then we delayed the sequence by 1s. We can see that the first value takes ~(1000 + 100)ms and the next values take 100ms each. +Five values spaced 100 ms apart are created and then the sequence is delayed by 1 second. First value takes around 1000 + 100 ms and the next values take 100 ms each. -You can also delay each value individually. +Each value can also delayed individually. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.o.png) -This overload takes a function which will create an observable for each item. When that observable emits onNext, the corresponding item is emitted in the delayed sequence. Here's some code: +This overload takes a function which creates an observable for each item. When that observable emits onNext, the corresponding item is emitted in the delayed sequence. ```java Observable.interval(100, TimeUnit.MILLISECONDS).take(5) @@ -286,11 +286,11 @@ TimeInterval [intervalInMilliseconds=201, value=3] TimeInterval [intervalInMilliseconds=199, value=4] ``` -The initial sequence is spaced 100ms apart, while the resulting is 200ms. If you remember, `interval` emits the numbers i = 1,2,3,etc. We delay each item `i` by `i*100`, so the first item is delayed by 100ms, then second by 200ms, the third by 300ms. The difference between the successive delays is 100ms. Added to the initial 100ms interval, that results in 200ms interval between items. +The initial sequence is spaced 100 ms apart, while the resulting is 200 ms. If you remember, `interval` emits the numbers i = 1,2,3,etc. We delay each item `i` by `i*100`, so the first item is delayed by 100 ms, then second by 200 ms, the third by 300 ms. The difference between the successive delays is 100 ms. Added to the initial 100 ms interval, that results in 200 ms interval between items. ### delaySubscription -Rather than storing values and emitting them later, you can delay the subscription altogether. This will have a different effect depending on if the observable is hot or cold. This will be discussed more in the [Hot and cold observables](/Part 3 - Taming the sequence/6. Hot and Cold observables.md) chapter. For our examples so far, the observables are cold and subscription event is when the source observable is created (i.e. the begining of the sequence). What that means is that there is no difference in the sequences between delaying each item by the same amount and delaying the subscription. Since that is the case here, delaying the subscription is more efficient, since the operator doesn't need to buffer items internally. +Rather than storing values and emitting them later, the subscription can be delayed altogether. This will have a different effect depending on if the observable is hot or cold. Details can be found in [Hot and cold observables](/Part%203%20-%20Taming%20the%20sequence/6.%20Hot%20and%20Cold%20observables.md) chapter. For the examples so far, the observables are cold and subscription event is when the source observable is created (i.e. the beginning of the sequence). What that means is that there is no difference in the sequences between delaying each item by the same amount and delaying the subscription. Since that is the case here, delaying the subscription is more efficient, since the operator doesn't need to buffer items internally. Let's see code for the different overloads for delaying a subscription ```java @@ -308,9 +308,9 @@ TimeInterval [intervalInMilliseconds=100, value=3] TimeInterval [intervalInMilliseconds=99, value=4] ``` -What we see here is that the subscription of the `interval` observable (i.e. its creation) was delayed by 1000ms. After that, the sequence goes as defined. +Output demonstrates the `interval` observable (i.e. its creation) subscription was delayed by 1000 ms. After that, the sequence goes as defined. -You can also delay the subscription based on a signaling observable through the following overload: +A subscription can also be delayed based on a signaling observable through the following overload: ```java public final Observable delaySubscription(Func0> subscriptionDelay) ``` @@ -326,7 +326,7 @@ Observable.interval(100, TimeUnit.MILLISECONDS).take(5) ### delay values and subscription -The last method in this category allows you to delay both the subscription and each item individually. +The last method in this category delays both the subscription and each item individually. ```java public final Observable delay( Func0> subscriptionDelay, @@ -334,11 +334,11 @@ public final Observable delay( ``` ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.oo.png) -This combines two delay variants we've already seen into one. The first argument is a function that creates an observable that will signal when to perform the subscription. The second argument takes every item and decides how long it should be delayed. +This is a combination of the two ways previously discussed. The first argument is a function that creates an observable that will signal when to perform the subscription. The second argument takes every item and decides how long it should be delayed. ## Sample -`sample` allows you to thin-out a sequence by dividing it into time windows and taking only one value out of each window. When each window ends, the last value within that window (if any) is emitted. +`sample` thins out a sequence by dividing it into time windows and taking only one value out of each window. When each window ends, the last value within that window (if any) is emitted. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.png) @@ -355,7 +355,7 @@ Observable.interval(150, TimeUnit.MILLISECONDS) ... ``` -The division of time doesn't have to be uniform. You can specify the end of each part with a signaling observable. +The division of time isn't necessarily uniform. The end of each part can be signaled with an observable. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.o.png) @@ -368,11 +368,11 @@ Observable.interval(150, TimeUnit.MILLISECONDS) ## Throttling -Throttling is also intended for thinning out a sequence. When the producer emits more values than we want and we don't need every sequential value, we can thin out the sequence by throttling it. +Throttling is also intended for thinning out a sequence. When the producer emits more values than than desired and not every sequential value is needed, the sequence can be thinned out by throttling it. ### throttleFirst -The `throttleFirst` operators filter out values relative to the values that were already accepted. After a value has been accepted, values will be rejected for the duration of the window. Once The window expires, the next value will be accepted and a new window starts. +The `throttleFirst` operator filters out values relative to the values that were already accepted. After a value has been accepted, values will be rejected for the duration of the window. Once The window expires, the next value will be accepted and a new window starts. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.png) @@ -389,11 +389,11 @@ Observable.interval(150, TimeUnit.MILLISECONDS) ... ``` -Here, `interval` emits every 150ms. The values seen as output were emitted at `(i+1)*150`ms, relative to the start of the sequence. The first item is emitted at 150ms and is accepted by default. Now items are rejected for the next 1000ms. The first item after that comes at 1200ms. Again, items are rejected for the next 1000ms, so the next item comes at 2250ms. +Here, `interval` emits every 150 ms. The values seen as output were emitted at `(i+1)*150` ms, relative to the start of the sequence. The first item is emitted at 150 ms and is accepted by default. Now items are rejected for the next 1000 ms. The first item after that comes at 1200 ms. Again, items are rejected for the next 1000 ms, so the next item comes at 2250 ms. ### throttleLast -The `throttleLast` operator divides time at regular intervals, rather than relative to the last item. it emits the last value in each window, rather than the first after it. +The `throttleLast` operator divides time at regular intervals, rather than relative to the last item. It emits the last value in each window, rather than the first after it. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLast.png) @@ -410,15 +410,15 @@ Observable.interval(150, TimeUnit.MILLISECONDS) ... ``` -Here, a window starts with the creation of the sequence at 0ms. That window expires at 1000ms and the last value in that window was at 900ms. The next window last 1000ms until 2000ms. The last item in that window is at 1950. In the next window, the item is at 2850ms. +Here, a window starts with the creation of the sequence at 0 ms. That window expires at 1000 ms and the last value in that window was at 900 ms. The next window last 1000 ms until 2000 ms. The last item in that window is at 1950. In the next window, the item is at 2850 ms. ## Debouncing -In this operator, a time window starts every time a value is received. Once the window expires, the value is emitted. If, however, another value is received before the window expires, the previous value is rejected and the window restarts for the next value. +In this operator, a time window starts every time a value is received. Once the window expires, the value is emitted. If, however, another value is received before the window expires, the previous value is discarded and the window restarts for the next value. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/debounce.s.png) -Demonstrating this is a bit more complicated, since an `interval` observable will either have all of its values accepted or only its last value accepted (which is never if the observable is infinite). For that reason we will construct a more complicated observable. +Demonstrating this is a bit more complicated, since an `interval` observable will either have all of its values accepted or only its last value accepted (which is never if the observable is infinite). For that reason a more complex observable has to be constructed: ```java Observable.concat( @@ -444,9 +444,9 @@ TimeInterval [intervalInMilliseconds=99, value=8] TimeInterval [intervalInMilliseconds=101, value=9] ``` -As we can see here, our observable will emit 4 values in quick succession, then 3 values in greater intervals and finally 3 values in quick succession. The `scan` only serves to turn the values into the natural sequence, rather than 3 repetitions of 1,2,3. The reason the first two emissions are simultaneous is that that `scan` emits the initial value and the first value together. +The observable emits 4 values in quick succession, then 3 values in greater intervals and finally 3 values in quick succession. The `scan` only serves to turn the values into the natural sequence, rather than 3 repetitions of 1,2,3. The reason the first two emissions are simultaneous is that `scan` emits the initial value and the first value together. -Now that we understand our source observable, let's `debounce` it: +Here's how `debounce` works: ```java Observable.concat( @@ -466,16 +466,15 @@ Observable.concat( 9 ``` -We debounced with a window of 150ms. The bursts of emissions in our observable were faster than that (100ms), so only the last value in each burst passed through. During the slower part of our observable, all the values were accepted, because the 150ms window expired before the next value arrived. +The window of debouncing is 150 ms. The bursts of emissions in the observable were faster than that (100 ms), so only the last value in each burst passed through. During the slower part of our observable, all the values were accepted, because the 150 ms window expired before the next value arrived. There is a `throttleWithTimeout` operator which has the same behaviour as the `debounce` operator that we just saw. One is practically an alias of the other, even though neither is officially declared as such in the documentation. -You can also debounce based on a per item basis. In this case, you provide a function that calculates for each item how long the window should be after it. You signal that the window is using a new observable for each item -. When the observable terminates, the window expires. +Debouncing can be done on a per item basis. In this case, `debounce` takes a function that calculates how long the window should be after each item. The window is using a new observable for each item. When the observable terminates, the window expires. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/debounce.f.png) -In the next example, the window size for each value `i` is `i*50`ms. +In the next example, the window size for each value `i` is `i*50` ms. ```java Observable.concat( @@ -496,7 +495,7 @@ Observable.concat( 9 ``` -Let's map each item to the length of its window and the time that the next item actually arrives +This is a map of each item to the length of its window and the time that the next item actually arrives: Item | Calculated Window | Time until next value | Window < next value ---- | ----------------- | --------------------- | ------------------- @@ -511,19 +510,19 @@ Item | Calculated Window | Time until next value | Window < next value 8 | 400 | 101 | 9 | 450 | | Yes -We can now see why the values turned out to be so. +It's now clear why the values turned out to be so. -This operator is useful against observables that undergo periods of uncertainty, where the value changes frequently from one non-definitive state to another. For example, imagine that you are monitoring the contents of a text field and you want to offer suggestions based on what the user is writting. You could recompute your suggestions on every keystroke, but that would be too noisy and too costly. If, instead, you `debounce` the changes to the text field, you will offer suggestions only when the user has paused or finished typing. +This operator is useful against observables that undergo periods of uncertainty, where the value changes frequently from one non-definitive state to another. For example, imagine the content of a text field is monitored and suggestions need to be offered based on what the user is writing. Suggestions can be recomputed on every keystroke, but that would be too noisy and too costly. Instead, debouncing the changes to the text field, offer suggestions only when the user has paused or finished typing. ## Timeout -`timeout` is used to detect observables that have remained inactive for a given amount of time. If a specified amount of time passes without the source emitting any items, `timeout` makes the observable fail with a `TimeoutException`. +`timeout` is used to detect observables that have remained inactive for a given period of time. If a specified amount of time passes without the source emitting any items, `timeout` makes the observable fail with a `TimeoutException`. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.1.png) -We will reuse our composite observable from the examples of `debounce` to demonstrate `timeout`. +The composite observable from the examples of `debounce` is used to demonstrate `timeout`. ```java Observable.concat( @@ -546,9 +545,9 @@ Observable.concat( java.util.concurrent.TimeoutException ``` -The output mirrors the source observable for as long as values come more frequently than 200ms. As soon as a value takes more than that to arrive, an error is pushed. +The output mirrors the source observable for as long as values come more frequently than 200 ms. As soon as a value takes more than that to arrive, an error is pushed. -Instead of failing, you can provide a fallback observable. When a timeout occures, the resulting observable will switch to the fallback. The original observable will be ignored from then on, even if it resumes. +Instead of failing, a fallback observable can be supplied. When a timeout occurs, the resulting observable switches to the fallback. The original observable is ignored from then on, even if it resumes. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.2.png) @@ -573,7 +572,7 @@ Observable.concat( -1 ``` -You can also specify the timeout window per item. In that case, you provide a function that creates an observable for each value. When the observable terminates, that is the signal for the timeout. If no values had been emitted until that, that triggers the timeout. +Timeout window per item can be specified as well. In that case, a function that creates an observable for each value should be supplied. When the observable terminates, that is the signal for the timeout. If no values had been emitted until that, that triggers the timeout. ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout5.png) @@ -591,7 +590,7 @@ Observable.concat( System.out::println, System.out::println); ``` -Again, you can provide a fallback observable with +A fallback observable can be provided here as well: ```java .timeout(i -> Observable.timer(200, TimeUnit.MILLISECONDS), Observable.just(-1)) ```