Rxjava là gì

RxJava, RxAndroid cơ phiên bản.

Bạn đang xem: Rxjava là gì

I. Reactive sầu Programming là gì?

Reactive Programing cơ mà một phương pháp xây dựng triệu tập vào những luồng dữ liệu ko nhất quán cùng quan liêu gần kề sự biến đổi của những luồng dữ liệu không đồng hóa đó, khi gồm sự biến hóa sẽ sở hữu hành vi cách xử lý tương xứng. Vì đó là luồng tài liệu ko đồng điệu bắt buộc các module code cùng lúc chạy trên các thread không giống nhau từ kia rút ngắn thời gian xúc tiến nhưng mà ko làm blochồng main thread.

II. RxJava

RxJava cơ phiên bản là 1 trong những thỏng viện hỗ trợ các sự kiện không nhất quán được phát triển theo Observer Pattern. quý khách hàng hoàn toàn có thể tạo nên luồng tài liệu không đồng hóa trên bất kỳ thread nào, biến đổi tài liệu cùng áp dụng tài liệu bằng Observer. Thỏng viện RxJava cung ứng những nhiều loại Operator tuyệt vời nhất như maps, combine, merge , filter với những lắp thêm khác có thể được áp dụng mang lại luồng dữ liệu.

III. RxAndroid

RxAndroid được quan trọng đặc biệt thực hiện mang đến gốc rễ Android được cải tiến và phát triển dựa trên RxJava. Đặc biệt Schedulers được bổ sung cập nhật cho RxAndroid nhằm hỗ trợ mang lại đa luồng trong ứng dụng Android. Schedulers sẽ giúp các bạn phân chia luồng chạy mang lại từng module code làm thế nào để cho cân xứng. Một vài ba luồng chạy thông dụng được áp dụng qua Schedulers.

AndroidSchedulers.mainThread () Cung cấp quyền truy vấn vào Android Main Thread / UI Thread.Schedulers.newThread () Thread bắt đầu sẽ được tạo ra mọi khi một nhiệm vụ được chế tạo.

IV. Những yếu tố đặc biệt trong RxJava:

Về cơ bản RxJava bao gồm hai thành phần chính: Observable cùng Observer. Thêm vào đó, có những trang bị khác như Schedulers, Operators và Subscription là các thành phần nhập vai trò nhỏng nhiều luồng, thao tác làm việc tài liệu, với kết nối. Chúng ta đã cùng có tác dụng quen cùng với từng thành phần: Observable: Là luồng dữ liệu triển khai một trong những công việc cùng phát ra dữ liệu.Observer : Là thành phần đi kèm theo luôn luôn phải có của Observable. Nó nhận tài liệu được vạc ra vị Observable. Subcription: Là mối links thân Observable với ObVPS. cũng có thể có rất nhiều Obhệ thống ĐK một Observable độc nhất. Operator: Hỗ trợ mang đến bài toán sửa đổi tài liệu được phạt ra bởi vì Observable trước khi obhệ thống thừa nhận chúng. Schedulers: Scheduler quyết định thread mà lại Observable đang vạc ra tài liệu cùng bên trên thread như thế nào Obhệ thống sẽ nhận dữ liệu.

1. Cách tạo ra Observable

Chúng ta gồm 5 nhiều loại Observable đi kèm theo là 5 nhiều loại Obhệ thống khớp ứng. Mỗi nhiều loại Observable được áp dụng trong số trường phù hợp khác nhau dựa vào số lượng và một số loại phần tử được Observable phạt ra.

*
Đầu tiên bọn họ sẽ điểm sang 1 vài phương thức phổ cập để tạo thành Observable:

just:Available: Flowable, Observable, Maybe, Single Tạo một Observable phạt ra một nhà cửa ví dụ.defer:Available: Flowable, Observable, Maybe, Single, Completable ko tạo thành Observable cho tới lúc bao gồm ObVPS ĐK, với tạo nên một Observable new mọi khi tất cả ObVPS mới đăng ký.from:Available: Flowable, Observable Chuyển thay đổi các đối tượng người dùng và dạng hình dữ liệu không giống thành Observablesinterval:Available: Flowable, Observable. Định kỳ tạo ra một số vô hạn (Long), ngày càng tăng.

Xem thêm: Nghĩa Của Từ Dark Horse Là Gì ? Đố Bạn “A Dark Horse” Là Gì

fromCallable:Available: Flowable, Observable, Maybe, Single, Completable Khi tất cả observer đăng ký, Callable đang mang lại được Hotline cùng quý giá trả về của nó (hoặc nỉm ngoại lệ) được gửi kế tiếp Obhệ thống.

2. Cách sinh sản Observer

Đối với từng các loại ObVPS khác biệt bọn họ gồm biện pháp chế tạo cùng tiến hành khác biệt tuy thế hồ hết tương đối dễ dàng và đơn giản. Đây là ví dụ điển hình nhất để tạo ra Observer:

private ObVPS getAnimalsObserver() return new Observer()
Override public void onComplete() Log.d(TAG, "All items are emitted!"); ; onSubscribe(): Phương thức sẽ tiến hành Hotline lúc 1 ObVPS ĐK vào Observable. onNext(): Pmùi hương thức này sẽ được Hotline khi Observable bước đầu phát ra dữ liệu. onError(): Trong trường hòa hợp tất cả lỗi, cách thức onError() sẽ được Gọi.onComplete(): Lúc một Observable ngừng câu hỏi phát ra tài liệu, onComplete() sẽ tiến hành hotline.

3. Tạo Obhệ thống theo dõi Observable

Đây là các phương thức cơ bản để khiến cho ObVPS đăng ký quan sát và theo dõi Observable.

animalsObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(animalsObserver);subscribeOn(Schedulers.io ()): Báo mang lại Observable chạy trách nhiệm trên một chuỗi nền.observOn(AndroidSchedulers.mainThread ()): Yêu cầu Observer thừa nhận tài liệu bên trên luồng thiết yếu nhằm bạn cũng có thể tiến hành các hành vi tương quan đến giao diện.

4. Disposable

Disposable được áp dụng để diệt sự kết nối của Subserver với Subsevable Lúc không còn quan trọng bài toán này vô cùng hữu dụng để tránh vấn đề rò rỉ bộ lưu trữ. khi ObVPS liên kết được với Observable vào onSubcribe() ta sẽ nhận thấy Disposable. Để hủy sự kết nối trong onDestroy() của Activity các bạn nên gọi hàm dispose() của Disposable.

5. Operator

RxJava cung cấp tập thích hợp Khủng các operator cung ứng mang đến việc thao tác cùng với dữ liệu vậy cần operators được phân chia dựa trên các loại các bước chúng làm. Ví dụ như team sinh sản Observable: create, just, fromArray,... Nhóm thanh lọc dữ liệu: filter, skip, last, take, ... Nhóm tạo thành Observable từ dữ iệu của Observable không giống như: buffer, bản đồ, flatmap,...Lưu ý Lúc sử dụng các Operator thì công dụng của Operator trước đang truyền đến Operator sau.quý khách hoàn toàn có thể bài viết liên quan trên đây

V. Ví dụ:

Sau đây là ví dụ cụ thể đến từng nhiều loại Observable được kể phía trên:Trong các ví dụ mình sử dung Custom object Note:

public class Note int id; String note; // getters an setters

1. Observable & Observer:

Được áp dụng những độc nhất trong những toàn bộ. Observable hoàn toàn có thể phát ra ko hoặc các phần tử.

public class ObserverActivity extends AppCompatActivity private static final String TAG = ObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Simple Observable emitting multiple Notes * - * Observable : ObVPS */
Override protected void onCreate(Bundle savedInstanceState) super.onCreate(savedInstanceState); setContentView(R.layout.activity_observer); Observable notesObservable = getNotesObservable(); Obhệ thống notesObVPS = getNotesObserver(); notesObservable.observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribeWith(notesObserver); private Observer getNotesObserver() return new Observer()
Override public void onComplete() Log.d(TAG, "onComplete"); ; private Observable getNotesObservable() final List notes = prepareNotes(); return Observable.create(new ObservableOnSubscribe()
Override public void subscribe(ObservableEmitter emitter) throws Exception for (Note note : notes) if (!emitter.isDisposed()) emitter.onNext(note); // all notes are emitted if (!emitter.isDisposed()) emitter.onComplete(); ); private List prepareNotes() List notes = new ArrayList(); notes.add(new Note(1, "Buy tooth paste!")); notes.add(new Note(2, "Hotline brother!")); notes.add(new Note(3, "Watch Narcos tonight!")); notes.add(new Note(4, "Pay power bill!")); return notes;
Override protected void onDestroy() super.onDestroy(); disposable.dispose(); Output:

onSubscribeonNext: Buy tooth paste!onNext: điện thoại tư vấn brother!onNext: Watch Narcos tonight!onNext: Pay power bill!onComplete

2. Single & SingleObsever

Single luôn luôn phạt ra chỉ một cực hiếm hoặc một lỗi. Observable có thể thực hiện được quá trình này tuy thế Single luôn luôn đảm bảo rằng luôn luôn luôn luôn có một trong những phần tử được trả về. Chính vị chỉ tất cả 1 phần tử yêu cầu SingleObVPS không tồn tại onNext() cơ mà chỉ có onSuccess() nhằm dấn tài liệu trả về.

public class SingleObserverActivity extends AppCompatActivity private static final String TAG = SingleObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Single Observable emitting single lưu ý * Single Observable is more useful in making network calls * where you expect a single response object to be emitted * - * Single : SingleObhệ thống */ // TODO - link to lớn Retrofit tutorial
Override protected void onCreate(Bundle savedInstanceState) super.onCreate(savedInstanceState); setContentView(R.layout.activity_single_observer); Single noteObservable = getNoteObservable(); SingleObhệ thống singleObVPS = getSingleObserver(); noteObservable .observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(singleObserver); private SingleObhệ thống getSingleObserver() return new SingleObserver()
Override public void onError(Throwable e) Log.d(TAG, "onError: " + e.getMessage()); ; private Single getNoteObservable() return Single.create(new SingleOnSubscribe()
Override public void subscribe(SingleEmitter emitter) throws Exception cảnh báo note = new Note(1, "Buy milk!"); emitter.onSuccess(note); );
Override protected void onDestroy() super.onDestroy(); disposable.dispose(); Output

onSubscribeonSuccess: Buy milk!

3. Maybe và MaybeObserver

Maybe có thể hoặc không phát ra một quý giá. Maybe được áp dụng khi bạn đã mong chờ một trong những phần tử được vạc ra tùy thuộc vào từng ngôi trường hòa hợp xảy ra. Nhỏng khi chúng ta query note by Id vào database nó có thể gồm hoặc cũng hoàn toàn có thể ko.

public class MaybeObserverActivity extends AppCompatActivity private static final String TAG = MaybeObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Consider an example getting a note from db using ID * There is possibility of not finding the note by ID in the db * In this situation, MayBe can be used * - * Maybe : MaybeObserver */
Override protected void onCreate(Bundle savedInstanceState) super.onCreate(savedInstanceState); setContentView(R.layout.activity_maybe_observer); Maybe noteObservable = getNoteObservable(); MaybeObVPS noteObhệ thống = getNoteObserver(); noteObservable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(noteObserver); private MaybeObhệ thống getNoteObserver() return new MaybeObserver()
Override public void onComplete() Log.e(TAG, "onComplete"); ; /** * Emits optional data (0 or 1 emission) * But for now it emits 1 Note always */ private Maybe getNoteObservable() return Maybe.create(new MaybeOnSubscribe()
Override public void subscribe(MaybeEmitter emitter) throws Exception Note note = new Note(1, "Hotline brother!"); if (!emitter.isDisposed()) emitter.onSuccess(note); );
Override protected void onDestroy() super.onDestroy(); disposable.dispose();

4.Completable & CompletableObserver

Completable ko phân phát ra ngẫu nhiên tài liệu nào cầm vào kia nó thông báo tinh thần của tác vụ thành công xuất xắc không thắng cuộc. Được áp dụng Lúc bạn muốn thực hiện một số trọng trách và không mong muốn hóng bất kỳ quý giá làm sao trả về. Một ngôi trường phù hợp Completable thường xuyên được sử dụng là cập nhật một trong những dữ liệu bên trên sever bằng phương pháp triển khai đề nghị PUT.

public class CompletableObserverActivity extends AppCompatActivity { private static final String TAG = CompletableObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Completable won"t emit any cống phẩm, instead it returns * Success or failure state * Consider an example of making a PUT request to VPS to update * something where you are not expecting any response but the * success status * - * Completable : CompletableObserver */ // TODO - links to Retrofit tutorial
Override protected void onCreate(Bundle savedInstanceState) super.onCreate(savedInstanceState); setContentView(R.layout.activity_completable_observer); Note note = new Note(1, "trang chủ work!"); Completable completableObservable = updateNote(note); CompletableObserver completableObVPS = completableObserver(); completableObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(completableObserver); /** * Assume this making PUT request to VPS to update the Note */ private Completable updateNote(cảnh báo note) return Completable.create(new CompletableOnSubscribe()
Override public void subscribe(CompletableEmitter emitter) throws Exception if (!emitter.isDisposed()) Thread.sleep(1000); emitter.onComplete(); ); private CompletableObVPS completableObserver() { return new CompletableObserver()
Override protected void onDestroy() super.onDestroy(); disposable.dispose(); Output

onSubscribeonComplete: lưu ý updated successfully!

5. Flowable và Observer

Được áp dụng lúc 1 Observable tạo thành số lượng Khủng các sự khiếu nại / dữ liệu mà ObVPS hoàn toàn có thể cách xử trí. Flowable hoàn toàn có thể được áp dụng khi nguồn tạo thành 10k+ sự kiện và OnVPS cần yếu tiêu trúc toàn bộ.Flowable sử dụng phương pháp Backpressure để xử lý dữ liệu tránh lỗi MissingBackpressureException cùng OutOfMemoryError.

public class FlowableObserverActivity extends AppCompatActivity private static final String TAG = FlowableObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Simple example of Flowable just to lớn show the syntax * the use of Flowable is best explained when used with BackPressure * Read the below links khổng lồ know the best use cases khổng lồ use Flowable operator * https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-2.0#when-to-use-flowable * - * Flowable : SingleObVPS */
Override protected void onCreate(Bundle savedInstanceState) super.onCreate(savedInstanceState); setContentView(R.layout.activity_flowable_observer); Flowable flowableObservable = getFlowableObservable(); SingleObVPS obhệ thống = getFlowableObserver(); flowableObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .reduce(0, new BiFunction()
Override public Integer apply(Integer result, Integer number) //Log.e(TAG, "Result: " + result + ", new number: " + number); return result + number; ) .subscribe(observer); private SingleObVPS getFlowableObserver() return new SingleObserver()
Override public void onError(Throwable e) Log.e(TAG, "onError: " + e.getMessage()); ; private Flowable getFlowableObservable() return Flowable.range(1, 100);
Override protected void onDestroy() super.onDestroy(); disposable.dispose(); Output