Pertanyaan Menciptakan Observable tanpa menggunakan Observable.create


Saya menggunakan RxJava di aplikasi Android saya dan saya ingin memuat data dari database.

Dengan cara ini, saya menciptakan sebuah Observable baru menggunakan Observable.create() yang mengembalikan daftar EventLog

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.create(new Observable.OnSubscribe<List<EventLog>>() {
        @Override
        public void call(Subscriber<? super List<EventLog>> subscriber) {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            subscriber.onNext(eventLogs);
        }
    });
}

Meskipun itu bekerja dengan benar, saya membaca menggunakan itu Observable.create() sebenarnya bukan praktik terbaik untuk Rx Java (lihat sini).

Jadi saya mengubah metode ini dengan cara ini.

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.fromCallable(new Func0<List<EventLog>>() {
        @Override
        public List<EventLog> call() {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            return eventLogs;
        }
    });
}

Apakah ini pendekatan yang lebih baik menggunakan Rx Java? Mengapa? Apa sebenarnya perbedaan antara dua metode ini?

Selain itu, karena database memuat daftar elemen, masuk akal untuk mengeluarkan seluruh daftar sekaligus? Atau haruskah saya mengeluarkan satu barang sekaligus?


32
2017-12-10 14:45


asal


Jawaban:


Kedua metode tersebut mungkin terlihat mirip dan berperilaku serupa tetapi fromCallable berurusan dengan kesulitan tekanan balik untuk Anda sedangkan create versi tidak. Berurusan dengan tekanan balik di dalam OnSubscribe Implementasinya berkisar dari peleburan pikiran yang sederhana sampai terang-terangan; namun, jika dihilangkan, Anda mungkin mendapatkannya MissingBackpressureExceptions sepanjang batas asynchronous (seperti observeOn) atau bahkan pada batas-batas kelanjutan (seperti concat).

RxJava mencoba menawarkan dukungan backpressure yang tepat untuk sebanyak mungkin pabrik dan operator, namun ada beberapa pabrik dan operator yang tidak dapat mendukungnya.

Masalah kedua dengan manual OnSubscribe implementasi adalah kurangnya dukungan pembatalan, terutama jika Anda menghasilkan banyak onNext panggilan. Banyak dari ini dapat diganti dengan metode pabrik standar (seperti from) atau kelas pembantu (seperti SyncOnSubscribe) yang berhubungan dengan semua kerumitan untuk Anda.

Anda mungkin menemukan banyak perkenalan dan contoh yang (masih) digunakan create karena dua alasan.

  1. Jauh lebih mudah untuk memperkenalkan datastream berbasis push dengan menunjukkan bagaimana push of event bekerja dengan cara yang imperatif. Menurut saya, sumber-sumber seperti itu menghabiskan terlalu banyak waktu bersama create secara proporsional alih-alih berbicara tentang metode pabrik standar dan menunjukkan bagaimana tugas umum tertentu (seperti tugas Anda) dapat dicapai dengan aman.
  2. Banyak dari contoh-contoh ini diciptakan saat RxJava tidak memerlukan dukungan backpressure atau bahkan dukungan pembatalan sinkronisasi yang tepat atau hanya porting dari contoh-contoh Rx.NET (yang sampai saat ini tidak mendukung backpressure dan pembatalan sinkron bekerja entah bagaimana, milik C # I tebak.) Menghasilkan nilai dengan menelepon padaNext adalah bebas khawatir saat itu. Namun, penggunaan seperti itu menyebabkan buffer bloat dan penggunaan memori yang berlebihan, oleh karena itu, tim Netflix datang dengan cara membatasi penggunaan memori dengan meminta pengamat untuk menyatakan berapa banyak item yang mereka bersedia untuk melanjutkan. Ini dikenal sebagai backpressure.

Untuk pertanyaan kedua, yaitu jika seseorang harus membuat Daftar atau urutan nilai, itu tergantung pada sumber Anda. Jika sumber Anda mendukung beberapa jenis iterasi atau streaming elemen data individual (seperti JDBC), Anda dapat menghubungkannya dan memancarkan satu per satu (lihat SyncOnSubscribe). Jika tidak mendukungnya atau Anda memerlukannya dalam formulir Daftar, maka simpanlah apa adanya. Anda selalu dapat mengkonversi antara dua formulir melalui toListdan flatMapIterable jika diperlukan.


34
2017-12-10 16:03



Seperti yang dijelaskan di responnya Anda terhubung, dengan Observable.create Anda mungkin harus melanggar persyaratan lanjutan RxJava.

Misalnya, Anda harus menerapkan Tekanan balik, atau cara berhenti berlangganan.

Dalam kasus Anda, Anda ingin memancarkan item, tanpa harus berurusan dengan backpressure atau berlangganan. Begitu Observable.fromCallable adalah panggilan yang bagus. RxJava akan menangani sisanya.


2
2017-12-10 15:34