diff --git a/pom.xml b/pom.xml index 2ffb4e8..6e68d92 100644 --- a/pom.xml +++ b/pom.xml @@ -13,6 +13,11 @@ rxjava 1.0.0-rc.10 + + io.reactivex.rxjava2 + rxjava + 2.1.7 + junit junit diff --git a/src/main/java/learnrxjava/examples/ErrorHandlingBasics.java b/src/main/java/learnrxjava/examples/ErrorHandlingBasics.java index 120f53d..7194c4b 100644 --- a/src/main/java/learnrxjava/examples/ErrorHandlingBasics.java +++ b/src/main/java/learnrxjava/examples/ErrorHandlingBasics.java @@ -1,6 +1,6 @@ package learnrxjava.examples; -import rx.Observable; +import io.reactivex.Observable; public class ErrorHandlingBasics { diff --git a/src/main/java/learnrxjava/examples/ErrorHandlingRetryWithBackoff.java b/src/main/java/learnrxjava/examples/ErrorHandlingRetryWithBackoff.java index b6a61a8..7033ade 100644 --- a/src/main/java/learnrxjava/examples/ErrorHandlingRetryWithBackoff.java +++ b/src/main/java/learnrxjava/examples/ErrorHandlingRetryWithBackoff.java @@ -2,7 +2,8 @@ import java.util.concurrent.TimeUnit; -import rx.Observable; +import io.reactivex.Observable; + public class ErrorHandlingRetryWithBackoff { @@ -29,7 +30,7 @@ public static void main(String... args) { System.out.println("2) delay retry by " + i + " second(s)"); return Observable.timer(i, TimeUnit.SECONDS); }).concatWith(Observable.error(new RuntimeException("Failed after 3 retries"))); - }).toBlocking().forEach(System.out::println); + }).blockingForEach(System.out::println); } } diff --git a/src/main/java/learnrxjava/examples/FlowControlDebounceBuffer.java b/src/main/java/learnrxjava/examples/FlowControlDebounceBuffer.java index ff92ace..7caba26 100644 --- a/src/main/java/learnrxjava/examples/FlowControlDebounceBuffer.java +++ b/src/main/java/learnrxjava/examples/FlowControlDebounceBuffer.java @@ -2,9 +2,10 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import rx.Observable; -import rx.Subscriber; -import rx.schedulers.Schedulers; +import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.schedulers.Schedulers; + public class FlowControlDebounceBuffer { @@ -20,18 +21,18 @@ public static void main(String args[]) { // then the buffered one that uses the debounced stream to demark window start/stop Observable> buffered = burstStream.buffer(debounced); // then we subscribe to the buffered stream so it does what we want - buffered.toBlocking().forEach(System.out::println); + buffered.blockingForEach(System.out::println); } /** * This is an artificial source to demonstrate an infinite stream that bursts intermittently */ public static Observable intermittentBursts() { - return Observable.create((Subscriber s) -> { - while (!s.isUnsubscribed()) { + return Observable.create((ObservableEmitter emitter) -> { + while (!emitter.isDisposed()) { // burst some number of items for (int i = 0; i < Math.random() * 20; i++) { - s.onNext(i); + emitter.onNext(i); } try { // sleep for a random amount of time diff --git a/src/main/java/learnrxjava/examples/FlowControlSampleExample.java b/src/main/java/learnrxjava/examples/FlowControlSampleExample.java index d97b31e..647970e 100644 --- a/src/main/java/learnrxjava/examples/FlowControlSampleExample.java +++ b/src/main/java/learnrxjava/examples/FlowControlSampleExample.java @@ -1,23 +1,25 @@ package learnrxjava.examples; import java.util.concurrent.TimeUnit; -import rx.Observable; -import rx.Subscriber; -import rx.schedulers.Schedulers; +import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.schedulers.Schedulers; + + public class FlowControlSampleExample { public static void main(String args[]) { - hotStream().sample(500, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println); + hotStream().sample(500, TimeUnit.MILLISECONDS).blockingForEach(System.out::println); } /** * This is an artificial source to demonstrate an infinite stream that emits randomly */ public static Observable hotStream() { - return Observable.create((Subscriber s) -> { + return Observable.create((ObservableEmitter s) -> { int i = 0; - while (!s.isUnsubscribed()) { + while (!s.isDisposed()) { s.onNext(i++); try { // sleep for a random amount of time diff --git a/src/main/java/learnrxjava/examples/FlowControlThrottleExample.java b/src/main/java/learnrxjava/examples/FlowControlThrottleExample.java index 31bcb22..07d69ea 100644 --- a/src/main/java/learnrxjava/examples/FlowControlThrottleExample.java +++ b/src/main/java/learnrxjava/examples/FlowControlThrottleExample.java @@ -1,27 +1,29 @@ package learnrxjava.examples; import java.util.concurrent.TimeUnit; -import rx.Observable; -import rx.Subscriber; -import rx.schedulers.Schedulers; +import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.schedulers.Schedulers; + + public class FlowControlThrottleExample { public static void main(String args[]) { // first item emitted in each time window - hotStream().throttleFirst(500, TimeUnit.MILLISECONDS).take(10).toBlocking().forEach(System.out::println); + hotStream().throttleFirst(500, TimeUnit.MILLISECONDS).take(10).blockingForEach(System.out::println); // last item emitted in each time window - hotStream().throttleLast(500, TimeUnit.MILLISECONDS).take(10).toBlocking().forEach(System.out::println); + hotStream().throttleLast(500, TimeUnit.MILLISECONDS).take(10).blockingForEach(System.out::println); } /** * This is an artificial source to demonstrate an infinite stream that emits randomly */ public static Observable hotStream() { - return Observable.create((Subscriber s) -> { + return Observable.create((ObservableEmitter s) -> { int i = 0; - while (!s.isUnsubscribed()) { + while (!s.isDisposed()) { s.onNext(i++); try { // sleep for a random amount of time diff --git a/src/main/java/learnrxjava/examples/FlowControlWindowExample.java b/src/main/java/learnrxjava/examples/FlowControlWindowExample.java index 418c2f6..c95a914 100644 --- a/src/main/java/learnrxjava/examples/FlowControlWindowExample.java +++ b/src/main/java/learnrxjava/examples/FlowControlWindowExample.java @@ -2,18 +2,20 @@ import java.util.concurrent.TimeUnit; -import rx.Observable; -import rx.Subscriber; -import rx.schedulers.Schedulers; +import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.schedulers.Schedulers; + + public class FlowControlWindowExample { public static void main(String args[]) { // buffer every 500ms (using 999999999 to mark start of output) - hotStream().window(500, TimeUnit.MILLISECONDS).take(10).flatMap(w -> w.startWith(999999999)).toBlocking().forEach(System.out::println); + hotStream().window(500, TimeUnit.MILLISECONDS).take(10).flatMap(w -> w.startWith(999999999)).blockingForEach(System.out::println); // buffer 10 items at a time (using 999999999 to mark start of output) - hotStream().window(10).take(2).flatMap(w -> w.startWith(999999999)).toBlocking().forEach(System.out::println); + hotStream().window(10).take(2).flatMap(w -> w.startWith(999999999)).blockingForEach(System.out::println); System.out.println("Done"); } @@ -22,8 +24,8 @@ public static void main(String args[]) { * This is an artificial source to demonstrate an infinite stream that bursts intermittently */ public static Observable hotStream() { - return Observable.create((Subscriber s) -> { - while (!s.isUnsubscribed()) { + return Observable.create((ObservableEmitter s) -> { + while (!s.isDisposed()) { // burst some number of items for (int i = 0; i < Math.random() * 20; i++) { s.onNext(i); diff --git a/src/main/java/learnrxjava/examples/HelloWorld.java b/src/main/java/learnrxjava/examples/HelloWorld.java index 6694b9f..e708f78 100644 --- a/src/main/java/learnrxjava/examples/HelloWorld.java +++ b/src/main/java/learnrxjava/examples/HelloWorld.java @@ -1,10 +1,11 @@ package learnrxjava.examples; import java.util.concurrent.TimeUnit; -import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Subscriber; -import rx.schedulers.Schedulers; +import io.reactivex.Observable; +import io.reactivex.ObservableOnSubscribe; +import io.reactivex.functions.Consumer; +import io.reactivex.schedulers.Schedulers; + public class HelloWorld { @@ -13,7 +14,7 @@ public static void main(String[] args) { // Hello World Observable.create(subscriber -> { subscriber.onNext("Hello World!"); - subscriber.onCompleted(); + subscriber.onComplete(); }).subscribe(System.out::println); // shorten by using helper method @@ -27,96 +28,85 @@ public static void main(String[] args) { () -> System.out.println("Done")); // expand to show full classes - Observable.create(new OnSubscribe() { - - @Override - public void call(Subscriber subscriber) { - subscriber.onNext("Hello World!"); - subscriber.onCompleted(); - } - - }).subscribe(new Subscriber() { - - @Override - public void onCompleted() { - System.out.println("Done"); - } - - @Override - public void onError(Throwable e) { - e.printStackTrace(); - } - - @Override - public void onNext(String t) { + ObservableOnSubscribe handler = emitter -> { + emitter.onNext("Hello World!"); + emitter.onComplete(); + }; + + Observable.create(handler).subscribe(new Consumer() { + + @Override + public void accept(String t) throws Exception { System.out.println(t); - } - - }); - - // add error propagation - Observable.create(subscriber -> { - try { - subscriber.onNext("Hello World!"); - subscriber.onCompleted(); - } catch (Exception e) { - subscriber.onError(e); - } + } + }); + + + Observable.create(emitter -> { + + try { + emitter.onNext("Hello World!"); + emitter.onComplete(); + }catch(Exception ex) { + emitter.onError(ex); + } + }).subscribe(System.out::println); // add concurrency (manually) - Observable.create(subscriber -> { + Observable.create(emitter -> { new Thread(() -> { try { - subscriber.onNext(getData()); - subscriber.onCompleted(); + emitter.onNext(getData()); + emitter.onComplete(); } catch (Exception e) { - subscriber.onError(e); + emitter.onError(e); } }).start(); }).subscribe(System.out::println); // add concurrency (using a Scheduler) - Observable.create(subscriber -> { + Observable.create(emitter -> { try { - subscriber.onNext(getData()); - subscriber.onCompleted(); + emitter.onNext(getData()); + emitter.onComplete(); } catch (Exception e) { - subscriber.onError(e); + emitter.onError(e); } }).subscribeOn(Schedulers.io()) .subscribe(System.out::println); // add operator - Observable.create(subscriber -> { + Observable.create(emitter -> { try { - subscriber.onNext(getData()); - subscriber.onCompleted(); + emitter.onNext(getData()); + emitter.onComplete(); } catch (Exception e) { - subscriber.onError(e); + emitter.onError(e); } }).subscribeOn(Schedulers.io()) .map(data -> data + " --> at " + System.currentTimeMillis()) .subscribe(System.out::println); // add error handling - Observable.create(subscriber -> { + Observable.create(emitter -> { try { - subscriber.onNext(getData()); - subscriber.onCompleted(); + emitter.onNext(getData()); + emitter.onComplete(); } catch (Exception e) { - subscriber.onError(e); + emitter.onError(e); } }).subscribeOn(Schedulers.io()) .map(data -> data + " --> at " + System.currentTimeMillis()) - .onErrorResumeNext(e -> Observable.just("Fallback Data")) + .onErrorResumeNext(e -> { + return Observable.just("Fallback Data");}) .subscribe(System.out::println); // infinite - Observable.create(subscriber -> { + Observable.create(emitter -> { int i = 0; - while (!subscriber.isUnsubscribed()) { - subscriber.onNext(i++); + while (!emitter.isDisposed()) { + emitter.onNext(i++); } }).take(10).subscribe(System.out::println); diff --git a/src/main/java/learnrxjava/examples/ParallelExecution.java b/src/main/java/learnrxjava/examples/ParallelExecution.java index 92d710b..4d939e1 100644 --- a/src/main/java/learnrxjava/examples/ParallelExecution.java +++ b/src/main/java/learnrxjava/examples/ParallelExecution.java @@ -1,7 +1,9 @@ package learnrxjava.examples; -import rx.Observable; -import rx.Subscriber; -import rx.schedulers.Schedulers; + +import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.ObservableOnSubscribe; +import io.reactivex.schedulers.Schedulers; public class ParallelExecution { @@ -24,35 +26,35 @@ public static void main(String[] args) { } private static void mergingAsync() { - Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println); + Observable.merge(getDataAsync(1), getDataAsync(2)).blockingForEach(System.out::println); } private static void mergingSync() { // here you'll see the delay as each is executed synchronously - Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println); + Observable.merge(getDataSync(1), getDataSync(2)).blockingForEach(System.out::println); } private static void mergingSyncMadeAsync() { // if you have something synchronous and want to make it async, you can schedule it like this // so here we see both executed concurrently - Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println); + Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).blockingForEach(System.out::println); } private static void flatMapExampleAsync() { Observable.range(0, 5).flatMap(i -> { return getDataAsync(i); - }).toBlocking().forEach(System.out::println); + }).blockingForEach(System.out::println); } private static void flatMapExampleSync() { Observable.range(0, 5).flatMap(i -> { return getDataSync(i); - }).toBlocking().forEach(System.out::println); + }).blockingForEach(System.out::println); } private static void flatMapBufferedExampleAsync() { Observable.range(0, 5000).buffer(500).flatMap(i -> { - return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> { + return Observable.fromIterable(i).subscribeOn(Schedulers.computation()).map(item -> { // simulate computational work try { Thread.sleep(1); @@ -60,7 +62,7 @@ private static void flatMapBufferedExampleAsync() { } return item + " processed " + Thread.currentThread(); }); - }).toBlocking().forEach(System.out::println); + }).blockingForEach(System.out::println); } private static void flatMapWindowedExampleAsync() { @@ -73,7 +75,7 @@ private static void flatMapWindowedExampleAsync() { } return item + " processed " + Thread.currentThread(); }); - }).toBlocking().forEach(System.out::println); + }).blockingForEach(System.out::println); } // artificial representations of IO work @@ -82,15 +84,20 @@ static Observable getDataAsync(int i) { } static Observable getDataSync(int i) { - return Observable.create((Subscriber s) -> { - // simulate latency - try { - Thread.sleep(1000); - } catch (Exception e) { - e.printStackTrace(); - } - s.onNext(i); - s.onCompleted(); - }); + + return Observable.create(new ObservableOnSubscribe() { + + @Override + public void subscribe(ObservableEmitter emitter) throws Exception { + try { + Thread.sleep(1000); + } catch (Exception e) { + e.printStackTrace(); + } + emitter.onNext(i); + emitter.onComplete(); + + } + }); } }