diff --git a/practice/build.gradle b/practice/build.gradle new file mode 100644 index 0000000..d4c5699 --- /dev/null +++ b/practice/build.gradle @@ -0,0 +1,17 @@ +plugins { + id 'java' +} + +group 'rx-java' +version '1.0-SNAPSHOT' + +sourceCompatibility = 1.8 + +repositories { + mavenCentral() +} + +dependencies { + compile 'io.reactivex.rxjava2:rxjava:2.2.0' + testCompile group: 'junit', name: 'junit', version: '4.12' +} diff --git a/practice/gradle/wrapper/gradle-wrapper.jar b/practice/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..94336fc Binary files /dev/null and b/practice/gradle/wrapper/gradle-wrapper.jar differ diff --git a/practice/gradle/wrapper/gradle-wrapper.properties b/practice/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..e01849d --- /dev/null +++ b/practice/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Mon Jan 25 22:31:41 KST 2021 +distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.3-all.zip +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +zipStorePath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME diff --git a/practice/gradlew b/practice/gradlew new file mode 100755 index 0000000..cccdd3d --- /dev/null +++ b/practice/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/practice/gradlew.bat b/practice/gradlew.bat new file mode 100644 index 0000000..f955316 --- /dev/null +++ b/practice/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/practice/settings.gradle b/practice/settings.gradle new file mode 100644 index 0000000..e16a1b8 --- /dev/null +++ b/practice/settings.gradle @@ -0,0 +1,2 @@ +rootProject.name = 'practice' + diff --git a/practice/src/main/java/Sample1_3.java b/practice/src/main/java/Sample1_3.java new file mode 100644 index 0000000..e69e722 --- /dev/null +++ b/practice/src/main/java/Sample1_3.java @@ -0,0 +1,82 @@ +import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.Disposables; +import io.reactivex.subjects.ReplaySubject; +import io.reactivex.subjects.Subject; + +public class Sample1_3 { + public void error() { + Subject s = ReplaySubject.create(); + s.subscribe( + v -> System.out.println(v), + e -> System.out.println(e)); + s.onNext(0); + s.onError(new Exception("Oops")); + } + + public void unsubscribing_1() { + Subject values = ReplaySubject.create(); + Disposable disposable = values.subscribe( + v -> System.out.println(v), + e -> System.out.println(e), + () -> System.out.println("Done") + ); + values.onNext(0); + values.onNext(1); + disposable.dispose(); + values.onNext(2); + + } + + public void unsubscribing_2() { + Subject values = ReplaySubject.create(); + Disposable disposable1 = values.subscribe( + v -> System.out.println("First: " +v) + ); + Disposable disposable2 = values.subscribe( + v -> System.out.println("Second: " +v) + ); + values.onNext(0); + values.onNext(1); + disposable1.dispose(); + System.out.println("Unsubscribed first"); + values.onNext(2); + } + + public void onErrorAndonCompleted() { + Subject values = ReplaySubject.create(); + Disposable disposable1 = values.subscribe( + v -> System.out.println("First: "+ v), + e -> System.out.println("First: "+ e), + () -> System.out.println("Completed") + + ); + + values.onNext(0); + values.onNext(1); + values.onComplete(); + values.onNext(2); + } + + public void freeingResources() { + /* + * Not working on RxJava2 + */ + +// Subscription s = Subscriptions.create(() -> System.out.println("Clean")); +// s.unsubscribe(); + + //Disposable.fromAction takes an action that will be executed on unsubscription to release the resources + // There also are shorthand for common actions when creating a sequence. + Disposable s = Disposables.fromAction(() -> System.out.println("Clean")); + s.dispose(); + + Disposables.disposed(); + + } + + public static void main (String[] args) { + Sample1_3 sample = new Sample1_3(); + sample.freeingResources(); + + } +} diff --git a/practice/src/main/java/Sample2_1.java b/practice/src/main/java/Sample2_1.java new file mode 100644 index 0000000..85502d0 --- /dev/null +++ b/practice/src/main/java/Sample2_1.java @@ -0,0 +1,235 @@ +import java.io.IOException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +import io.reactivex.Observable; +import io.reactivex.disposables.Disposable; + +public class Sample2_1 { + /* + * In most cases, + * "subject"s are not the best way to create a new Observable. + * We will now see tidier ways to create observable sequences. +* */ + + public void factory_observable_just() { + Observable values = Observable.just("one", "two", "three"); + Disposable disposable = values.subscribe( + v -> System.out.println("Received: " + v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void factory_observable_empty() { + //This observable will emit a single onCompleted and nothing else + Observable values = Observable.empty(); + Disposable disposable = values.subscribe( + v -> System.out.println("Received: " + v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void factory_observable_never() { + //This observable wil never emit anything + Observable values = Observable.never(); + Disposable disposable = values.subscribe( + v -> System.out.println("Received: " + v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + //the code above will print nothing + //note that this doesn't mean that the program is blocking + //in fact, it will terminate immediately. + } + + public void factory_observable_error() { + //This observable will emit a single error event and terminate + Observable values = Observable.error(new Exception("Oops!")); + Disposable disposable = values.subscribe( + v -> System.out.println("Received: " + v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void factory_observable_defer_explain_via_just() { + //defer doesn't define a new kind of observable, but allows you to declare how an observable should be created every time a subscriber arrives. + //Consider how you would create an observable that returns the current time and terminates. + //You are emitting a single value, so it sounds like a case for just + + Observable now = Observable.just(System.currentTimeMillis()); + + now.subscribe(System.out::println); + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) {} + now.subscribe(System.out::println); + } + + public void factory_observable_defer() { + Observable now = Observable.defer(() -> + Observable.just(System.currentTimeMillis())); + + now.subscribe(System.out::println); + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) {} + now.subscribe(System.out::println); + } + + public void factory_observable_create() { + Observable values = Observable.create( o -> { + o.onNext("Hello"); + o.onComplete(); + }); + //Observable same = Observable.just("hello"); + + Disposable disposable = values.subscribe( + v -> System.out.println("Received: " + v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + // This method should be your preffered way of creating a custom observable, when none of the existing shorthands serve your purpose. + // The code is similar to how we created a Subject and pushed values to it, but there are a few important differences. + // First of all, the source of the events is neatly encapsulated and separated from unrelated code. + // Secondly, `Subject`s carry dangers that are not obvious : with a `Subject` you are managing state, and anyone with access to the instance can push values into it and alter the sequence. + // We wil see more about this issue later on. + // Another key difference to using subjects is that the code is executed lazily, when and if an observer subscribes. + // In the example above, the code is run not when the observable is created(because there is no `Subscriber` yet), but each time `subscriber`is called + // This means that every values is generated again for each subscriber, similar to `ReplaySubject` + // The end result is similar to `ReplaySubject`, exept that no caching takes place + // However, if we had used a `ReplaySubject`, and the cration method was time-consuming, that would block the thread that executes the creation. + // You'd have to manually create a new thread to push values into `Subject`. + // We're not presenting Rx's methods for concurrency yet, but there are convenient ways to make the execution of the `onSubscribe` function concurrently + + // You may have already noticed that you can trivially implement any of the previous observables using Observable.create. + // Infact, our example for `create` is equivalent to `Observable.just("hello")` + } + + // In functional programming it is common to create sequences of unrestricted or infinite length + // RxJava has factory methods that create such sequences + public void functional_unfolds_observable_range() { + // A straight forward and familiar method to any functional programmer. It emits the specified range of integers. + Observable values = Observable.range(10, 15); + values.subscribe(System.out::println); + + } + + public void functional_unfolds_observable_interval_1() { + // This function will create an infinite sequence of ticks, seperated by the specified time duration + Observable values = Observable.interval(1000, TimeUnit.MILLISECONDS); + Disposable disposable = values.subscribe( + v -> System.out.println("Received: " + v), + e -> System.out.println("Error: "+ e), + () -> System.out.println("Completed") + ); + try { + System.in.read(); + } catch (IOException ignore) { + System.out.println("Interrupted !"); + } + + // This sequence will not terminate until we unsubscribe + // We should note why the blocking read at the end is necessary. + // Without it, the program terminates without printing something. That's because our operations are non-blocking: + // we create an observable that will emit values over time, then we register the actions to execute if and when values arrive. + // None of that is blocking and the main thread proceeds to terminate. The timer that produces the ticks runs on its own thread, + // which does not prevent the JVM from terminating. killing the timer with it. + } + + public void functional_unfolds_observable_timer_1() { + // This example creates an observable that waits a given amount of time, then emits 0L and terminates. + Observable values = Observable.timer(1, TimeUnit.SECONDS); + Disposable disposable = values.subscribe( + v -> System.out.println("Received: " + v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + try { + System.in.read(); + } catch (IOException ignore) { + System.out.println("Interrupted !"); + } + + + } + + public void functional_unfolds_observable_interval_2() { + // This example will wait a specified amount of time, then begin emitting like interval with the given frequency + Observable values = Observable.interval(2, 1, TimeUnit.SECONDS); + Disposable disposable = values.subscribe( + v -> System.out.println("Received: " + v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + try { + System.in.read(); + } catch (IOException ignore) { + System.out.println("Interrupted !"); + } + + } + + +// public void transitioning_into_observable_eventhandler() { +// Observable events = Observable.create( o -> { +// button2.setOnAction(new EventHandler() { +// @Override public void handle(ActionEvent e) { +// o.onNext(e) +// } +// }); +// }); +// } + + public void transitioning_into_observable_fromFuture() { + // Much like most of the functions we've seen so far, you can turn any kind of input into an Rx observable with create. + // There are several shorthands for converting common types of input + + FutureTask f = new FutureTask(() -> { + Thread.sleep(2000); + return 21; + }); + new Thread(f).start(); + + Observable values = Observable.fromFuture(f); + + Disposable disposable = values.subscribe( + v -> System.out.println("Received: " + v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + // The observable emits the result of the FutureTask when it is available and the terminates. + // If the task is canceled, the observable will emit a `java.util.concurrent.CancellationException` error. + + // If you're interested in the results of the `Future` for a limited amount of time, you can provide a timeout period like this + // Observable values = Observable.fromFuture(f, 1000, TimeUnit.MICROSECONDS); + } + + public void transitioning_into_observable_fromArray() { + // You can also turn any collection into an observable using the overloads of Observable.from that take arrays and iterables. + // This will result in every item in the collection being emitted and then a final onCompleted event + Integer[] is = {1, 2, 3}; + Observable values = Observable.fromArray(is); + Disposable disposable = values.subscribe( + v -> System.out.println("Received: " + v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + // `Observable` is not interchangeable with `Iterable` or `Stream`. + // `Observable`s are push-based, i.e., the call to `onNext` causes the stack of handlers to execute all the way to the final subscriber method (unless specified otherwise). + // The other models are pull-based, which means that values are requested as soon as possible and execution blocks until the result is returned. + public static void main(String[] args) { + Sample2_1 sample = new Sample2_1(); + sample.transitioning_into_observable_fromArray(); + + } +} diff --git a/practice/src/main/java/Sample2_2.java b/practice/src/main/java/Sample2_2.java new file mode 100644 index 0000000..1f98418 --- /dev/null +++ b/practice/src/main/java/Sample2_2.java @@ -0,0 +1,250 @@ +import java.util.concurrent.TimeUnit; + +import io.reactivex.Observable; +import io.reactivex.disposables.Disposable; + +public class Sample2_2 { + //Reducing a sequence + public void filter() { + // If the decision is false, the item is ommited from the filtered sequence. + Observable values = Observable.range(0, 10); + Disposable oddNumbers = values + .filter(v -> v % 2 == 0) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void distinct() { + Observable values = Observable.create(o -> { + o.onNext(1); + o.onNext(1); + o.onNext(2); + o.onNext(3); + o.onNext(2); + o.onComplete(); + }); + + Disposable disposable = values + .distinct() + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void distinct_keySelector() { + //An overload of distinct takes a key selector. For each item, the function generates a key and the key is then used to determine distinctiveness + Observable values = Observable.create(o -> { + o.onNext("First"); + o.onNext("Second"); + o.onNext("Third"); + o.onNext("Fourth"); + o.onNext("Fifth"); + o.onComplete(); + }); + + Disposable disposable = values + .distinct(v -> v.charAt(0)) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void distinct_untilChanged() { + // The difference is that only consecutive non-distinct values are filtered out + Observable values = Observable.create(o -> { + o.onNext(1); + o.onNext(1); + o.onNext(2); + o.onNext(3); + o.onNext(2); + o.onComplete(); + }); + + Disposable disposable = values + .distinctUntilChanged() + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void distinct_untilChanged_keySelector() { + Observable values = Observable.create(o -> { + o.onNext("First"); + o.onNext("Second"); + o.onNext("Third"); + o.onNext("Fourth"); + o.onNext("Fifth"); + }); + + Disposable disposable = values + .distinctUntilChanged(v -> v.charAt(0)) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + } + + public void ignoreElements() { + //ignoreElement will ignore every value, but lets pass through onCompleted and onError + Observable values = Observable.range(0, 10); + Disposable disposable = values + .ignoreElements() + .subscribe( + () -> System.out.println("Completed"), + e -> System.out.println("Error: " + e) + ); + + //ignoreElements() produces the same results as `filter(v -> false)` + } + + public void take() { + // The next group of methods serve to cut the sequence at a specific point based on the item's index, + // and either take the first part or the second part + // `take` takes the first n elements, while `skip` skips them. + // Note that neither function considers it an error if there are fewer items in the sequence than the specified index. + + Observable values = Observable.range(0, 5); + Disposable take2 = values + .take(2) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void take_with_error() { + Observable values = Observable.create(o -> { + o.onNext(1); + o.onError(new Exception("Oops")); + }); + + Disposable disposable = values + .take(1) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void skip() { + // `skip` returns the other half of a take + Observable values = Observable.range(0, 5); + Disposable disposable = values + .skip(2) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void take_moment() { + // There are overloads where the cutoff is a moment in time ather than place in the sequence + Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); + + Disposable disposable = values + .take(250, TimeUnit.MILLISECONDS) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + try{System.in.read();} catch(Exception ignore){} + } + + public void takeWhile() { + //`take` and `skip` work with predefined indices. If you want to "discover" the cutoff point as the values come, + //`takeWhile` and `skipWhile` will use a predicate instead. `takeWhile` takes items while a predicate function returns `true` + Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); + + Disposable disposable = values + .takeWhile(v -> v < 2) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + try{System.in.read();} catch(Exception ignore){} + } + + public void skipWhile() { + Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); + + Disposable disposable = values + .skipWhile(v -> v < 2) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + try{System.in.read();} catch(Exception ignore){} + + } + + public void skipLast() { + //`skipLast` and `takeLast` work just like `take` and `skip`, with the difference that the point of reference is from the end + Observable values = Observable.range(0, 5); + + Disposable disposable = values + .skipLast(2) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void takeUntil() { + // The cutoff point is defined as the moment when another obervable emits an item + Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); + Observable cutoff = Observable.timer(250, TimeUnit.MILLISECONDS); + + Disposable disposable = values + .takeUntil(cutoff) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + try{System.in.read();} catch(Exception ignore){} + } + + public void skipUntil() { + Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); + Observable cutoff = Observable.timer(250, TimeUnit.MILLISECONDS); + + Disposable disposable = values + .skipUntil(cutoff) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + try{System.in.read();} catch(Exception ignore){} + + + } + + public static void main(String[] args) { + Sample2_2 sample = new Sample2_2(); + sample.skipUntil(); + } +} diff --git a/practice/src/main/java/Sample2_3.java b/practice/src/main/java/Sample2_3.java new file mode 100644 index 0000000..6185941 --- /dev/null +++ b/practice/src/main/java/Sample2_3.java @@ -0,0 +1,236 @@ +import java.util.concurrent.TimeUnit; + +import io.reactivex.Observable; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.operators.observable.ObservableInternalHelper; + +public class Sample2_3 { + + public void all_success() { + // The all method established that every value emitted by an observable meets a criterion. + Observable values = Observable.create(o -> { + o.onNext(0); + o.onNext(10); + o.onNext(10); + o.onNext(2); + o.onComplete(); + }); + + Disposable evenNumbers = values + .all(i -> i % 2 == 0) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e) + ); + try {System.in.read();} catch (Exception ignore) {} + + // As soon as an item fails the predicate, `false` will be emiited. + // A value of `true` on the other hand cannot be emitted until the source sequence has completed and `all` of the items are checked + // Returning the decision inside an observable is a convenient way of making the operation non-blocking + + } + + public void all_fail() { + Observable values = Observable.interval(150, TimeUnit.MILLISECONDS).take(5); + + Disposable disposable = values + .all(i -> i < 3) // Will fail eventually + .subscribe( + v -> System.out.println("All: " + v), + e -> System.out.println("All Error: " + e) + ); + + Disposable disposable2 = values + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + try {System.in.read();} catch (Exception ignore) {} + } + + public void all_error() { + // If the source observable emits an error, then `all` becomes irrelevant and the error pass through, terminating the sequence + Observable values = Observable.create(o -> { + o.onNext(0); + o.onNext(2); + o.onError(new Exception()); + }); + + Disposable disposable = values + .all(i -> i % 2 == 0) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e) + ); + + try {System.in.read();} catch (Exception ignore) {} + } + + public void all_fail_before_error() { + // If, howeve, the predicate fails, then `false` is emitted and the sequence terminates. Even if the source observable fails after that, + // the event is ignroed, as required by the Rx contract ( no events after a termination event ) + + Observable values = Observable.create( o -> { + o.onNext(1); + o.onNext(2); + o.onError(new Exception()); + }); + + Disposable disposable = values + .all(i -> i % 2 == 0) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e) + ); + } + + public void exist() { + // Change to use `Maybe` in RxJava2 ? + // The exist method returns an observable that will emit `true` if any of the values emitted by the observable make the predicate true + +// Observable values = Observable.range(0, 2); +// +// Disposable disposable = values +// .exists(i -> i > 2) +// .subscribe( +// v -> System.out.println(v), +// e -> System.out.println("Error: " + e) +// ); + + } + + public void isEmpty() { + // This operator's result is a boolean value, indecating if an observable emitted values before completing or not + Observable values = Observable.timer(1000, TimeUnit.MILLISECONDS); + + Disposable disposable = values + .isEmpty() + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e) + ); + + try {System.in.read();} catch (Exception ignore) {} + } + + public void contains() { + // `contains` establishes if a particular element is emitted by an observable + // `contains` will use the `Object.equals` method to establish the quality + + Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); + + Disposable disposable = values + .contains(4L) // if we had used 4 where we used 4L, nothing would be printed. 4 != 4L in java + .subscribe( + v -> System.out.println("Contains: " + v), + e -> System.out.println("Contains Error: " + e) + ); + +// Disposable disposable2 = values +// .subscribe( +// v -> System.out.println(v), +// e -> System.out.println("Error: " + e) +// ); + + + try {System.in.read();} catch (Exception ignore) {} + } + + public void defaultIfEmpty() { + // rather than checking with `isEmpty` + // you can force an observable to emit a value on completion if it didn't emit anything before completing + Observable values = Observable.empty(); + + Disposable disposable = values + .defaultIfEmpty(2) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void defaultIfEmpty_error() { + // the default calue will not be emitted before the error + Observable values = Observable.error(new Exception()); + + Disposable disposable = values + .defaultIfEmpty(2) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + } + + public void elementAt() { + // you can select exactly one element out of an observable using the `elementAt` method + Observable values = Observable.range(100, 10); + + Disposable disposable = values + .elementAt(2) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e), + () -> System.out.println("Completed") + ); + + try {System.in.read();} catch (Exception ignore) {} + } + + public void elementAtOrError() { + // to prevent `java.lang.IndexOutOfBoundsException` + // -> elementAtOrDefault is gone! + Observable values = Observable.range(100, 10); + + Disposable disposable = values + .elementAtOrError(22) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e) + ); + + try {System.in.read();} catch (Exception ignore) {} + } + + public void sequenceEqual() { + // two sequences are equal by comparing the values at hte same index + // Both the size of the sequences and the values must be equal + // The function will either use `Object.equals` or the function that you supply to compare values + + Observable strings = Observable.just("1", "2", "3"); + Observable ints = Observable.just(1, 2, 3); + + Observable.sequenceEqual(strings, ints, (s, i) -> s.equals(i.toString())) + //.sequenceEqual(strings, ints) -- result would be false + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e) + ); + + } + + public void sequenceEqual_error() { + //failing is not part of the comparision. As soon as either sequence fails, the resulting observable forwards the error + Observable values = Observable.create( o -> { + o.onNext(1); + o.onNext(2); + o.onError(new Exception()); + }); + + Observable.sequenceEqual(values, values) + .subscribe( + v -> System.out.println(v), + e -> System.out.println("Error: " + e) + ); + + } + + public static void main(String[] args) { + Sample2_3 sample = new Sample2_3(); + sample.sequenceEqual_error(); + + } +} diff --git a/practice/src/main/java/Sample2_4.java b/practice/src/main/java/Sample2_4.java new file mode 100644 index 0000000..47367c0 --- /dev/null +++ b/practice/src/main/java/Sample2_4.java @@ -0,0 +1,354 @@ +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +import io.reactivex.Observable; +import io.reactivex.subjects.ReplaySubject; +import io.reactivex.subjects.Subject; +import model.Person; +import subscriber.PrintSubscriber; + +public class Sample2_4 { + // How we can use the data in the sequence to derive new meaningful values + // The methods we will see here resemble what is called catamorphism. + // In our case, it would mean that the methods consume the values in the sequence and compose them into one + // However, they do not strictly meet the definition, as they don't return a single value. Rather, they return an observable that promises to emit a single value + + public void count() { + Observable values = Observable.range(0,3); + + values + .subscribe(new PrintSubscriber("Values")); + + values + .count() + .subscribe(new PrintSubscriber("Count")); + + } + + public void first() { + // `first` will return an observable that emits only the first value in sequence + // It is similar to `take(1)`, except that it will emit `java.util.NoSuchElementException` if noe is found. + // If you use the overload that takes a predicate, the first value that matches the predicate is returned + Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); + + values + .first(1000L) + .subscribe(new PrintSubscriber("First")); + + } + + public void last() { + // `last` and `lastOrDefault` work in the same way as `first`, + // except that the item returned is the last item before sequence completed + Observable values = Observable.range(0, 10); + + values + .last(1234) + .subscribe(new PrintSubscriber("Last")); + + } + + public void single() { + // `single` emits the only value in the sequence, or the only value that met predicate when is given. + // It differs from `first` and `last` in that it does not ignore multiple matches. + // If multiple matches are found, it will emit an error. It can be used to assert that a sequence must only contain one such value. + // Remember that `single` must check the entire sequence to ensure your assertion + + //Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); + Observable values = Observable.range(0,0); + //Observable values = Observable.range(0,1); + + values.take(10) + .single(5) // Emits a result + .subscribe(new PrintSubscriber("Single1")); + + values.take(3) + .single(6) + .subscribe(new PrintSubscriber("Single2")); + + } + + public void reduce() { + // The general idea is that you produce a single value out of many by combining them two at a time + // In its most basic overload, all you need is a function that combines two values into one. + // In example, Here we will calculate the sum of a sequence of integers: 0+1+2+3+4+.... We will also calculate the minimum value for a different example + + // 0 1 2 3 4 + Observable values = Observable.range(0,5); + values + .reduce((i1, i2) -> i1+i2) + .subscribe(new PrintSubscriber("Sum")); + + values + .reduce((i1,i2) -> (i1>i2) ? i2 : i1) + .subscribe(new PrintSubscriber("Min")); + + //Each time, the accumulator function combines the result of the previous step with the next value. + // This is more obvious in another overload + // public final Observable reduce(R initialValue, Func2 accumulator) + + } + + public void count_via_reduce() { + Observable values = Observable.just("Rx", "is", "easy"); + + values + .reduce(0, (acc, next) -> acc + 1) + .subscribe(new PrintSubscriber("Count")); + + + // We start with an accumulator of 0, as we have counted 0 items. + // Every time a new item arrives, we return a new accumulator that is increased by one. + // The Last value corresponds to the number of elements in the source sequence + + // `reduce` can be used to implement the functionality of most of the operators that emit a single value. + // It can not implement behaviour where a value is emitted before the source completes. + // So, you can implement `last` using `reduce`, but an implementation of `all` would not behave exactly like the original + + values + .reduce((i1,i2) -> i2) + .subscribe(new PrintSubscriber("Last")); + } + + public void scan() { + // `scan` is very similar to `reduce`, with the key difference being that `scan` will emit all the intermediate result + // In the case of our example for a sum, using `scan` will produce a running sum + + Observable values = Observable.range(0, 5); + + values + .scan((i1, i2) -> i1+i2) + .subscribe(new PrintSubscriber("Sum")); + + // `scan` is more general than `reduce`, since `reduce` can be implemented with `scan: reduce(acc) = scan(acc).takeLast()` + + } + + public void scan_minimum() { + // `scan` emits when the source emits and does not need the source to complete. + // We demonstrate that by implementing an observable that returns a running minimu,: + Subject values = ReplaySubject.create(); + + values + .subscribe(new PrintSubscriber("Values")); + + values + .scan((i1,i2)->(i1` into a `List` + + Observable values = Observable.range(10, 5); + + values + .reduce( + new ArrayList(), + (acc, value) -> { + acc.add(value); + return acc; + }) + .subscribe(v -> System.out.println(v)); + } + + public void aggregate_right() { + // `aggregate()` has a problem formality: `reduce` is meant to be a functional fold and such folds are not supposed to work on mutable accumulators. + // If we were to do this the "right" way, we would have to create a new instance of `ArrayList` for every new item, like this + Observable values = Observable.range(10, 5); + + values + .reduce( + new ArrayList(), + (acc, value) -> { + ArrayList newAcc = (ArrayList) acc.clone(); + newAcc.add(value); + return newAcc; + } + ).subscribe(v -> System.out.println(v)); + } + + public void collect() { + // The performance of creating a new collection for every new item is unacceptable. For that reason, Rx offers the `collect` operator, + // which does the same thing as `reduce`, only using a mutable accumulator this time + // By using `collect` you document that you are not following the convention of immutability and you also simplify tour code a little: + Observable values = Observable.range(10, 5); + + values + .collect( + () -> new ArrayList(), + (acc, value) -> acc.add(value)) + .subscribe(v -> System.out.println(v)); + } + + public void toList() { + Observable values = Observable.range(10, 5); + + values + .toList() + .subscribe(v -> System.out.println(v)); + } + + public void toSortedList() { + Observable values = Observable.range(10, 5); + + values + .toSortedList((i1,i2) -> i2 - i1) + .subscribe(v -> System.out.println(v)); + } + + // `keySelector` is a function that produces a key from a value + // `valueSelector` produces from the emitted value the actual value that will be stored in the map + // `mapFactory` creates the collection that will hold the items + public void toMap_simple() { + Observable values = Observable.just( + new Person("Will", 25), + new Person("Nick", 40), + new Person("Saul", 35) + ); + + values + .toMap(person -> person.name) + .subscribe(new PrintSubscriber("toMap")); + } + + public void toMap_key_value() { + Observable values = Observable.just( + new Person("Will", 25), + new Person("Nick", 40), + new Person("Saul", 35) + ); + + values + .toMap( + person -> person.name, + person -> person.age) + .subscribe(new PrintSubscriber("toMap")); + } + + public void toMap_key_value_container() { + Observable values = Observable.just( + new Person("Will", 25), + new Person("Nick", 40), + new Person("Saul", 35) + ); + + values + .toMap( + person -> person.name, + person -> person.age, + () -> new HashMap<>()) + .subscribe(new PrintSubscriber("toMap")); + + // The container is provided as a factory function because a new container needs to be created for every new subscription + } + + // When mapping, it is very common that many values share the same key. + // The datastrcuture that maps one key to multiple values is called a multimap and it is a map from keys to collections. + // This process can also be called "grouping" + + public void multiMap_grouping() { + Observable values = Observable.just( + new Person("Will", 35), + new Person("Nick", 40), + new Person("Saul", 35) + ); + + values + .toMultimap( + person -> person.age, + person -> person.name + ) + .subscribe(new PrintSubscriber("toMap")); + } + + public void multiMap_container() { + // The fourth allows us to provide not only the `Map` but also the `Collection` that the values will be stored in + // The key is provided as a parameter, in case we want to customise the corresponding collection based on key + // This example we'll just ignore it + Observable values = Observable.just( + new Person("Will", 35), + new Person("Nick", 40), + new Person("Saul", 35) + ); + + values + .toMultimap( + person -> person.age, + person -> person.name, + () -> new HashMap(), + (key) -> new ArrayList() + ).subscribe(new PrintSubscriber("toMap")); + + // The operators just presented have actually limited use. + // It is tempting for a beginner to collect the data in a collection and process them in the traditional way. + // That should be avoided not just for didactic purpose, but because this practice defeats the advantages of using Rx in the first place + + } + + public void groupBy() { + // The last general function that we will see for now is `groupBy` + // It is the Rx way of doing `toMultimap` + // For each value, it calculates a key and groups the values into seperate observables based on that key + + //The return value is an observable of `GroupObservable` + + // The nested observables may complicate the signature, but they offer the advantage of allowing the groups to start emitting their items before the source observable has completed + + // In example, we will take a set of words and, for each starting letter, we will print the last word that occured + + Observable values = Observable.just( + "first", + "second", + "third", + "forth", + "fifth", + "sixth" + ); + + values.groupBy(word -> word.charAt(0)) + .subscribe( + group -> group.last("last") + .subscribe(v -> System.out.println(group.getKey() + ": " + v)) + ); + } + + public void flatMap() { + Observable values = Observable.just( + "first", + "second", + "third", + "forth", + "fifth", + "sixth" + ); + +// values.groupBy(word -> word.charAt(0)) +// .flatMap(group -> group.last("last") +// .map(v -> group.getKey() + ": " + v)) +// .subscribe(v -> System.out.println(v)); + + //FIXME + values.groupBy(word -> word.charAt(0)) + //.flatMap((groupedObservable, mapper) -> mapper.) + .subscribe(v -> System.out.println(v)); + } + + + public static void main(String[] args) { + Sample2_4 sample = new Sample2_4(); + sample.groupBy(); + try {System.in.read();} catch (Exception ignore) {} + + } +} diff --git a/practice/src/main/java/Sample2_5.java b/practice/src/main/java/Sample2_5.java new file mode 100644 index 0000000..cf6d2fd --- /dev/null +++ b/practice/src/main/java/Sample2_5.java @@ -0,0 +1,167 @@ +import java.util.concurrent.TimeUnit; + +import io.reactivex.Observable; +import subscriber.PrintSubscriber; + +public class Sample2_5 { + + public void map() { + Observable values = Observable.range(0, 4); + + values.map(i -> i + 3) + .subscribe(new PrintSubscriber("Map")); + } + + public void map_practical() { + Observable values = Observable.just("0", "1", "2", "3") + .map(Integer::parseInt); + + values.subscribe(new PrintSubscriber("Map")); + } + + public void cast() { + Observable values = Observable.just(0, 1, 2, 3); + + values + .cast(Integer.class) + .subscribe(new PrintSubscriber("Map")); + } + + public void cast_fail() { + Observable values = Observable.just(0, 1, 2, "3"); + + values + .cast(Integer.class) + .subscribe(new PrintSubscriber("Map")); + } + + public void ofType() { + Observable values = Observable.just(0, 1, "2", 3); + + values + .ofType(Integer.class) + .subscribe(new PrintSubscriber("Map")); + } + + //timestamp 는 나중에.. + + public void materialize() { + Observable values = Observable.interval(100, TimeUnit.MILLISECONDS); + + values.take(3) + .materialize() + .subscribe(new PrintSubscriber("Materialize")); + + // `dematerialize` will reverse the effect of `materialize`, returning a materialized observable to its normal form + } + + public void flatMap_1() { + // `map` took one value and returned another, replacing items in the sequence one-for-one. + // `flatMap` will replace an item with any number of items, including zero or infinite items. + // `flatMap`'s transformation method takes values from the source observable and, for each of them, returns a new observable that emits the new values + + // The observable returned by `flatMap` will emit all the values emitted by all the observables produced by the transformation function. + // Values from the same observable will be in order, but they may be interleaved with values from other observables. + + // example, `flatMap` is applied on an observable with a single value. + // `values` will emit a single value, `2`. `flatMap` will turn it into an observable that is the range between 0 and 2. + // The values in this observable are emitted in the final observable + Observable values = Observable.just(2); + + values + .flatMap(i -> Observable.range(0,2)) + .subscribe(new PrintSubscriber("flatMap")); + } + + public void flatMap_2() { + // When `flatMap` is applied on an observable with multiple values, each value will produce a new observable. + // `values1 will emit `1`, `2` and `3`. The resulting observables will emit the values `[0]`, `[0,1]`, `[0,1,2]`, respectively. + // The values will be flattened together into one observable: the one that is returned by `flatMap` + + Observable values = Observable.range(1,3); + + values + .flatMap(i -> Observable.range(0,i)) + .subscribe(new PrintSubscriber("flatMap")); + } + + public void flatMap_3() { + // Much like `map`, `flatMap`'s input and output type are free to differ. + // In the next example, we will transform integers into `Character` + + Observable values = Observable.just(1); + + values + .flatMap(i -> + Observable.just((char)(i+64)) + ) + .subscribe(new PrintSubscriber("flatMap")); + } + + public void flatMap_4() { + // This hasn't helped us more than `map` operator. There is one key difference that we can exploit to get more out of the `flatMap` operator + // While every value must result in a `Observable`, nothing prevents this observable from being empty. + // We can use that to silenty filter the sequence while transforming it at the same time + + Observable values = Observable.range(0, 30); + + values + .flatMap(i -> { + if (0 < i &&i <= 26) + return Observable.just(Character.valueOf((char)(i+64))); + else + return Observable.empty(); + }) + .subscribe(new PrintSubscriber("flatMap")); + } + + public void flatMap_5() { + // In our examples for `flatMap` so far, the values where in sequence: first all the values from the first observable, + // then all the values from the second observable. + // Though this seems intuitive, especially when coming from a synchronous environment, it is important to note that this is not always the case + // The observable returned by `flatMap` emits values as soon as they are available. + // It just happened that in our examples, all of the observables had all of their values ready ssynchronously. + // To demonstrate, we construct asynchronous observables using the `interval` method + + Observable.just(100, 150) + .flatMap( i -> + Observable.interval(i, TimeUnit.MILLISECONDS) + .map(v -> i) + ) + .take(10) + .subscribe(new PrintSubscriber("flatMap")); + + // We started with the values 100 and 150, which we used as the interval period for the asynchronous observable created in `flatMap` + // Since `interval` emits the numbers 1,2,3... in both cases, to better distinguish the two observables, we replaced those values with interval time that each observable operates on + } + + public void concatMap() { + // Even though `flatMap` share its name with a very common operator in functional programming, we saw that it doesn't behave exactly like a functional progammer would expect + // `flatMap` may interleave the supplied sequences. There is an operator that won't interleave the sequences and is called `concatMap`, because it is related to the concat operator that we will see later + + Observable.just(100, 150) + .concatMap( i -> + Observable.interval(i, TimeUnit.MILLISECONDS) + .map(v -> i) + .take(3) + ) + .subscribe( + new PrintSubscriber("concatMap") + ); + + // We can see in the output that the two sequences are kept seperate + // Note that the `concatMap` operator only works with terminating sequences: it can't move on to the next sequence before the current sequence terminates + // For that reason, we had to limit `interval`'s infinite sequence with `take` + + + } + + + public static void main(String[] args) { + Sample2_5 sample = new Sample2_5(); + sample.concatMap(); + + try {System.in.read();} catch (Exception ignore) {} + + } +} diff --git a/practice/src/main/java/model/Person.java b/practice/src/main/java/model/Person.java new file mode 100644 index 0000000..73c20df --- /dev/null +++ b/practice/src/main/java/model/Person.java @@ -0,0 +1,10 @@ +package model; + +public class Person { + public final String name; + public final Integer age; + public Person(String name, int age) { + this.name = name; + this.age = age; + } +} diff --git a/practice/src/main/java/subscriber/PrintSubscriber.java b/practice/src/main/java/subscriber/PrintSubscriber.java new file mode 100644 index 0000000..4b9db36 --- /dev/null +++ b/practice/src/main/java/subscriber/PrintSubscriber.java @@ -0,0 +1,41 @@ +package subscriber; + +import io.reactivex.MaybeObserver; +import io.reactivex.Observer; +import io.reactivex.SingleObserver; +import io.reactivex.disposables.Disposable; + +public class PrintSubscriber implements Observer, SingleObserver, MaybeObserver { + private final String name; + + public PrintSubscriber(String name) { + this.name = name; + } + @Override + public void onSubscribe(final Disposable d) { + + } + + @Override + public void onSuccess(final Object o) { + System.out.println(name + ": Success: " + o); + } + + @Override + public void onNext(final Object o) { + System.out.println(name + ": " + o); + + } + + @Override + public void onError(final Throwable e) { + System.out.println(name + ": Error: " + e); + + } + + @Override + public void onComplete() { + System.out.println(name + ": Completed"); + + } +}