rxjava引入 rxjava handler

admin2024-10-07  32

目录

1.rxjava从原理是基于一种扩展观察者模式。

2.扩展观察者模式当中有4个关键角色

3.rxjava本质原理

4.创建rxjava可以分为三个步骤

5.rxjava使用方法

6.rxjava使用总结 

7.轮询的定义

8.相比轮询,长连接的缺点

9.使用Handler实现轮询方法

10.使用rxjava实现轮询的网络请求

11.缓存策略

12.为什么删除缓存?

13.LRU核心思想

14.LruCache

15.LruCache类源码分析

16.Rxjava是如何实现缓存的


1.rxjava从原理是基于一种扩展观察者模式。

2.扩展观察者模式当中有4个关键角色

<1>观察者。观察者它是用来接收事件,并基于事件作出响应动作的一个角色。

<2>被观察者。被观察者它是用于生产事件的。它生产事件会交给观察者,观察者会根据响应作出不同的动作。

<3>订阅。这个角色是用于连接被观察者和观察者之间的。

<4>事件。被观察者和观察者之间沟通的载体。

3.rxjava本质原理

当被观察者Observable通过订阅subscribe这个方法,按顺序发送事件给观察者Observer的时候,这时观察者Observer它会根据我们接收到事件顺序,依次做出不同的响应动作。我们在这个动作当中可以做不同的业务需求。

4.创建rxjava可以分为三个步骤

<1>创建被观察者(Observable),被观察者要生产事件来交给观察者进行接收。

<2>创建观察者(Observer),观察者会根据接收到观察者发送的事件,来去定义响应事件的动作和行为。

<3>通过订阅(Subscribe)连接观察者和被观察者。

5.rxjava使用方法

Step 1

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
});

首先我们要创建被观察者Observable。Observable通过create方法生产事件。而create是rxjava当中最基本的一个创建事件序列的方法。在这个方法当中,我们传入了一个OnSubscribe这个对象。也就说当我们这个被观察者被订阅的时候,它会调用onSubscribe当中的subscribe方法,来去依次地被触发。这其实就是一种观察者模式。也就是说被观察者它是被观察者在那边观察的。一旦被观察者有事件的改变,观察者就会通知其他的去做出不同的响应。所以说这是一种扩展的观察者模式。还要注意的是,我们要去复写subscribe方法。subscribe这个方法里面,我们要去定义发送事件。如何定义发送事件?它其实就是通过ObservableEmitter这个观察者的发射类来进行的。这个类其实就是事件发射器的类,在这个类当中我们可以去定义需要发送的事件。同时我们还可以向观察者发送事件。

Step 2

方法一:

Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {

    }

    @Override
    public void onNext(@NonNull Integer integer) {

    }

    @Override
    public void onError(@NonNull Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};

创建观察者有两种方式,第一种就是上述代码所展示的。在创建观察者的时候,还要去定义响应事件的行为。各回调方法说明如下:

onSubscribe:观察者接收事件前,默认最先调用复写onSubscribe()

onNext:当被观察者生产Next事件,并且观察者接收到时,会调用该复写方法进行响应。

onError:当被观察者生产Error事件,并且观察者接收到时,会调用该复写方法进行响应。 

onComplete:当被观察者生产Complete事件,并且观察者接收到时,会调用该复写方法进行响应。

方法二:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {

    }

    @Override
    public void onNext(String s) {

    }

    @Override
    public void onError(Throwable t) {

    }

    @Override
    public void onComplete() {

    }
};

第二种subscribe,它其实是rxjava内置的实现了Observer的抽象类。这个抽象类的创建过程与Observer类似。Subscribe这个抽象类是对Observer接口进行了扩展。 这个类多了两个方法。一个是onStart(),它在还未响应事件前会调用,做一些初始化的工作;还有就是unsubscribe这个方法。它用于去取消订阅。就说一旦观察者订阅了被观察者之后,它可以去取消订阅。那么这个方法调用之后,观察者就不会再接收到被观察者发送过来的响应事件了。

Step 3

observable.subscribe(observer);

第三步就是订阅,它就是调用subscribe这个方法,就完成了观察者、被观察者的调用。

6.rxjava使用总结 

<1>创建被观察者(Observable)&生产事件

<2>创建观察者(Observer)并定义响应事件的行为

<3>通过订阅(Subscribe)连接观察者和被观察者

7.轮询的定义

APP端每隔一定的时间重复请求的操作。

8.相比轮询,长连接的缺点

长连接也可以完成类似轮询的需求,但这个长连接并不是稳定可靠的。我们在轮询操作的时候一般都需要稳定的网络请求。而轮询操作相比长连接它是有生命周期的。就是说轮询是在一定的生命周期内去执行完成的。而我们的长连接它是要跨整个进程生命周期的。所以说这两者是有区别的。

9.使用Handler实现轮询方法

private static Handler loopRequestHandler = new Handler(){
        @Override
        public void handleMessage(Message msg) {
            if (msg.what == LOOP_WHAT) {
                dosomething();
                loopRequestHandler.removeMessages(LOOP_WHAT);
                System.gc();
                loopRequestHandler.sendEmptyMessageDelayed(LOOP_WHAT, 2000);
            }
        }
    };

该方法其实就是使用了延迟执行的原理,使Handler内的方法每隔两秒执行一次。其中dosomething()方法是我们自己要执行的轮询方法。

10.使用rxjava实现轮询的网络请求

<1>创建描述网络请求的接口

public class retrofit_interface {
    @GET("部分URL")
    Call<Person> getCall();
}

<2>发送网络请求 

Observable.interval(2, 1, TimeUnit.SECONDS)

                /*
                 * 步骤2:每次发送数字前发送1次网络请求(doOnNext()在执行Next事件前调用)
                 * 即每隔1秒产生1个数字前,就发送1次网络请求,从而实现轮询需求
                 **/
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(Long integer) throws Exception {

                        /*
                         * 步骤3:通过Retrofit发送网络请求
                         **/
                        // a. 创建Retrofit对象
                        Retrofit retrofit = new Retrofit.Builder()
                                .baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
                                .addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
                                .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) // 支持RxJava
                                .build();

                        // b. 创建 网络请求接口 的实例
                        LoopRequest_interface request = retrofit.create(LoopRequest_interface.class);

                        // c. 采用Observable<...>形式 对 网络请求 进行封装
                        Observable<Person> observable = request.getCall();
                        // d. 通过线程切换发送网络请求
                        observable.subscribeOn(Schedulers.io())               // 切换到IO线程进行网络请求
                                .observeOn(Schedulers.newThread())  // 切换回到主线程 处理请求结果
                                .subscribe(new Observer<Person>() {
                                    @Override
                                    public void onSubscribe(Disposable d) {
                                    }

                                    @Override
                                    public void onNext(Person result) {
                                        // e.接收服务器返回的数据
                                    }

                                    @Override
                                    public void onError(Throwable e) {
                                    }

                                    @Override
                                    public void onComplete() {

                                    }
                                });

                    }
                })
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

代码中我们调用被观察者interval方法,interval就是延迟发送的一个方法。这里展示的是无限次轮询,而不是有限次轮询。如果想使用有限次轮询,要调用intervalRange这个方法。

这里注意一下interval的三个参数。第一个参数表示第一次延迟的时间。第二个参数表示间隔时间的数字。第三个参数是时间单位。这里的参数设置就表示,我延迟2秒发送事件,同时每隔1秒产生一个数字,从0开始递增,而且我们是无限的轮询。这是第一步。

第二步,我们每次发送数字前,都要发送网络请求。这里的doOnNext回调就是在执行next事件前调用。也就说每隔1秒产生一个数字前,就发送一个网络请求。从而就实现了我们的轮询需求。

第三步就是利用Retrofit进行网络请求。首先要创建一个Retrofit对象,然后通过build构建者模式来创建Retrofit。通过baseurl来设置网络请求路径。调用ConverterFactory设置json解析器。通过适配设置rxjava可适配。这就完成了第一步。第二步我们要创建网络请求接口实例,就是通过Retrofit.create()方法。第三步我们要对我们的网络请求进行封装。第四步是不一样的地方,通过rxjava的线程切换,来进行发送网络请求的任务。首先我们要调用subscribeOn表示我们切换到子线程当中进行网络请求。然后调用observeOn表示我们切换到主线程来处理我们请求的结果。

11.缓存策略

一般来说Android的缓存策略其实主要包含缓存的添加、获取和删除这三类操作。

12.为什么删除缓存?

不论内存缓存还是硬盘缓存,它的缓存大小都是有上限的。当你的缓存存满之后,如果你想再继续添加缓存,这时候你务必要去删除一些旧的缓存,然后才能去增加新的缓存。

13.LRU核心思想

当缓存存满的时候,它会优先淘汰一部分的缓存对象,即近期最少使用的缓存对象。

14.LruCache

Android3.1以后提供的缓存类。

15.LruCache类源码分析

<1>LruCache是一个泛型类,主要的算法原理就是把最近使用的那些对象,用强引用的形式存储在我们一个HashMap当中。而这个HashMap它是LinkedHashMap。还有在这里说的强引用,就是我们平时所使用的最正常的一种引用方式。当这个缓存满的时候,它会把最近最少使用的对象从内存当中移除。所以它提供了一个put方法和一个get方法来进行缓存的添加和获取。

<2>核心思想就是维持了一个缓存对象列表,即LinkedHashMap,然后里边的列表的排列方式是按访问的顺序实现的。所以说一直没有访问对象它就会处在整个队的队尾,所以你要删除时就会从队尾把它删除掉。

<3>构造函数

public LruCache(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize <= 0");
        }
        this.maxSize = maxSize;
        this.map = new LinkedHashMap<K, V>(0, 0.75f, true);
    }

在它的构造函数中实现了一个LinkedHashMap,所以说LruCache它的整体的那些逻辑原理都是通过LinkedHashMap来实现的。

<4>put方法

@Nullable
    public final V put(@NonNull K key, @NonNull V value) {
        if (key == null || value == null) {
            throw new NullPointerException("key == null || value == null");
        }

        V previous;
        synchronized (this) {
            putCount++;
            size += safeSizeOf(key, value);
            previous = map.put(key, value);
            if (previous != null) {
                size -= safeSizeOf(key, previous);
            }
        }

        if (previous != null) {
            entryRemoved(false, key, previous, value);
        }

        trimToSize(maxSize);
        return previous;
    }

我们对如下代码进行分析

if (key == null || value == null) {
    throw new NullPointerException("key == null || value == null");
}

这里会判断一下key和value是否为空,为空的话它会抛出异常。

synchronized (this) {
    putCount++;

    ...

}

putCount为插入的缓存对象值,在这个同步代码块中,会将插入的缓存对象值+1。

synchronized (this) {

    ...

    size += safeSizeOf(key, value);

    ...

}

 增加已有的缓存大小。

synchronized (this) {

    ...

    previous = map.put(key, value);

    ...

}

它会向这个map当中加入缓存对象。

synchronized (this) {

    ...

    if (previous != null) {
        size -= safeSizeOf(key, previous);
    }
}

 如果已有缓存对象,那么缓存大小一定要恢复到之前。

entryRemoved(false, key, previous, value);

 这个方法是个空方法,我们可以自己去实现。

trimToSize(maxSize);

这个方法是调整缓存大小所使用的。

可以看到,put方法并没有什么太多难点,只是一些数据结构的操作。重要的是,在添加了缓存之后,我们要调用trimToSize方法,来判断缓存是否已满,满了就删除近期最少使用的算法。

 <5>trimToSize方法

public void trimToSize(int maxSize) {
    while (true) {
        K key;
        V value;
        synchronized (this) {
            if (size < 0 || (map.isEmpty() && size != 0)) {
                throw new IllegalStateException(getClass().getName()
                            + ".sizeOf() is reporting inconsistent results!");
            }

            if (size <= maxSize || map.isEmpty()) {
                break;
            }

            Map.Entry<K, V> toEvict = map.entrySet().iterator().next();
            key = toEvict.getKey();
            value = toEvict.getValue();
            map.remove(key);
            size -= safeSizeOf(key, value);
            evictionCount++;
        }
        entryRemoved(true, key, value, null);
    }
}

 我们对如下代码进行分析

if (size < 0 || (map.isEmpty() && size != 0)) {

    ...

}

如果这个map是为空的,并且这个size不等于0,或者size小于0的时候,这时会抛出异常。

if (size <= maxSize || map.isEmpty()) {
    break;
}

如果size小于我们的最大缓存,那就意味着不需要再删除缓存对象了,这时通过break跳出整个循环。

Map.Entry<K, V> toEvict = map.entrySet().iterator().next();

用迭代器获取队尾的那个对象。这个队尾的对象就是那个近期最少访问的元素。

map.remove(key);

这里会调用remove把这个元素删除掉。

总结:trimToSize这个方法其实它就是通过LinkedHashMap不断地去删除它队尾的元素,把最近最少访问的那个元素删除掉。

<6>所以LrcCache算法就是在内部维护了一个LinkedHashMap,这个LinkedHashMap它是按照顺序来排序的。所以说你调用存储它的put方法,它就会结合添加元素的数量判断是否存满,存满了就会删除。这就是LruCache缓存算法的原理。

16.Rxjava是如何实现缓存的

<1>设置一个Observable:检查内存缓存是否有该数据的缓存

String memoryCache = null;

    /*
     * 设置第1个Observable:检查内存缓存是否有该数据的缓存
     **/
    Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {

            // 先判断内存缓存有无数据
            if (memoryCache != null) {
                // 若有该数据,则发送
                emitter.onNext(memoryCache);
            } else {
                // 若无该数据,则直接发送结束事件
                emitter.onComplete();
            }

        }
    });

这个Observable它的作用是检查我们内存缓存当中是否有该数据的缓存。这只是我们模拟的一种读取的场景。我们创建一个Observable被观察者还是调用create方法。create是在我们rxjava当中最基本的一种创造事件序列的方法。在create方法内部我们传入了一个OnSubscribe对象。当我们的Observable被观察者被观察者调用的时候,这个OnSubscribe对象中的subscribe方法就会自动被调用。那么我们所要写的事件序列就会按照次序来依次触发。在这个subscribe内部,我们首先会判断内存缓存有没有数据,这里直接判断memoryCache是否为空,如果有数据就调用发射器的onNext方法;如果没有该数据,那么就调用发射器的onComplete来表示这个事件结束了。

<2>接着设置第二个Observable,检查磁盘缓存是否有该数据的缓存

String diskCache = "从磁盘缓存中获取数据";

    /*
     * 设置第2个Observable:检查磁盘缓存是否有该数据的缓存
     **/
    Observable<String> disk = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {

            // 先判断磁盘缓存有无数据
            if (diskCache != null) {
                // 若有该数据,则发送
                emitter.onNext(diskCache);
            } else {
                // 若无该数据,则直接发送结束事件
                emitter.onComplete();
            }

        }
    });

这个Observable它是检查我们磁盘缓存是否有该数据的缓存。它的创建方法也是通过create方法,在create方法内部会传入OnSuscribe这个对象。当我们第二个被观察者被调用的时候,它就会调用subscribe,按照顺序被我们依次触发。它的逻辑与内存缓存类似。它首先会判断磁盘缓存有无数据,通过字符串是否为空来判断。如果有数据它就调用发射器的onNext方法,并将我们字符串传入内部;没有数据就直接调用onComplete方法。

<3>第三个Observable是通过网络获取数据

Observable<String> network = Observable.just("从网络中获取数据");

<4>合并三个被观察者对象

Observable.concat(memory, disk, network)
                // 2. 通过firstElement(),从串联队列中取出并发送第1个有效事件(Next事件),即依次判断检查memory、disk、network
                .firstElement()

                // 3. 观察者订阅
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("cache", "最终获取的数据来源 =  " + s);
                    }
                });

首先我们通过concat合并memory、disk、network这三个被观察者事件。它就是我们检查内存缓存、磁盘缓存和发送网络请求三个事件。然后它会将这三个顺序串连成队列。接下来调用firstElement()方法,从我们前面创建好的串联队列当中,去取出并发送第一个有效事件,也就是next事件。这时候它就会依次判断然后检查memory、disk、network。

该例子逻辑如下:

首先调用firstElement,它就会取出第一个事件memory。那么这时候它就会判断内存缓存当中是否有数据缓存,如果有的话,也就是这个memory如果是不为空的话,那么它就可以直接读取啦,而不会进行下面的操作。

但是如果这个memory是空的话,就表示我们内存缓存当中没有数据。那么这时候它就会往下取,取到第二个,也就是disk,来判断磁盘缓存当中是否有数据缓存。如果这时候磁盘缓存不为空,这时候就会调用next事件,被我们的被观察者监听到,来依次进行响应。所以说它会依次进行判断。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明原文出处。如若内容造成侵权/违法违规/事实不符,请联系SD编程学习网:675289112@qq.com进行投诉反馈,一经查实,立即删除!