Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

hanfeifei/Study_RxJava

Open more actions menu

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
1 Commit
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

RxJava (Reactive Extensions for the JVM)

简介

a library for composing asynchronous and event-based programs by using observable sequences.

项目主页:

https://github.com/ReactiveX/RxJava

特点:

  • 易于并发从而更好的利用服务器的能力
  • 易于有条件的异步执行
  • 一种更好的方式来避免回调地狱
  • 一种响应式方法

四种角色:

  • Observable
  • Observer
  • Subscriber
  • Subjects

观察者模式

img

使用

1.创建Observable(被观察者,事件的产出者)

**方法1**

Observable observable = Observable.create(new Observable.OnSubscribe<String>() { 
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("呵呵1");
        subscriber.onNext("呵呵2");
        subscriber.onNext("呵呵3");
        subscriber.onCompleted();
    }
});
**方法2**

Observable observable = Observable.just("呵呵1", "呵呵2", "呵呵3");
**方法3**

String[] strArray = {"呵呵1", "呵呵2", "呵呵3"};
Observable observable = Observable.from(strArray);

2.Subscribe (订阅)

observable.subscribe(observer);
或者:
observable.subscribe(subscriber);

3. Observer处理事件

new Observer<String>() {
    @Override
    public void onNext(String str) {
    	Log.d("---Observer--",str);
            }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
}

4.变换(map(),flatMap())

**map()**

Observable.from(list).map(new Func1<Employee, Employee>() {
            @Override
            public Employee call(Employee employee) {
                return employee;
            }
        }).subscribe(new Observer<Employee>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Employee e) {
                Log.d("---test4---", e.toString());
            }
        });
**flatMap()**

Observable.from(list).flatMap(new Func1<Employee, Observable<Book>>() {

            @Override
            public Observable<Book> call(Employee employee) {
                return Observable.from(employee.books);
            }
        }).filter(new Func1<Book, Boolean>() {
            @Override
            public Boolean call(Book book) {
                return book.back;
            }
        }).subscribe(new Observer<Book>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Book book) {
                Log.d("---test6---", book.toString());
            }
        });

5.线程控制Scheduler

  • 线程调度方法:
  • subscribeOn()——事件产生的线程
  • observeOn()——事件消费的线程
  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

适用场景

1.与 Retrofit 的结合

项目主页

3.各种异步操作

4.使用RxJava实现各种Rx

About

a sample demo of RxJava

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages

Morty Proxy This is a proxified and sanitized view of the page, visit original site.