目录
1.rxjava从原理是基于一种扩展观察者模式。
2.扩展观察者模式当中有4个关键角色
3.rxjava本质原理
4.创建rxjava可以分为三个步骤
5.rxjava使用方法
6.rxjava使用总结
7.轮询的定义
8.相比轮询,长连接的缺点
9.使用Handler实现轮询方法
10.使用rxjava实现轮询的网络请求
11.缓存策略
12.为什么删除缓存?
13.LRU核心思想
14.LruCache
15.LruCache类源码分析
16.Rxjava是如何实现缓存的
<1>观察者。观察者它是用来接收事件,并基于事件作出响应动作的一个角色。
<2>被观察者。被观察者它是用于生产事件的。它生产事件会交给观察者,观察者会根据响应作出不同的动作。
<3>订阅。这个角色是用于连接被观察者和观察者之间的。
<4>事件。被观察者和观察者之间沟通的载体。
当被观察者Observable通过订阅subscribe这个方法,按顺序发送事件给观察者Observer的时候,这时观察者Observer它会根据我们接收到事件顺序,依次做出不同的响应动作。我们在这个动作当中可以做不同的业务需求。
<1>创建被观察者(Observable),被观察者要生产事件来交给观察者进行接收。
<2>创建观察者(Observer),观察者会根据接收到观察者发送的事件,来去定义响应事件的动作和行为。
<3>通过订阅(Subscribe)连接观察者和被观察者。
Step 1
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
首先我们要创建被观察者Observable。Observable通过create方法生产事件。而create是rxjava当中最基本的一个创建事件序列的方法。在这个方法当中,我们传入了一个OnSubscribe这个对象。也就说当我们这个被观察者被订阅的时候,它会调用onSubscribe当中的subscribe方法,来去依次地被触发。这其实就是一种观察者模式。也就是说被观察者它是被观察者在那边观察的。一旦被观察者有事件的改变,观察者就会通知其他的去做出不同的响应。所以说这是一种扩展的观察者模式。还要注意的是,我们要去复写subscribe方法。subscribe这个方法里面,我们要去定义发送事件。如何定义发送事件?它其实就是通过ObservableEmitter这个观察者的发射类来进行的。这个类其实就是事件发射器的类,在这个类当中我们可以去定义需要发送的事件。同时我们还可以向观察者发送事件。
Step 2
方法一:
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
};
创建观察者有两种方式,第一种就是上述代码所展示的。在创建观察者的时候,还要去定义响应事件的行为。各回调方法说明如下:
onSubscribe:观察者接收事件前,默认最先调用复写onSubscribe()
onNext:当被观察者生产Next事件,并且观察者接收到时,会调用该复写方法进行响应。
onError:当被观察者生产Error事件,并且观察者接收到时,会调用该复写方法进行响应。
onComplete:当被观察者生产Complete事件,并且观察者接收到时,会调用该复写方法进行响应。
方法二:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
第二种subscribe,它其实是rxjava内置的实现了Observer的抽象类。这个抽象类的创建过程与Observer类似。Subscribe这个抽象类是对Observer接口进行了扩展。 这个类多了两个方法。一个是onStart(),它在还未响应事件前会调用,做一些初始化的工作;还有就是unsubscribe这个方法。它用于去取消订阅。就说一旦观察者订阅了被观察者之后,它可以去取消订阅。那么这个方法调用之后,观察者就不会再接收到被观察者发送过来的响应事件了。
Step 3
observable.subscribe(observer);
第三步就是订阅,它就是调用subscribe这个方法,就完成了观察者、被观察者的调用。
<1>创建被观察者(Observable)&生产事件
<2>创建观察者(Observer)并定义响应事件的行为
<3>通过订阅(Subscribe)连接观察者和被观察者
APP端每隔一定的时间重复请求的操作。
长连接也可以完成类似轮询的需求,但这个长连接并不是稳定可靠的。我们在轮询操作的时候一般都需要稳定的网络请求。而轮询操作相比长连接它是有生命周期的。就是说轮询是在一定的生命周期内去执行完成的。而我们的长连接它是要跨整个进程生命周期的。所以说这两者是有区别的。
private static Handler loopRequestHandler = new Handler(){
@Override
public void handleMessage(Message msg) {
if (msg.what == LOOP_WHAT) {
dosomething();
loopRequestHandler.removeMessages(LOOP_WHAT);
System.gc();
loopRequestHandler.sendEmptyMessageDelayed(LOOP_WHAT, 2000);
}
}
};
该方法其实就是使用了延迟执行的原理,使Handler内的方法每隔两秒执行一次。其中dosomething()方法是我们自己要执行的轮询方法。
<1>创建描述网络请求的接口
public class retrofit_interface {
@GET("部分URL")
Call<Person> getCall();
}
<2>发送网络请求
Observable.interval(2, 1, TimeUnit.SECONDS)
/*
* 步骤2:每次发送数字前发送1次网络请求(doOnNext()在执行Next事件前调用)
* 即每隔1秒产生1个数字前,就发送1次网络请求,从而实现轮询需求
**/
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long integer) throws Exception {
/*
* 步骤3:通过Retrofit发送网络请求
**/
// a. 创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
.addCallAdapterFactory(RxJavaCallAdapterFactory.create()) // 支持RxJava
.build();
// b. 创建 网络请求接口 的实例
LoopRequest_interface request = retrofit.create(LoopRequest_interface.class);
// c. 采用Observable<...>形式 对 网络请求 进行封装
Observable<Person> observable = request.getCall();
// d. 通过线程切换发送网络请求
observable.subscribeOn(Schedulers.io()) // 切换到IO线程进行网络请求
.observeOn(Schedulers.newThread()) // 切换回到主线程 处理请求结果
.subscribe(new Observer<Person>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Person result) {
// e.接收服务器返回的数据
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
})
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
代码中我们调用被观察者interval方法,interval就是延迟发送的一个方法。这里展示的是无限次轮询,而不是有限次轮询。如果想使用有限次轮询,要调用intervalRange这个方法。
这里注意一下interval的三个参数。第一个参数表示第一次延迟的时间。第二个参数表示间隔时间的数字。第三个参数是时间单位。这里的参数设置就表示,我延迟2秒发送事件,同时每隔1秒产生一个数字,从0开始递增,而且我们是无限的轮询。这是第一步。
第二步,我们每次发送数字前,都要发送网络请求。这里的doOnNext回调就是在执行next事件前调用。也就说每隔1秒产生一个数字前,就发送一个网络请求。从而就实现了我们的轮询需求。
第三步就是利用Retrofit进行网络请求。首先要创建一个Retrofit对象,然后通过build构建者模式来创建Retrofit。通过baseurl来设置网络请求路径。调用ConverterFactory设置json解析器。通过适配设置rxjava可适配。这就完成了第一步。第二步我们要创建网络请求接口实例,就是通过Retrofit.create()方法。第三步我们要对我们的网络请求进行封装。第四步是不一样的地方,通过rxjava的线程切换,来进行发送网络请求的任务。首先我们要调用subscribeOn表示我们切换到子线程当中进行网络请求。然后调用observeOn表示我们切换到主线程来处理我们请求的结果。
一般来说Android的缓存策略其实主要包含缓存的添加、获取和删除这三类操作。
不论内存缓存还是硬盘缓存,它的缓存大小都是有上限的。当你的缓存存满之后,如果你想再继续添加缓存,这时候你务必要去删除一些旧的缓存,然后才能去增加新的缓存。
当缓存存满的时候,它会优先淘汰一部分的缓存对象,即近期最少使用的缓存对象。
Android3.1以后提供的缓存类。
<1>LruCache是一个泛型类,主要的算法原理就是把最近使用的那些对象,用强引用的形式存储在我们一个HashMap当中。而这个HashMap它是LinkedHashMap。还有在这里说的强引用,就是我们平时所使用的最正常的一种引用方式。当这个缓存满的时候,它会把最近最少使用的对象从内存当中移除。所以它提供了一个put方法和一个get方法来进行缓存的添加和获取。
<2>核心思想就是维持了一个缓存对象列表,即LinkedHashMap,然后里边的列表的排列方式是按访问的顺序实现的。所以说一直没有访问对象它就会处在整个队的队尾,所以你要删除时就会从队尾把它删除掉。
<3>构造函数
public LruCache(int maxSize) {
if (maxSize <= 0) {
throw new IllegalArgumentException("maxSize <= 0");
}
this.maxSize = maxSize;
this.map = new LinkedHashMap<K, V>(0, 0.75f, true);
}
在它的构造函数中实现了一个LinkedHashMap,所以说LruCache它的整体的那些逻辑原理都是通过LinkedHashMap来实现的。
<4>put方法
@Nullable
public final V put(@NonNull K key, @NonNull V value) {
if (key == null || value == null) {
throw new NullPointerException("key == null || value == null");
}
V previous;
synchronized (this) {
putCount++;
size += safeSizeOf(key, value);
previous = map.put(key, value);
if (previous != null) {
size -= safeSizeOf(key, previous);
}
}
if (previous != null) {
entryRemoved(false, key, previous, value);
}
trimToSize(maxSize);
return previous;
}
我们对如下代码进行分析
if (key == null || value == null) {
throw new NullPointerException("key == null || value == null");
}
这里会判断一下key和value是否为空,为空的话它会抛出异常。
synchronized (this) {
putCount++;
...
}
putCount为插入的缓存对象值,在这个同步代码块中,会将插入的缓存对象值+1。
synchronized (this) {
...
size += safeSizeOf(key, value);
...
}
增加已有的缓存大小。
synchronized (this) {
...
previous = map.put(key, value);
...
}
它会向这个map当中加入缓存对象。
synchronized (this) {
...
if (previous != null) {
size -= safeSizeOf(key, previous);
}
}
如果已有缓存对象,那么缓存大小一定要恢复到之前。
entryRemoved(false, key, previous, value);
这个方法是个空方法,我们可以自己去实现。
trimToSize(maxSize);
这个方法是调整缓存大小所使用的。
可以看到,put方法并没有什么太多难点,只是一些数据结构的操作。重要的是,在添加了缓存之后,我们要调用trimToSize方法,来判断缓存是否已满,满了就删除近期最少使用的算法。
<5>trimToSize方法
public void trimToSize(int maxSize) {
while (true) {
K key;
V value;
synchronized (this) {
if (size < 0 || (map.isEmpty() && size != 0)) {
throw new IllegalStateException(getClass().getName()
+ ".sizeOf() is reporting inconsistent results!");
}
if (size <= maxSize || map.isEmpty()) {
break;
}
Map.Entry<K, V> toEvict = map.entrySet().iterator().next();
key = toEvict.getKey();
value = toEvict.getValue();
map.remove(key);
size -= safeSizeOf(key, value);
evictionCount++;
}
entryRemoved(true, key, value, null);
}
}
我们对如下代码进行分析
if (size < 0 || (map.isEmpty() && size != 0)) {
...
}
如果这个map是为空的,并且这个size不等于0,或者size小于0的时候,这时会抛出异常。
if (size <= maxSize || map.isEmpty()) {
break;
}
如果size小于我们的最大缓存,那就意味着不需要再删除缓存对象了,这时通过break跳出整个循环。
Map.Entry<K, V> toEvict = map.entrySet().iterator().next();
用迭代器获取队尾的那个对象。这个队尾的对象就是那个近期最少访问的元素。
map.remove(key);
这里会调用remove把这个元素删除掉。
总结:trimToSize这个方法其实它就是通过LinkedHashMap不断地去删除它队尾的元素,把最近最少访问的那个元素删除掉。
<6>所以LrcCache算法就是在内部维护了一个LinkedHashMap,这个LinkedHashMap它是按照顺序来排序的。所以说你调用存储它的put方法,它就会结合添加元素的数量判断是否存满,存满了就会删除。这就是LruCache缓存算法的原理。
<1>设置一个Observable:检查内存缓存是否有该数据的缓存
String memoryCache = null;
/*
* 设置第1个Observable:检查内存缓存是否有该数据的缓存
**/
Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
// 先判断内存缓存有无数据
if (memoryCache != null) {
// 若有该数据,则发送
emitter.onNext(memoryCache);
} else {
// 若无该数据,则直接发送结束事件
emitter.onComplete();
}
}
});
这个Observable它的作用是检查我们内存缓存当中是否有该数据的缓存。这只是我们模拟的一种读取的场景。我们创建一个Observable被观察者还是调用create方法。create是在我们rxjava当中最基本的一种创造事件序列的方法。在create方法内部我们传入了一个OnSubscribe对象。当我们的Observable被观察者被观察者调用的时候,这个OnSubscribe对象中的subscribe方法就会自动被调用。那么我们所要写的事件序列就会按照次序来依次触发。在这个subscribe内部,我们首先会判断内存缓存有没有数据,这里直接判断memoryCache是否为空,如果有数据就调用发射器的onNext方法;如果没有该数据,那么就调用发射器的onComplete来表示这个事件结束了。
<2>接着设置第二个Observable,检查磁盘缓存是否有该数据的缓存
String diskCache = "从磁盘缓存中获取数据";
/*
* 设置第2个Observable:检查磁盘缓存是否有该数据的缓存
**/
Observable<String> disk = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
// 先判断磁盘缓存有无数据
if (diskCache != null) {
// 若有该数据,则发送
emitter.onNext(diskCache);
} else {
// 若无该数据,则直接发送结束事件
emitter.onComplete();
}
}
});
这个Observable它是检查我们磁盘缓存是否有该数据的缓存。它的创建方法也是通过create方法,在create方法内部会传入OnSuscribe这个对象。当我们第二个被观察者被调用的时候,它就会调用subscribe,按照顺序被我们依次触发。它的逻辑与内存缓存类似。它首先会判断磁盘缓存有无数据,通过字符串是否为空来判断。如果有数据它就调用发射器的onNext方法,并将我们字符串传入内部;没有数据就直接调用onComplete方法。
<3>第三个Observable是通过网络获取数据
Observable<String> network = Observable.just("从网络中获取数据");
<4>合并三个被观察者对象
Observable.concat(memory, disk, network)
// 2. 通过firstElement(),从串联队列中取出并发送第1个有效事件(Next事件),即依次判断检查memory、disk、network
.firstElement()
// 3. 观察者订阅
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("cache", "最终获取的数据来源 = " + s);
}
});
首先我们通过concat合并memory、disk、network这三个被观察者事件。它就是我们检查内存缓存、磁盘缓存和发送网络请求三个事件。然后它会将这三个顺序串连成队列。接下来调用firstElement()方法,从我们前面创建好的串联队列当中,去取出并发送第一个有效事件,也就是next事件。这时候它就会依次判断然后检查memory、disk、network。
该例子逻辑如下:
首先调用firstElement,它就会取出第一个事件memory。那么这时候它就会判断内存缓存当中是否有数据缓存,如果有的话,也就是这个memory如果是不为空的话,那么它就可以直接读取啦,而不会进行下面的操作。
但是如果这个memory是空的话,就表示我们内存缓存当中没有数据。那么这时候它就会往下取,取到第二个,也就是disk,来判断磁盘缓存当中是否有数据缓存。如果这时候磁盘缓存不为空,这时候就会调用next事件,被我们的被观察者监听到,来依次进行响应。所以说它会依次进行判断。