diff --git a/pom.xml b/pom.xml
index 2ffb4e8..6e68d92 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,6 +13,11 @@
rxjava
1.0.0-rc.10
+
+ io.reactivex.rxjava2
+ rxjava
+ 2.1.7
+
junit
junit
diff --git a/src/main/java/learnrxjava/examples/ErrorHandlingBasics.java b/src/main/java/learnrxjava/examples/ErrorHandlingBasics.java
index 120f53d..7194c4b 100644
--- a/src/main/java/learnrxjava/examples/ErrorHandlingBasics.java
+++ b/src/main/java/learnrxjava/examples/ErrorHandlingBasics.java
@@ -1,6 +1,6 @@
package learnrxjava.examples;
-import rx.Observable;
+import io.reactivex.Observable;
public class ErrorHandlingBasics {
diff --git a/src/main/java/learnrxjava/examples/ErrorHandlingRetryWithBackoff.java b/src/main/java/learnrxjava/examples/ErrorHandlingRetryWithBackoff.java
index b6a61a8..7033ade 100644
--- a/src/main/java/learnrxjava/examples/ErrorHandlingRetryWithBackoff.java
+++ b/src/main/java/learnrxjava/examples/ErrorHandlingRetryWithBackoff.java
@@ -2,7 +2,8 @@
import java.util.concurrent.TimeUnit;
-import rx.Observable;
+import io.reactivex.Observable;
+
public class ErrorHandlingRetryWithBackoff {
@@ -29,7 +30,7 @@ public static void main(String... args) {
System.out.println("2) delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
}).concatWith(Observable.error(new RuntimeException("Failed after 3 retries")));
- }).toBlocking().forEach(System.out::println);
+ }).blockingForEach(System.out::println);
}
}
diff --git a/src/main/java/learnrxjava/examples/FlowControlDebounceBuffer.java b/src/main/java/learnrxjava/examples/FlowControlDebounceBuffer.java
index ff92ace..7caba26 100644
--- a/src/main/java/learnrxjava/examples/FlowControlDebounceBuffer.java
+++ b/src/main/java/learnrxjava/examples/FlowControlDebounceBuffer.java
@@ -2,9 +2,10 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
-import rx.Observable;
-import rx.Subscriber;
-import rx.schedulers.Schedulers;
+import io.reactivex.Observable;
+import io.reactivex.ObservableEmitter;
+import io.reactivex.schedulers.Schedulers;
+
public class FlowControlDebounceBuffer {
@@ -20,18 +21,18 @@ public static void main(String args[]) {
// then the buffered one that uses the debounced stream to demark window start/stop
Observable> buffered = burstStream.buffer(debounced);
// then we subscribe to the buffered stream so it does what we want
- buffered.toBlocking().forEach(System.out::println);
+ buffered.blockingForEach(System.out::println);
}
/**
* This is an artificial source to demonstrate an infinite stream that bursts intermittently
*/
public static Observable intermittentBursts() {
- return Observable.create((Subscriber super Integer> s) -> {
- while (!s.isUnsubscribed()) {
+ return Observable.create((ObservableEmitter emitter) -> {
+ while (!emitter.isDisposed()) {
// burst some number of items
for (int i = 0; i < Math.random() * 20; i++) {
- s.onNext(i);
+ emitter.onNext(i);
}
try {
// sleep for a random amount of time
diff --git a/src/main/java/learnrxjava/examples/FlowControlSampleExample.java b/src/main/java/learnrxjava/examples/FlowControlSampleExample.java
index d97b31e..647970e 100644
--- a/src/main/java/learnrxjava/examples/FlowControlSampleExample.java
+++ b/src/main/java/learnrxjava/examples/FlowControlSampleExample.java
@@ -1,23 +1,25 @@
package learnrxjava.examples;
import java.util.concurrent.TimeUnit;
-import rx.Observable;
-import rx.Subscriber;
-import rx.schedulers.Schedulers;
+import io.reactivex.Observable;
+import io.reactivex.ObservableEmitter;
+import io.reactivex.schedulers.Schedulers;
+
+
public class FlowControlSampleExample {
public static void main(String args[]) {
- hotStream().sample(500, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println);
+ hotStream().sample(500, TimeUnit.MILLISECONDS).blockingForEach(System.out::println);
}
/**
* This is an artificial source to demonstrate an infinite stream that emits randomly
*/
public static Observable hotStream() {
- return Observable.create((Subscriber super Integer> s) -> {
+ return Observable.create((ObservableEmitter s) -> {
int i = 0;
- while (!s.isUnsubscribed()) {
+ while (!s.isDisposed()) {
s.onNext(i++);
try {
// sleep for a random amount of time
diff --git a/src/main/java/learnrxjava/examples/FlowControlThrottleExample.java b/src/main/java/learnrxjava/examples/FlowControlThrottleExample.java
index 31bcb22..07d69ea 100644
--- a/src/main/java/learnrxjava/examples/FlowControlThrottleExample.java
+++ b/src/main/java/learnrxjava/examples/FlowControlThrottleExample.java
@@ -1,27 +1,29 @@
package learnrxjava.examples;
import java.util.concurrent.TimeUnit;
-import rx.Observable;
-import rx.Subscriber;
-import rx.schedulers.Schedulers;
+import io.reactivex.Observable;
+import io.reactivex.ObservableEmitter;
+import io.reactivex.schedulers.Schedulers;
+
+
public class FlowControlThrottleExample {
public static void main(String args[]) {
// first item emitted in each time window
- hotStream().throttleFirst(500, TimeUnit.MILLISECONDS).take(10).toBlocking().forEach(System.out::println);
+ hotStream().throttleFirst(500, TimeUnit.MILLISECONDS).take(10).blockingForEach(System.out::println);
// last item emitted in each time window
- hotStream().throttleLast(500, TimeUnit.MILLISECONDS).take(10).toBlocking().forEach(System.out::println);
+ hotStream().throttleLast(500, TimeUnit.MILLISECONDS).take(10).blockingForEach(System.out::println);
}
/**
* This is an artificial source to demonstrate an infinite stream that emits randomly
*/
public static Observable hotStream() {
- return Observable.create((Subscriber super Integer> s) -> {
+ return Observable.create((ObservableEmitter s) -> {
int i = 0;
- while (!s.isUnsubscribed()) {
+ while (!s.isDisposed()) {
s.onNext(i++);
try {
// sleep for a random amount of time
diff --git a/src/main/java/learnrxjava/examples/FlowControlWindowExample.java b/src/main/java/learnrxjava/examples/FlowControlWindowExample.java
index 418c2f6..c95a914 100644
--- a/src/main/java/learnrxjava/examples/FlowControlWindowExample.java
+++ b/src/main/java/learnrxjava/examples/FlowControlWindowExample.java
@@ -2,18 +2,20 @@
import java.util.concurrent.TimeUnit;
-import rx.Observable;
-import rx.Subscriber;
-import rx.schedulers.Schedulers;
+import io.reactivex.Observable;
+import io.reactivex.ObservableEmitter;
+import io.reactivex.schedulers.Schedulers;
+
+
public class FlowControlWindowExample {
public static void main(String args[]) {
// buffer every 500ms (using 999999999 to mark start of output)
- hotStream().window(500, TimeUnit.MILLISECONDS).take(10).flatMap(w -> w.startWith(999999999)).toBlocking().forEach(System.out::println);
+ hotStream().window(500, TimeUnit.MILLISECONDS).take(10).flatMap(w -> w.startWith(999999999)).blockingForEach(System.out::println);
// buffer 10 items at a time (using 999999999 to mark start of output)
- hotStream().window(10).take(2).flatMap(w -> w.startWith(999999999)).toBlocking().forEach(System.out::println);
+ hotStream().window(10).take(2).flatMap(w -> w.startWith(999999999)).blockingForEach(System.out::println);
System.out.println("Done");
}
@@ -22,8 +24,8 @@ public static void main(String args[]) {
* This is an artificial source to demonstrate an infinite stream that bursts intermittently
*/
public static Observable hotStream() {
- return Observable.create((Subscriber super Integer> s) -> {
- while (!s.isUnsubscribed()) {
+ return Observable.create((ObservableEmitter s) -> {
+ while (!s.isDisposed()) {
// burst some number of items
for (int i = 0; i < Math.random() * 20; i++) {
s.onNext(i);
diff --git a/src/main/java/learnrxjava/examples/HelloWorld.java b/src/main/java/learnrxjava/examples/HelloWorld.java
index 6694b9f..e708f78 100644
--- a/src/main/java/learnrxjava/examples/HelloWorld.java
+++ b/src/main/java/learnrxjava/examples/HelloWorld.java
@@ -1,10 +1,11 @@
package learnrxjava.examples;
import java.util.concurrent.TimeUnit;
-import rx.Observable;
-import rx.Observable.OnSubscribe;
-import rx.Subscriber;
-import rx.schedulers.Schedulers;
+import io.reactivex.Observable;
+import io.reactivex.ObservableOnSubscribe;
+import io.reactivex.functions.Consumer;
+import io.reactivex.schedulers.Schedulers;
+
public class HelloWorld {
@@ -13,7 +14,7 @@ public static void main(String[] args) {
// Hello World
Observable.create(subscriber -> {
subscriber.onNext("Hello World!");
- subscriber.onCompleted();
+ subscriber.onComplete();
}).subscribe(System.out::println);
// shorten by using helper method
@@ -27,96 +28,85 @@ public static void main(String[] args) {
() -> System.out.println("Done"));
// expand to show full classes
- Observable.create(new OnSubscribe() {
-
- @Override
- public void call(Subscriber super String> subscriber) {
- subscriber.onNext("Hello World!");
- subscriber.onCompleted();
- }
-
- }).subscribe(new Subscriber() {
-
- @Override
- public void onCompleted() {
- System.out.println("Done");
- }
-
- @Override
- public void onError(Throwable e) {
- e.printStackTrace();
- }
-
- @Override
- public void onNext(String t) {
+ ObservableOnSubscribe handler = emitter -> {
+ emitter.onNext("Hello World!");
+ emitter.onComplete();
+ };
+
+ Observable.create(handler).subscribe(new Consumer() {
+
+ @Override
+ public void accept(String t) throws Exception {
System.out.println(t);
- }
-
- });
-
- // add error propagation
- Observable.create(subscriber -> {
- try {
- subscriber.onNext("Hello World!");
- subscriber.onCompleted();
- } catch (Exception e) {
- subscriber.onError(e);
- }
+ }
+ });
+
+
+ Observable.create(emitter -> {
+
+ try {
+ emitter.onNext("Hello World!");
+ emitter.onComplete();
+ }catch(Exception ex) {
+ emitter.onError(ex);
+ }
+
}).subscribe(System.out::println);
// add concurrency (manually)
- Observable.create(subscriber -> {
+ Observable.create(emitter -> {
new Thread(() -> {
try {
- subscriber.onNext(getData());
- subscriber.onCompleted();
+ emitter.onNext(getData());
+ emitter.onComplete();
} catch (Exception e) {
- subscriber.onError(e);
+ emitter.onError(e);
}
}).start();
}).subscribe(System.out::println);
// add concurrency (using a Scheduler)
- Observable.create(subscriber -> {
+ Observable.create(emitter -> {
try {
- subscriber.onNext(getData());
- subscriber.onCompleted();
+ emitter.onNext(getData());
+ emitter.onComplete();
} catch (Exception e) {
- subscriber.onError(e);
+ emitter.onError(e);
}
}).subscribeOn(Schedulers.io())
.subscribe(System.out::println);
// add operator
- Observable.create(subscriber -> {
+ Observable.create(emitter -> {
try {
- subscriber.onNext(getData());
- subscriber.onCompleted();
+ emitter.onNext(getData());
+ emitter.onComplete();
} catch (Exception e) {
- subscriber.onError(e);
+ emitter.onError(e);
}
}).subscribeOn(Schedulers.io())
.map(data -> data + " --> at " + System.currentTimeMillis())
.subscribe(System.out::println);
// add error handling
- Observable.create(subscriber -> {
+ Observable.create(emitter -> {
try {
- subscriber.onNext(getData());
- subscriber.onCompleted();
+ emitter.onNext(getData());
+ emitter.onComplete();
} catch (Exception e) {
- subscriber.onError(e);
+ emitter.onError(e);
}
}).subscribeOn(Schedulers.io())
.map(data -> data + " --> at " + System.currentTimeMillis())
- .onErrorResumeNext(e -> Observable.just("Fallback Data"))
+ .onErrorResumeNext(e -> {
+ return Observable.just("Fallback Data");})
.subscribe(System.out::println);
// infinite
- Observable.create(subscriber -> {
+ Observable.create(emitter -> {
int i = 0;
- while (!subscriber.isUnsubscribed()) {
- subscriber.onNext(i++);
+ while (!emitter.isDisposed()) {
+ emitter.onNext(i++);
}
}).take(10).subscribe(System.out::println);
diff --git a/src/main/java/learnrxjava/examples/ParallelExecution.java b/src/main/java/learnrxjava/examples/ParallelExecution.java
index 92d710b..4d939e1 100644
--- a/src/main/java/learnrxjava/examples/ParallelExecution.java
+++ b/src/main/java/learnrxjava/examples/ParallelExecution.java
@@ -1,7 +1,9 @@
package learnrxjava.examples;
-import rx.Observable;
-import rx.Subscriber;
-import rx.schedulers.Schedulers;
+
+import io.reactivex.Observable;
+import io.reactivex.ObservableEmitter;
+import io.reactivex.ObservableOnSubscribe;
+import io.reactivex.schedulers.Schedulers;
public class ParallelExecution {
@@ -24,35 +26,35 @@ public static void main(String[] args) {
}
private static void mergingAsync() {
- Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
+ Observable.merge(getDataAsync(1), getDataAsync(2)).blockingForEach(System.out::println);
}
private static void mergingSync() {
// here you'll see the delay as each is executed synchronously
- Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
+ Observable.merge(getDataSync(1), getDataSync(2)).blockingForEach(System.out::println);
}
private static void mergingSyncMadeAsync() {
// if you have something synchronous and want to make it async, you can schedule it like this
// so here we see both executed concurrently
- Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
+ Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).blockingForEach(System.out::println);
}
private static void flatMapExampleAsync() {
Observable.range(0, 5).flatMap(i -> {
return getDataAsync(i);
- }).toBlocking().forEach(System.out::println);
+ }).blockingForEach(System.out::println);
}
private static void flatMapExampleSync() {
Observable.range(0, 5).flatMap(i -> {
return getDataSync(i);
- }).toBlocking().forEach(System.out::println);
+ }).blockingForEach(System.out::println);
}
private static void flatMapBufferedExampleAsync() {
Observable.range(0, 5000).buffer(500).flatMap(i -> {
- return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> {
+ return Observable.fromIterable(i).subscribeOn(Schedulers.computation()).map(item -> {
// simulate computational work
try {
Thread.sleep(1);
@@ -60,7 +62,7 @@ private static void flatMapBufferedExampleAsync() {
}
return item + " processed " + Thread.currentThread();
});
- }).toBlocking().forEach(System.out::println);
+ }).blockingForEach(System.out::println);
}
private static void flatMapWindowedExampleAsync() {
@@ -73,7 +75,7 @@ private static void flatMapWindowedExampleAsync() {
}
return item + " processed " + Thread.currentThread();
});
- }).toBlocking().forEach(System.out::println);
+ }).blockingForEach(System.out::println);
}
// artificial representations of IO work
@@ -82,15 +84,20 @@ static Observable getDataAsync(int i) {
}
static Observable getDataSync(int i) {
- return Observable.create((Subscriber super Integer> s) -> {
- // simulate latency
- try {
- Thread.sleep(1000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- s.onNext(i);
- s.onCompleted();
- });
+
+ return Observable.create(new ObservableOnSubscribe() {
+
+ @Override
+ public void subscribe(ObservableEmitter emitter) throws Exception {
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ emitter.onNext(i);
+ emitter.onComplete();
+
+ }
+ });
}
}