Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
// 通过create创建observable对象,在call中调用subscriber的onnext方法
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onCompleted();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Rxjava: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
其实还有一种创建观察者的方式
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Rxjava: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
从本质上看这没有什么区别,因为在 RxJava 的 subscribe 过程中,Observer会被转换成Subscriber。其中更具体的区别在本文中先不说,在后续系列文章中说明。先知道一般使用Subscriber
* 最后,就是订阅事件
// 通过调用subscribe方法使观察者和订阅者产生关联,一旦订阅观察者就开始发送消息
observable.subscribe(observer);
// 或者
observable.subscribe(subscriber);
Rxjava:Hellow World Completed!
Observable.just(1,2,3,4).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(tag ,"onCompleted");
}
@Override
public void onError(Throwable arg0) {
}
@Override
public void onNext(String string) {
Log.d(tag ,string+",");
}
});
1,2,3,4,Completed!
private void Rxjava() {
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
Observable
.from(list)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(tag , s+",");
}
});
}
1,2,3,
private void Rxjava() {
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
Observable
.from(list)
.repeat(2)/重复执行两次
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(tag , s+",");
}
});
}
1,2,3,1,2,3,
Observable.range(0,10).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log.d(tag , "onCompleted");
}
@Override
public void onError(Throwable arg0) {
}
@Override
public void onNext(Integer arg0) {
log.d(tag , arg0+",");
}
});
0,1,2,3,4,5,6,7,8,9,onCompleted
private void Rxjava() {
// 第一个参数为指定轮询时间,第二个参数为轮询时间单位(这里以秒为单位)
Observable.interval(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long time) {
Log.d(tag , time);
}
});
}
10-10 11:45:22.599 27973-20494/com.kid.rxjavademo rxjava: 0
10-10 11:45:24.600 27973-20494/com.kid.rxjavademo rxjava: 1
10-10 11:45:26.600 27973-20494/com.kid.rxjavademo rxjava: 2
...
private void rxjava() {
// 指定一定时间后才发射(这里是4秒钟),将会在4秒后收到事件
Observable.timer(4, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long timer) {
Log.d(tag , "receiver");
}
});
}
另外timer还有一个三个参数的方法timer(3,3,TimeUnit.SECONDS)意思是 延迟3秒之后,每隔3秒发射一次