diff --git a/rx.java/README b/rx.java/README new file mode 100644 index 0000000..e69de29 diff --git a/rx.java/documents/Reactive-Programming.pptx b/rx.java/documents/Reactive-Programming.pptx new file mode 100644 index 0000000..bbf0c62 Binary files /dev/null and b/rx.java/documents/Reactive-Programming.pptx differ diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/App.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/App.java deleted file mode 100644 index 7ca749f..0000000 --- a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/App.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.wrightm.github.learn.rx.java; - -/** - * Hello world! - * - */ -public class App -{ - public static void main( String[] args ) - { - System.out.println( "Hello World!" ); - } -} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ConditionalRetry.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ConditionalRetry.java new file mode 100644 index 0000000..b585bed --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ConditionalRetry.java @@ -0,0 +1,147 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.concurrent.atomic.AtomicInteger; + +import rx.Notification; +import rx.Observable; +import rx.Subscriber; +import rx.functions.Action1; +import rx.functions.Func1; + +public class ConditionalRetry { + + + public static void main(String[] args){ + //runJava8(); + runJava7(); + } + + public static void runJava8 (){ + final AtomicInteger counter = new AtomicInteger(); + // create an onSubscribe observable + // is invoked once there is a subscriber + Observable messagesWithRunTimeException = Observable.create((Subscriber subscriber) -> { + System.out.println("Execution: " + counter.get()); + if(counter.incrementAndGet() < 3){ + // Raise error until count is greater than 2 + subscriber.onError(new RuntimeException("retryable")); + } else { + // return "hello" + subscriber.onNext("hello"); + // complete stream + subscriber.onCompleted(); + } + }); + + final AtomicInteger counter2 = new AtomicInteger(); + Observable messageWithIllegalStateException = Observable.create((Subscriber message) -> { + System.out.println("Execution ex2: " + counter2.get()); + if(counter2.incrementAndGet() < 3){ + message.onError(new RuntimeException("retryable")); + } else { + message.onError(new IllegalStateException()); + } + }); + + subscribeJava8(messagesWithRunTimeException); + subscribeJava8(messageWithIllegalStateException); + } + + public static void subscribeJava8(Observable messages){ + // materialize converts the onNext, OnError and OnCompleted streams into one emitted stream. + // flatMap - turns the observable into observable and flatens them into a single observable. + messages = messages.materialize().flatMap(notification -> { + // On error + if(notification.isOnError()) { + // then check if notification is an IllegalStateException + // this is then converted to an observable which then will raise an IllegalStateException + if(notification.getThrowable() instanceof IllegalStateException){ + // convert notification into an observable + return Observable.just(notification); + } else { + // create an observable that emits nothing and signal an error + return Observable.error(notification.getThrowable()); + } + } else { + // create an observable from + return Observable.just(notification); + } + // retry - if error is thrown resubscribe to the stream + // dematerialize - undoes materialize to separate the onNext, onError, onCompleted emits. + }).retry().dematerialize(); + + // subscribe to stream, + // print line OnNext and print stack trace onError + messages.subscribe(System.out::println, t -> t.printStackTrace()); + } + + public static void runJava7(){ + + AtomicInteger counter = new AtomicInteger(); + Observable messagesWithRunTimeException = Observable.create(new Observable.OnSubscribe(){ + + @Override + public void call(Subscriber subscriber) { + System.out.println("Exception: "+ counter.get()); + if(counter.incrementAndGet() < 3){ + subscriber.onError(new RuntimeException("retryable")); + } else { + subscriber.onNext("hello"); + subscriber.onCompleted(); + } + } + + }); + + AtomicInteger counter2 = new AtomicInteger(); + Observable messagesWithIllegalStateException = Observable.create(new Observable.OnSubscribe(){ + + @Override + public void call(Subscriber subscriber) { + System.out.println("Exception: "+ counter2.get()); + if(counter2.incrementAndGet() < 3){ + subscriber.onError(new RuntimeException("retryable")); + } else { + subscriber.onError(new IllegalStateException("illegal state")); + } + } + }); + + subscriberJava7(messagesWithRunTimeException); + subscriberJava7(messagesWithIllegalStateException); + + } + + public static void subscriberJava7(Observable messages){ + + messages = messages.materialize().flatMap(new Func1, Observable>>() { + + @Override + public Observable> call(Notification notification) { + if(notification.isOnError()){ + if(notification.getThrowable() instanceof IllegalStateException){ + return Observable.just(notification); + } else { + return Observable.error(notification.getThrowable()); + } + } else { + return Observable.just(notification); + } + } + }).retry().dematerialize(); + + messages.subscribe(new Action1() { + + @Override + public void call(String message) { + System.out.println("Message: " + message); + } + }, new Action1() { + + @Override + public void call(Throwable exception) { + System.out.println("Exception: "+ exception.getMessage()); + } + }); + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ErrorHandlingBasics.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ErrorHandlingBasics.java new file mode 100644 index 0000000..b016c9a --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ErrorHandlingBasics.java @@ -0,0 +1,152 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.awt.event.ActionEvent; +import java.beans.PropertyChangeListener; +import java.io.IOException; + +import javax.swing.Action; + +import rx.Observable; +import rx.Subscriber; +import rx.functions.Action1; +import rx.functions.Func1; + +public class ErrorHandlingBasics { + + public static void main(String[] args){ + + Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(Subscriber subscriber) { + subscriber.onError(new RuntimeException("failed")); + } + + }).subscribe(new Action1() { + + @Override + public void call(String message) { + System.out.println("Shouldn't get here"); + } + }, new Action1() { + + @Override + public void call(Throwable exception) { + System.out.println("Exception: "+ exception.getMessage()); + } + }); + + + Observable message = Observable.just("hello").map(new Func1(){ + + @Override + public String call(String message) { + System.out.println(message); + if(message == "hello"){ + throw new RuntimeException("Pish"); + } + return message; + } + + }); + + message.subscribe(new Action1() { + + @Override + public void call(String message) { + System.out.println("Shouldnt get here"); + } + }, new Action1() { + + @Override + public void call(Throwable exception) { + System.out.println(exception.getMessage()); + } + + }); + + + /* + * Conditionals that may return an error can be done in a flatMap + */ + Observable message2 = Observable.just(true).flatMap(new Func1>(){ + + @Override + public Observable call(Boolean condition) { + if(condition){ + return Observable.error(new RuntimeException("failed")); + } else { + return Observable.just("data","here"); + } + } + + }); + + message2.subscribe(new Action1(){ + + @Override + public void call(String message) { + System.out.println("subscribe: "+message); + } + + }, new Action1(){ + + @Override + public void call(Throwable exception) { + System.out.println("Exception: "+exception.getMessage()); + } + + }); + + /* + * Errors can be handled by Observables + */ + + // error - emits nothing and signals an error + // onErrorResumeNext - On an error an observable emits a string + Observable error = Observable.error(new RuntimeException("failed")); + error.onErrorResumeNext(Observable.just("5) data")) + .subscribe(System.out::println, t -> System.out.println("5) Error: " + t)); + + /* + * A throwable is obtained by conditional logic + */ + Observable message3 = Observable.error(new IllegalStateException("6) failed")) + .onErrorResumeNext( new Func1 >(){ + + @Override + public Observable call(Throwable exception) { + if(exception instanceof IOException){ + return Observable.error(exception); + } else { + return Observable.just("6) data"); + } + /* + if(exception instanceof IllegalStateException){ + return Observable.error(exception); + } else { + return Observable.just("6) data"); + } + */ + } + + }); + + message3.subscribe(new Action1() { + + @Override + public void call(Object message) { + System.out.println("Message: "+message); + } + }, new Action1(){ + + @Override + public void call(Throwable exception) { + System.out.println("Exception: "+exception.getMessage()); + } + + }); + + } + +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ErrorHandlingRetryWithBackoff.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ErrorHandlingRetryWithBackoff.java new file mode 100644 index 0000000..4afdf4c --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ErrorHandlingRetryWithBackoff.java @@ -0,0 +1,127 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.Subscriber; + +public class ErrorHandlingRetryWithBackoff { + + public static void main(String[] args){ + /* + * retry(n) can be used to immediately retry n times + */ + //java7(); + java8(); + } + + public static void java7(){ + + /* + * retry(n) can be used to immediately retry n times + */ + Observable.create( new OnSubscribe() { + + @Override + public void call(Subscriber message) { + message.onError(new RuntimeException("1) always fails")); + } + + }).retry(3).subscribe(new Action1() { + + @Override + public void call(String message) { + System.out.println("OnNext: "+message); + } + }, new Action1() { + + @Override + public void call(Throwable exception) { + System.out.println("Exception: "+exception.getMessage()); + } + }); + + /* + * retryWhen allows custom behavior on when and if a retry should be done + + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber subscriber) { + System.out.println("2) subscribing"); + subscriber.onError(new RuntimeException("2) always fails")); + } + }) + // if a source Observable emits an error, + // pass that error to another Observable to determine whether to resubscribe to the source + .retryWhen(new Func1,Observable>() { + + @Override + public Observable call(Observable exception) { + return exception + // + // Combines the one or more Streams and allows you to combine them and return some observable + // + .zipWith(Observable.range(1,3), new Func2() { + + @Override + public Integer call(Throwable arg0, Integer arg1) { + return arg1; + } + }) + .flatMap(new Func1>() { + + @Override + public Observable call(Integer arg0) { + System.out.println("2) delay retry by " + arg0 + " second(s)"); + return Observable.timer(arg0, TimeUnit.SECONDS); + } + }) + // + // emit the emissions from two or more Observables without interleaving them. concat 1 stream to the end of another + // + .concatWith(Observable.error(new RuntimeException("Failed after 3 retries"))); + } + + }) + // + // + // + .toBlocking() + // + // + .forEach(System.out::println); + **/ + } + + public static void java8(){ + + /* + * retry(n) can be used to immediately retry n times + */ + Observable.create(s -> { + System.out.println("1) subscribing"); + s.onError(new RuntimeException("1) always fails")); + }).retry(3).subscribe(System.out::println, t -> System.out.println("1) Error: " + t)); + + System.out.println(""); + + /* + * retryWhen allows custom behavior on when and if a retry should be done + */ + Observable.create(s -> { + System.out.println("2) subscribing"); + s.onError(new RuntimeException("2) always fails")); + }).retryWhen(attempts -> { + return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> { + System.out.println("2) delay retry by " + i + " second(s)"); + return Observable.timer(i, TimeUnit.SECONDS); + }).concatWith(Observable.error(new RuntimeException("Failed after 3 retries"))); + }).toBlocking().forEach(System.out::println); + } + +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlDebounceBuffer.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlDebounceBuffer.java new file mode 100644 index 0000000..9f32bc5 --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlDebounceBuffer.java @@ -0,0 +1,78 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.schedulers.Schedulers; + +public class FlowControlDebounceBuffer { + + public static void main(String args[]){ + java7(); + } + + public static void java7(){ + // debounce to the last value in each burst + // only emit an item from the source Observable after a particular timespan has passed without the Observable emitting any other items + //intermittentBursts().debounce(10, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println); + + /* + * A Connectable Observable resembles an ordinary Observable, + * except that it does not begin emitting items when it is subscribed to, + * but only when its connect() method is called. + * In this way you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items. + * + */ + + /* The following will emit a buffered list up to 20 items in total as it is debounced */ + // first we multicast the stream ... using refCount so it handles the subscribe/unsubscribe + // take - emit only the first n items emitted by an Observable + // publish - convert an ordinary Observable into a connectable Observable + // refCount - The RefCount operator automates the process of connecting to and disconnecting from a connectable Observable. + // It operates on a connectable Observable and returns an ordinary Observable. + // When the first observer subscribes to this Observable, + // RefCount connects to the underlying connectable Observable. + // RefCount then keeps track of how many other observers subscribe to it and does not disconnect from the underlying connectable Observable until the last observer has done so. + Observable burstStream = intermittentBursts().take(20).publish().refCount(); + // then we get the debounced version + Observable debounced = burstStream.debounce(10, TimeUnit.MILLISECONDS); + // then the buffered one that uses the debounced stream to demark window start/stop + // buffer - periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time + // when used with debounce it will collect for each 10 milliseconds + Observable> buffered = burstStream.buffer(debounced); + // then we subscribe to the buffered stream so it does what we want + buffered.toBlocking().forEach(System.out::println); + } + + public static Observable intermittentBursts(){ + return Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber subscriber) { + // + // Stops emitting data onces the subscriber unsubscribes + // + while (!subscriber.isUnsubscribed()){ + // Emit a number of numbers on the stream + for(int i = 0; i < Math.random()*20; i++){ + System.out.println("onNext: "+i); + subscriber.onNext(i); + } + try { + // sleep for a random amount of time + Thread.sleep((long) (Math.random() * 1000)); + } catch (Exception e){ + // do nothing + } + } + } + }) + // + // specify which Scheduler an Observable should use when its subscription is invoked + // + .subscribeOn(Schedulers.newThread()); // new thread since blocking + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlReactivePullCold.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlReactivePullCold.java new file mode 100644 index 0000000..0305d42 --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlReactivePullCold.java @@ -0,0 +1,71 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicLong; + +import rx.Observable; +import rx.Subscriber; +import rx.schedulers.Schedulers; + +/** + * Example of a "cold Observable" using "reactive pull" to emit only as many items as requested by Subscriber. + */ +public class FlowControlReactivePullCold { + + public static void main(String[] args) { + // observeOn - specify the Scheduler on which an observer will observe this Observable + // Schedulers.computation - meant for computational work such as event-loops and callback processing; + // do not use this scheduler for I/O + getData(1).observeOn(Schedulers.computation()).toBlocking().forEach(System.out::println); + } + + /** + * This is a simple example of an Observable Iterable using "reactive pull". + */ + public static Observable getData(int id) { + // simulate a finite, cold data source + final ArrayList data = new ArrayList(); + for (int i = 0; i < 5000; i++) { + data.add(i + id); + } + return fromIterable(data); + } + + /** + * A more performant but more complicated implementation can be seen at: + * https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java + *

+ * Real code should just use Observable.from(Iterable iter) instead of re-implementing this logic. + *

+ * This is being shown as a simplified version to demonstrate how "reactive pull" data sources are implemented. + */ + public static Observable fromIterable(Iterable it) { + // return as Observable (real code would likely do IO of some kind) + return Observable.create((Subscriber s) -> { + final Iterator iter = it.iterator(); + final AtomicLong requested = new AtomicLong(); + // setProducer - + s.setProducer((long request) -> { + /* + * We add the request but only kick off work if at 0. + * + * This is done because over async boundaries `request(n)` can be called multiple times by + * another thread while this `Producer` is still emitting. We only want one thread ever emitting. + */ + if (requested.getAndAdd(request) == 0) { + do { + if(s.isUnsubscribed()) { + return; + } + if (iter.hasNext()) { + s.onNext(iter.next()); + } else { + s.onCompleted(); + } + } while (requested.decrementAndGet() > 0); + } + }); + }); + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlSampleExample.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlSampleExample.java new file mode 100644 index 0000000..e2abbda --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlSampleExample.java @@ -0,0 +1,37 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.Subscriber; +import rx.schedulers.Schedulers; + +public class FlowControlSampleExample { + + public static void main(String args[]) { + hotStream() + // sample - The Sample operator periodically, in this case every 500 ms and looks at an Observable and + // emits whichever item it has most recently emitted since the previous sampling. + .sample(500, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println); + } + + /** + * This is an artificial source to demonstrate an infinite stream that emits randomly + */ + public static Observable hotStream() { + return Observable.create((Subscriber s) -> { + int i = 0; + while (!s.isUnsubscribed()) { + s.onNext(i++); + try { + // sleep for a random amount of time + // NOTE: Only using Thread.sleep here as an artificial demo. + Thread.sleep((long) (Math.random() * 100)); + } catch (Exception e) { + // do nothing + } + } + }).subscribeOn(Schedulers.newThread()); + } + +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlThrottleExample.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlThrottleExample.java new file mode 100644 index 0000000..f47e308 --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlThrottleExample.java @@ -0,0 +1,42 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.Subscriber; +import rx.schedulers.Schedulers; + +public class FlowControlThrottleExample { + + public static void main(String args[]) { + // first item emitted in each time window + hotStream() + // throttleFirst - emits not the most-recently emitted item in the sample period + .throttleFirst(500, TimeUnit.MILLISECONDS).take(10).toBlocking().forEach(System.out::println); + + // last item emitted in each time window + hotStream() + // throttleLast - Same as sample, emits the last item from the stream from a given period. + .throttleLast(500, TimeUnit.MILLISECONDS).take(10).toBlocking().forEach(System.out::println); + } + + /** + * This is an artificial source to demonstrate an infinite stream that emits randomly + */ + public static Observable hotStream() { + return Observable.create((Subscriber s) -> { + int i = 0; + while (!s.isUnsubscribed()) { + s.onNext(i++); + try { + // sleep for a random amount of time + // NOTE: Only using Thread.sleep here as an artificial demo. + Thread.sleep((long) (Math.random() * 100)); + } catch (Exception e) { + // do nothing + } + } + }).subscribeOn(Schedulers.newThread()); + } + +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlWindowExample.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlWindowExample.java new file mode 100644 index 0000000..eb10b32 --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/FlowControlWindowExample.java @@ -0,0 +1,51 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.Subscriber; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.schedulers.Schedulers; + +public class FlowControlWindowExample { + + public static void main(String[] args){ + + hotStream() + // every 500 ms subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time + .window(500, TimeUnit.MILLISECONDS) + .take(10) + .flatMap(new Func1, Observable>() { + + @Override + public Observable call(Observable t1) { + return t1.startWith(999999999); + } + + }) + .toBlocking().forEach(System.out::println); + + } + + /** + * This is an artificial source to demonstrate an infinite stream that bursts intermittently + */ + public static Observable hotStream() { + return Observable.create((Subscriber s) -> { + while (!s.isUnsubscribed()) { + // burst some number of items + for (int i = 0; i < Math.random() * 20; i++) { + s.onNext(i); + } + try { + // sleep for a random amount of time + // NOTE: Only using Thread.sleep here as an artificial demo. + Thread.sleep((long) (Math.random() * 1000)); + } catch (Exception e) { + // do nothing + } + } + }).subscribeOn(Schedulers.newThread()); // use newThread since we are using sleep to block + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/GroupByExamples.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/GroupByExamples.java new file mode 100644 index 0000000..df6aefb --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/GroupByExamples.java @@ -0,0 +1,252 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.functions.Func1; +import rx.observables.GroupedObservable; + +public class GroupByExamples { + + public static void main(String[] args){ + java7(); + //java8(); + } + + public static void java7(){ + + Observable.range(1,100) + .groupBy(new Func1(){ + + @Override + public Boolean call(Integer t1) { + return (t1 % 2 == 0); + } + }) + // GroupedObservable is returned by groupBy. + // A GroupedObservable will cache the items it is to emit until such time as it is subscribed to + .flatMap( new Func1,Observable>>(){ + + @Override + public Observable> call( + GroupedObservable t1) { + return t1.toList(); + } + + }).forEach(System.out::println); + + System.out.println("2--------------------------------------------------------------------------------------------------------"); + + Observable.range(1, 100) + .groupBy(new Func1() { + + @Override + public Boolean call(Integer t1) { + return (t1 % 2 == 0); + } + }) + .flatMap(new Func1,Observable>>(){ + + @Override + public Observable> call( + GroupedObservable t1) { + + return t1.take(10).toList(); + } + + }).forEach(System.out::println); + + System.out.println("3--------------------------------------------------------------------------------------------------------"); + + Observable.range(1, 100) + .groupBy(new Func1() { + + @Override + public Boolean call(Integer t1) { + // TODO Auto-generated method stub + return (t1 % 2 == 0); + } + + }) + .flatMap(new Func1, Observable>>() { + + @Override + public Observable> call( + GroupedObservable t1) { + // TODO Auto-generated method stub + return t1 + // filter - emit only those items from an Observable that pass a predicate test + .filter(new Func1() { + + @Override + public Boolean call(Integer t1) { + // TODO Auto-generated method stub + return (t1 <= 20); + } + }).toList(); + } + }); + + System.out.println("4--------------------------------------------------------------------------------------------------------"); + + Observable.range(1, 100) + .groupBy(new Func1() { + + @Override + public Boolean call(Integer t1) { + // TODO Auto-generated method stub + return (t1 % 2 == 0); + } + }) + .flatMap(new Func1,Observable>>(){ + + @Override + public Observable> call( + GroupedObservable t1) { + return t1 + // Group list into 20 items + .take(20).toList(); + } + + }) + // take the first two lists in the stream + .take(2).forEach(System.out::println); + + System.out.println("5--------------------------------------------------------------------------------------------------------"); + + Observable.range(1, 100) + .groupBy(new Func1(){ + + @Override + public Boolean call(Integer t1) { + return (t1 % 2 == 0); + } + + }).flatMap(new Func1, Observable>>() { + + @Override + public Observable> call( + GroupedObservable t1) { + return t1 + // Once an item doesn't match the condition the stream ends + // then the next one will start + .takeWhile(new Func1() { + + @Override + public Boolean call(Integer t1) { + // TODO Auto-generated method stub + return t1 < 30; + } + }).toList(); + } + }) + // Filter the empty lists that fail the condition + .filter(l -> !l.isEmpty()) + // invoke a function on each item emitted by the Observable; block until the Observable completes + .forEach(System.out::println); + + System.out.println("6--------------------------------------------------------------------------------------------------------"); + + Observable.from(Arrays.asList("a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c")) + .groupBy(n -> n) + .flatMap(g -> { + return g + // take first 10 + .take(10) + // reduce - apply a function to each emitted item, sequentially, and emit only the final accumulated value + .reduce((s,s2) -> s + s2); + }).forEach(System.out::println); + + System.out.println("7--------------------------------------------------------------------------------------------------------"); + + Observable.timer(0, 1, TimeUnit.MILLISECONDS) + .groupBy(n -> n % 2 == 0) + .flatMap(g -> { + return g.take(10).toList(); + }).take(2).toBlocking().forEach(System.out::println); + + System.out.println("8--------------------------------------------------------------------------------------------------------"); + + Observable.timer(0, 1, TimeUnit.MILLISECONDS) + .take(20) + .groupBy(n -> n % 2 == 0) + .flatMap(g -> { + return g.toList(); + }).toBlocking().forEach(System.out::println); + + + } + + public static void java8(){ + // odd/even into 2 lists + Observable.range(1, 100) + .groupBy(n -> n % 2 == 0) + .flatMap(g -> { + return g.toList(); + }).forEach(System.out::println); + + System.out.println("2--------------------------------------------------------------------------------------------------------"); + + // odd/even into lists of 10 + Observable.range(1, 100) + .groupBy(n -> n % 2 == 0) + .flatMap(g -> { + return g.take(10).toList(); + }).forEach(System.out::println); + + System.out.println("3--------------------------------------------------------------------------------------------------------"); + + //odd/even into lists of 10 + Observable.range(1, 100) + .groupBy(n -> n % 2 == 0) + .flatMap(g -> { + return g.filter(i -> i <= 20).toList(); + }).forEach(System.out::println); + + System.out.println("4--------------------------------------------------------------------------------------------------------"); + + //odd/even into lists of 20 but only take the first 2 groups + Observable.range(1, 100) + .groupBy(n -> n % 2 == 0) + .flatMap(g -> { + return g.take(20).toList(); + }).take(2).forEach(System.out::println); + + System.out.println("5--------------------------------------------------------------------------------------------------------"); + + //odd/even into 2 lists with numbers less than 30 + Observable.range(1, 100) + .groupBy(n -> n % 2 == 0) + .flatMap(g -> { + return g.takeWhile(i -> i < 30).toList(); + }).filter(l -> !l.isEmpty()).forEach(System.out::println); + + System.out.println("6--------------------------------------------------------------------------------------------------------"); + + Observable.from(Arrays.asList("a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c")) + .groupBy(n -> n) + .flatMap(g -> { + return g.take(3).reduce((s, s2) -> s + s2); + }).forEach(System.out::println); + + System.out.println("7--------------------------------------------------------------------------------------------------------"); + + Observable.timer(0, 1, TimeUnit.MILLISECONDS) + .groupBy(n -> n % 2 == 0) + .flatMap(g -> { + return g.take(10).toList(); + }).take(2).toBlocking().forEach(System.out::println); + + System.out.println("8--------------------------------------------------------------------------------------------------------"); + + Observable.timer(0, 1, TimeUnit.MILLISECONDS) + .take(20) + .groupBy(n -> n % 2 == 0) + .flatMap(g -> { + return g.toList(); + }).toBlocking().forEach(System.out::println); + } + +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/GroupByLogic.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/GroupByLogic.java new file mode 100644 index 0000000..2b77601 --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/GroupByLogic.java @@ -0,0 +1,158 @@ +package com.wrightm.github.learn.rx.java.examples; + + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.observables.GroupedObservable; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; + +public class GroupByLogic { + + public static void main(String[] args){ + // Create a TestScheduler, which is useful for debugging. + // It allows you to test schedules of events by manually advancing the clock at whatever pace you choos + final TestScheduler testScheduler = Schedulers.test(); + final PublishSubject testPlayEventSubject = PublishSubject.create(); + TestSubscriber testSchedulerStateStream = new TestSubscriber(); + + // Publish PlayEvents + testPlayEventSubject + // Group Play Event By Id + .groupBy(new Func1() { + + @Override + public Integer call(PlayEvent t1) { + return t1.getOriginatorId(); + } + + }) + // + .flatMap(new Func1, Observable>() { + + @Override + public Observable call( + GroupedObservable groupedPlayEvents) { + System.out.println("***** new group: " + groupedPlayEvents.getKey()); + return groupedPlayEvents + /* Timeout after last event, and preventing memory leaks so that inactive streams are closed */ + // timeout - emit items from a source Observable, + // but issue an exception if no item is emitted in a specified timespan + .timeout(3, TimeUnit.HOURS, testScheduler) + /* So that consecutive identical playevents are skipped, can also use skipWhile */ + // distinctUntilChanged - suppress duplicate consecutive items emitted by the source Observable + .distinctUntilChanged(new Func1() { + + @Override + public String call(PlayEvent playEvent) { + return playEvent.getSession(); + } + }) + // + // On error return an empty observable + // + .onErrorResumeNext(new Func1>() { + + @Override + public Observable call(Throwable t1) { + System.out.println(" ***** complete group: " + groupedPlayEvents.getKey()); + // complete if we timeout or have an error of any kind (this could emit a special PlayEvent instead + return Observable.empty(); + } + }) + // since we have finite groups we can `reduce` to a single value, otherwise use `scan` if you want to emit along the way + .reduce(new StreamState(), new Func2() { + + @Override + public StreamState call(StreamState streamState, PlayEvent playEvent) { + System.out.println(" state: " + streamState + " event: " + playEvent.id + "-" + playEvent.session); + streamState.addEvent(playEvent); + return streamState; + } + + }) + .filter(state -> { + // if using `scan` above instead of `reduce` you could conditionally remove what you don't want to pass down + return true; + }); + + + } + }) + .doOnNext(new Action1() { + + @Override + public void call(StreamState t1) { + System.out.println(">>> Output State: " + t1); + } + }) + .subscribe(testSchedulerStateStream); + + // add items to observe + testPlayEventSubject.onNext(createPlayEvent(1, "a")); + testPlayEventSubject.onNext(createPlayEvent(2, "a")); + // move scheduler clock forward by 2 Hours + testScheduler.advanceTimeBy(2, TimeUnit.HOURS); + + // add items to observe + testPlayEventSubject.onNext(createPlayEvent(1, "b")); + // move scheduler clock forward by 2 Hours + testScheduler.advanceTimeBy(2, TimeUnit.HOURS); + + // add items to observe + testPlayEventSubject.onNext(createPlayEvent(1, "a")); + testPlayEventSubject.onNext(createPlayEvent(2, "b")); + + // Get Sequence of events from the items observed + System.out.println("onNext after 4 hours: " + testSchedulerStateStream.getOnNextEvents()); + + } + + public static PlayEvent createPlayEvent(int id, String v) { + return new PlayEvent(id, v); + } + + public static class PlayEvent { + + private int id; + private String session; + + public PlayEvent(final int id, final String session){ + this.id = id; + this.session = session; + } + + public final int getOriginatorId(){ + return id; + } + + public final String getSession(){ + return session; + } + } + + + public static class StreamState { + + private int id = -1; + + public void addEvent(PlayEvent event){ + if(id == -1){ + this.id = event.id; + } + } + + @Override + public String toString() { + return "StreamState => id: " + id; + } + + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/HelloWorld.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/HelloWorld.java new file mode 100644 index 0000000..543392e --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/HelloWorld.java @@ -0,0 +1,204 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.Subscriber; +import rx.Observable.OnSubscribe; +import rx.schedulers.Schedulers; + +public class HelloWorld { + + public static void main(String[] args) { + + // Hello World + Observable.create(subscriber -> { + subscriber.onNext("Hello World!"); + subscriber.onCompleted(); + }).subscribe(System.out::println); + + System.out.println("2--------------------------------------------------------------------------------------------------------"); + + // shorten by using helper method + Observable.just("Hello", "World!") + .subscribe(System.out::println); + + System.out.println("3--------------------------------------------------------------------------------------------------------"); + + // add onError and onComplete listeners + Observable.just("Hello World!") + .subscribe(System.out::println, + Throwable::printStackTrace, + () -> System.out.println("Done")); + + System.out.println("4--------------------------------------------------------------------------------------------------------"); + + // expand to show full classes + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber subscriber) { + subscriber.onNext("Hello World!"); + subscriber.onCompleted(); + } + + }).subscribe(new Subscriber() { + + @Override + public void onCompleted() { + System.out.println("Done"); + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + } + + @Override + public void onNext(String t) { + System.out.println(t); + } + + }); + System.out.println("5--------------------------------------------------------------------------------------------------------"); + + // add error propagation + Observable.create(subscriber -> { + try { + subscriber.onNext("Hello World!"); + subscriber.onCompleted(); + } catch (Exception e) { + subscriber.onError(e); + } + }).subscribe(System.out::println); + + System.out.println("6--------------------------------------------------------------------------------------------------------"); + + // add concurrency (manually) + Observable.create(subscriber -> { + new Thread(() -> { + try { + subscriber.onNext(getData()); + subscriber.onCompleted(); + } catch (Exception e) { + subscriber.onError(e); + } + }).start(); + }).subscribe(System.out::println); + + System.out.println("7--------------------------------------------------------------------------------------------------------"); + + // add concurrency (using a Scheduler) + Observable.create(subscriber -> { + try { + subscriber.onNext(getData()); + subscriber.onCompleted(); + } catch (Exception e) { + subscriber.onError(e); + } + }).subscribeOn(Schedulers.io()) + .subscribe(System.out::println); + + System.out.println("8--------------------------------------------------------------------------------------------------------"); + + // add operator + Observable.create(subscriber -> { + try { + subscriber.onNext(getData()); + subscriber.onCompleted(); + } catch (Exception e) { + subscriber.onError(e); + } + + }) + //Creates and returns a Scheduler intended for IO-bound work. + // The implementation is backed by an Executor thread-pool that will grow as needed. + // This can be used for asynchronously performing blocking IO. + .subscribeOn(Schedulers.io()) + .map(data -> data + " --> at " + System.currentTimeMillis()) + .subscribe(System.out::println); + + System.out.println("9--------------------------------------------------------------------------------------------------------"); + + // add error handling + Observable.create(subscriber -> { + try { + subscriber.onNext(getData()); + subscriber.onCompleted(); + } catch (Exception e) { + subscriber.onError(e); + } + }).subscribeOn(Schedulers.io()) + .map(data -> data + " --> at " + System.currentTimeMillis()) + .onErrorResumeNext(e -> Observable.just("Fallback Data")) + .subscribe(System.out::println); + + System.out.println("10--------------------------------------------------------------------------------------------------------"); + + // infinite + Observable.create(subscriber -> { + int i = 0; + while (!subscriber.isUnsubscribed()) { + subscriber.onNext(i++); + } + }) + // take the first 10 emitted from the stream + .take(10).subscribe(System.out::println); + + System.out.println("11--------------------------------------------------------------------------------------------------------"); + + //Hello World + Observable.create(subscriber -> { + throw new RuntimeException("failed!"); + }).onErrorResumeNext(throwable -> { + return Observable.just("fallback value"); + }).subscribe(System.out::println); + + System.out.println("12--------------------------------------------------------------------------------------------------------"); + + Observable.create(subscriber -> { + throw new RuntimeException("failed!"); + }).onErrorResumeNext(Observable.just("fallback value")) + .subscribe(System.out::println); + + System.out.println("13--------------------------------------------------------------------------------------------------------"); + + Observable.create(subscriber -> { + throw new RuntimeException("failed!"); + }).onErrorReturn(throwable -> { + return "fallback value"; + }).subscribe(System.out::println); + + System.out.println("14--------------------------------------------------------------------------------------------------------"); + + Observable.create(subscriber -> { + throw new RuntimeException("failed!"); + }) + // This retries 3 times, each time incrementing the number of seconds it waits. + .retryWhen(attempts -> { + return attempts + // Combine streams, exception and integer streams + .zipWith(Observable.range(1, 3), (throwable, i) -> i) + // delay response for a number of seconds (1,2,3) + .flatMap(i -> { + System.out.println("delay retry by " + i + " second(s)"); + return Observable.timer(i, TimeUnit.SECONDS); + }) + // add empty emit and signal an error + .concatWith(Observable.error(new RuntimeException("Exceeded 3 retries"))); + }) + .subscribe(System.out::println, t -> t.printStackTrace()); + + + try { + Thread.sleep(20000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + + private static String getData() { + return "Got Data!"; + } + +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ParallelExecution.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ParallelExecution.java new file mode 100644 index 0000000..571c58c --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ParallelExecution.java @@ -0,0 +1,106 @@ +package com.wrightm.github.learn.rx.java.examples; + +import rx.Observable; +import rx.Subscriber; +import rx.schedulers.Schedulers; + +public class ParallelExecution { + + public static void main(String[] args) { + System.out.println("------------ mergingAsync"); + mergingAsync(); + System.out.println("------------ mergingSync"); + mergingSync(); + System.out.println("------------ mergingSyncMadeAsync"); + mergingSyncMadeAsync(); + System.out.println("------------ flatMapExampleSync"); + flatMapExampleSync(); + System.out.println("------------ flatMapExampleAsync"); + flatMapExampleAsync(); + System.out.println("------------ flatMapBufferedExampleAsync"); + flatMapBufferedExampleAsync(); + System.out.println("------------ flatMapWindowedExampleAsync"); + // Example of backpressure winning + //flatMapWindowedExampleAsync(); + System.out.println("------------"); + } + + private static void mergingAsync() { + Observable + // merge - combine multiple Observables into one by merging their emissions + .merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println); + } + + private static void mergingSync() { + // here you'll see the delay as each is executed synchronously + Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println); + } + + private static void mergingSyncMadeAsync() { + // if you have something synchronous and want to make it async, you can schedule it like this + // so here we see both executed concurrently + Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println); + } + + private static void flatMapExampleAsync() { + Observable.range(0, 5).flatMap(i -> { + return getDataAsync(i); + }).toBlocking().forEach(System.out::println); + } + + private static void flatMapExampleSync() { + Observable.range(0, 5).flatMap(i -> { + return getDataSync(i); + }).toBlocking().forEach(System.out::println); + } + + private static void flatMapBufferedExampleAsync() { + Observable.range(0, 5000) + // buffer - periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time + .buffer(500).flatMap(i -> { + return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> { + // simulate computational work + try { + Thread.sleep(1); + } catch (Exception e) { + } + return item + " processed " + Thread.currentThread(); + }); + }).toBlocking().forEach(System.out::println); + } + + private static void flatMapWindowedExampleAsync() { + Observable.range(0, 5000) + // window -Window is similar to Buffer, + // but rather than emitting packets of items from the source Observable, it emits Observables, + // each one of which emits a subset of items from the source Observable and then terminates with an + .window(500).flatMap(work -> { + return work.observeOn(Schedulers.computation()).map(item -> { + // simulate computational work + try { + Thread.sleep(1); + } catch (Exception e) { + } + return item + " processed " + Thread.currentThread(); + }); + }).toBlocking().forEach(System.out::println); + } + + // artificial representations of IO work + static Observable getDataAsync(int i) { + return getDataSync(i).subscribeOn(Schedulers.io()); + } + + static Observable getDataSync(int i) { + return Observable.create((Subscriber s) -> { + // simulate latency + try { + Thread.sleep(1000); + } catch (Exception e) { + e.printStackTrace(); + } + s.onNext(i); + s.onCompleted(); + }); + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ParallelExecutionExample.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ParallelExecutionExample.java new file mode 100644 index 0000000..a7e500e --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ParallelExecutionExample.java @@ -0,0 +1,110 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.List; + +import rx.Observable; +import rx.Subscriber; +import rx.schedulers.Schedulers; + +public class ParallelExecutionExample { + + public static void run() { + Observable searchTile = getSearchResults("search term"); + + Observable populatedTiles = searchTile.flatMap(t -> { + Observable reviews = getSellerReviews(t.getSellerId()); + Observable imageUrl = getProductImage(t.getProductId()); + + return Observable.zip(reviews, imageUrl, (r, u) -> { + return new TileResponse(t, r, u); + }); + }); + + List allTiles = populatedTiles.toList() + .toBlocking().single(); + } + + public static void main(String[] args) { + final long startTime = System.currentTimeMillis(); + + Observable searchTile = getSearchResults("search term") + .doOnSubscribe(() -> logTime("Search started ", startTime)) + .doOnCompleted(() -> logTime("Search completed ", startTime)); + + Observable populatedTiles = searchTile.flatMap(t -> { + Observable reviews = getSellerReviews(t.getSellerId()) + .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime)); + Observable imageUrl = getProductImage(t.getProductId()) + .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime)); + + return Observable.zip(reviews, imageUrl, (r, u) -> { + return new TileResponse(t, r, u); + }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime)); + }); + + List allTiles = populatedTiles.toList() + .doOnCompleted(() -> logTime("All Tiles Completed ", startTime)) + .toBlocking().single(); + } + + private static Observable getSearchResults(String string) { + return mockClient(new Tile(1), new Tile(2), new Tile(3)); + } + + private static Observable getSellerReviews(int id) { + return mockClient(new Reviews()); + } + + private static Observable getProductImage(int id) { + return mockClient("image_" + id); + } + + private static void logTime(String message, long startTime) { + System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); + } + + private static Observable mockClient(T... ts) { + return Observable.create((Subscriber s) -> { + // simulate latency + try { + Thread.sleep(1000); + } catch (Exception e) { + } + for (T t : ts) { + s.onNext(t); + } + s.onCompleted(); + }).subscribeOn(Schedulers.io()); + // note the use of subscribeOn to make an otherwise synchronous Observable async + } + + public static class TileResponse { + + public TileResponse(Tile t, Reviews r, String u) { + // store the values + } + + } + + public static class Tile { + + private final int id; + + public Tile(int i) { + this.id = i; + } + + public int getSellerId() { + return id; + } + + public int getProductId() { + return id; + } + + } + + public static class Reviews { + + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ScanVsReduceExample.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ScanVsReduceExample.java new file mode 100644 index 0000000..c5f8b5f --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ScanVsReduceExample.java @@ -0,0 +1,29 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.ArrayList; + +import rx.Observable; + +public class ScanVsReduceExample { + + public static void main(String... args) { + + Observable.range(0, 10) + // reduce - apply a function to each emitted item, sequentially, and emit only the final accumulated value + // Accumulate then return + .reduce(new ArrayList<>(), (list, i) -> { + list.add(i); + return list; + }).forEach(System.out::println); + + System.out.println("reduce vs scan"); + + Observable.range(0, 10) + // scan - apply a function to each item emitted by an Observable, sequentially, and emit each successive value + // return list for each sucessive value + .scan(new ArrayList<>(), (list, i) -> { + list.add(i); + return list; + }).forEach(System.out::println); + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/UnitTesting.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/UnitTesting.java new file mode 100644 index 0000000..13caf06 --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/UnitTesting.java @@ -0,0 +1,30 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; + +public class UnitTesting { + + public static void main(String... args) { + TestScheduler test = Schedulers.test(); + TestSubscriber ts = new TestSubscriber<>(); + + Observable.interval(200, TimeUnit.MILLISECONDS, test) + .map(i -> { + return i + " value"; + }).subscribe(ts); + + test.advanceTimeBy(200, TimeUnit.MILLISECONDS); + ts.assertReceivedOnNext(Arrays.asList("0 value")); + + test.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + ts.assertReceivedOnNext(Arrays.asList("0 value", "1 value", "2 value", "3 value", "4 value")); + } + +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ZipInterval.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ZipInterval.java new file mode 100644 index 0000000..477c53c --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/ZipInterval.java @@ -0,0 +1,19 @@ +package com.wrightm.github.learn.rx.java.examples; + +import java.util.concurrent.TimeUnit; + +import rx.Observable; + +public class ZipInterval { + + public static void main(String... args) { + Observable data = Observable.just("one", "two", "three", "four", "five"); + Observable + // combine two observables/streams together. + .zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> { + return d + " " + t+1; + }).toBlocking().forEach(System.out::println); + + } + +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/summary-of-examples.txt b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/summary-of-examples.txt new file mode 100644 index 0000000..a14e7ef --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/examples/summary-of-examples.txt @@ -0,0 +1,10 @@ +1. ConditionalRetry + * Shows how to create onSubscribe observer using create + * Shows how to create an observable from an object using just + * Shows how to combine onNext, onError and onCompleted streams using materiliase and convert them back into there on streams demateriliase + * Shows how to flatten and observable of observables into a single observable using flatMap + * Shows hot to deal with on exception thrown using isOnError. + +2. ErrorHandlingBasics + * + \ No newline at end of file diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/Bookmark.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/Bookmark.java new file mode 100644 index 0000000..e8155cc --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/Bookmark.java @@ -0,0 +1,29 @@ +package com.wrightm.github.learn.rx.java.types; + +public class Bookmark { + + private final int id; + private final int offset; + + public Bookmark(final int id, final int offset) { + this.id = id; + this.offset = offset; + } + + public final int getID(){ + return this.id; + } + + public final int getOffset(){ + return this.offset; + } + + @Override + public String toString(){ + return new StringBuilder().append("BookMark{id=") + .append(id) + .append(", offset=") + .append(offset) + .append("}").toString(); + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/BookmarkRow.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/BookmarkRow.java new file mode 100644 index 0000000..e828ec4 --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/BookmarkRow.java @@ -0,0 +1,20 @@ +package com.wrightm.github.learn.rx.java.types; + +public class BookmarkRow { + + private final int videoId; + private final int bookmarkId; + + public BookmarkRow(final int videoId, final int bookmarkId){ + this.videoId = videoId; + this.bookmarkId = bookmarkId; + } + + public final int getVideoID(){ + return this.videoId; + } + + public final int getBookmarkID(){ + return this.bookmarkId; + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/BoxArt.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/BoxArt.java new file mode 100644 index 0000000..47e717c --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/BoxArt.java @@ -0,0 +1,64 @@ +package com.wrightm.github.learn.rx.java.types; + +public class BoxArt { + + private final int width; + private final int height; + private final String url; + + public BoxArt(final int width, final int height, final String url) { + this.width = width; + this.height = height; + this.url = url; + } + + public final int getWidth(){ + return width; + } + + public final int getHeight(){ + return height; + } + + public final String getURL(){ + return url; + } + + @Override + public int hashCode(){ + final int prime = 31; + int result = 1; + result = prime * result + height; + result = prime * result + ((url == null) ? 0 : url.hashCode()); + result = prime * result + width; + return result; + } + + @Override + public boolean equals(Object obj){ + if(this == obj){ + return true; + } + if(obj == null){ + return false; + } + if(getClass() != obj.getClass()){ + return false; + } + BoxArt other = (BoxArt) obj; + if(height != other.height){ + return false; + } + if(url == null){ + if(url != other.url){ + return false; + } + } else if(!other.equals(url)){ + return false; + } + if(width != other.width){ + return false; + } + return true; + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/BoxArtRow.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/BoxArtRow.java new file mode 100644 index 0000000..e64e597 --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/BoxArtRow.java @@ -0,0 +1,32 @@ +package com.wrightm.github.learn.rx.java.types; + +public class BoxArtRow { + + private final int videoId; + private final int width; + private final int height; + private final String url; + + public BoxArtRow(final int videoId, final int width, final int height, final String url){ + this.videoId = videoId; + this.width = width; + this.height = height; + this.url = url; + } + + public final int getVideoID(){ + return videoId; + } + + public final int getWidth(){ + return width; + } + + public final int getHeight(){ + return height; + } + + public final String getURL(){ + return url; + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/ComposableList.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/ComposableList.java new file mode 100644 index 0000000..87ab84d --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/ComposableList.java @@ -0,0 +1,28 @@ +package com.wrightm.github.learn.rx.java.types; + +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +public interface ComposableList extends Iterable { + + // Transform from one object T to another object R + public ComposableList map(Function projectionFunction); + + // Filter abject from the list if it fails predicate test. + public ComposableList filter(Predicate predicateFunction); + + // Transform one object to another List of objects R + public ComposableList concatMap(Function> projectionFunctionThatReturnsList); + + public ComposableList reduce(BiFunction combiner); + + public ComposableList reduce(R initialValue, BiFunction combiner); + + public int size(); + + public void forEach(Consumer action); + + public T get(int index); +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/InterestingMoment.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/InterestingMoment.java new file mode 100644 index 0000000..4b4435f --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/InterestingMoment.java @@ -0,0 +1,31 @@ +package com.wrightm.github.learn.rx.java.types; + +public class InterestingMoment { + + private final String type; + private final int time; + + public InterestingMoment(final String type, final int time){ + this.type = type; + this.time = time; + } + + public final String getType(){ + return type; + } + + public final int getTime(){ + return time; + } + + @Override + public String toString(){ + return new StringBuilder().append("InterestingMoment{type=") + .append(type) + .append(", time=") + .append(time) + .append('}') + .toString(); + } + +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/JSON.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/JSON.java new file mode 100644 index 0000000..d1a0122 --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/JSON.java @@ -0,0 +1,7 @@ +package com.wrightm.github.learn.rx.java.types; + +import java.util.HashMap; + +public class JSON extends HashMap { + +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/Movie.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/Movie.java new file mode 100644 index 0000000..14ee93d --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/Movie.java @@ -0,0 +1,55 @@ +package com.wrightm.github.learn.rx.java.types; + +import java.util.List; +import rx.Observable; + +public class Movie { + + private final int id; + private final String title; + private final double rating; + private Observable bookmarks; + private Observable boxArts; + private Observable interestingMoments; + + public Movie(final int id, final String title, final double rating){ + this.id = id; + this.title = title; + this.rating = rating; + } + + public Movie(final int id, final String title, final double rating, List bookmarks, List boxArts){ + this(id,title,rating); + this.bookmarks = Observable.from(bookmarks); + this.boxArts = Observable.from(boxArts); + } + + public Movie(final int id, final String title, final double rating, final List bookmarks, List boxArts, final List interestingMoments){ + this(id, title, rating, bookmarks, boxArts); + this.interestingMoments = Observable.from(interestingMoments); + } + + public final int getID(){ + return id; + } + + public final String getTitle(){ + return title; + } + + public final double getRating(){ + return rating; + } + + public final Observable getBookmark(){ + return bookmarks; + } + + public final Observable getBoxArts(){ + return boxArts; + } + + public final Observable getInterstingMoments(){ + return interestingMoments; + } +} diff --git a/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/MovieList.java b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/MovieList.java new file mode 100644 index 0000000..766f94c --- /dev/null +++ b/rx.java/src/main/java/com/wrightm/github/learn/rx/java/types/MovieList.java @@ -0,0 +1,32 @@ +package com.wrightm.github.learn.rx.java.types; + +public class MovieList { + + private final String name; + private final ComposableList