RxJava, RxAndroid cơ bản.

Bạn đang xem: Rxjava là gì

I. Reactive Programming là gì?

Reactive Programing mà một phương pháp lập trình tập trung vào các luồng dữ liệu không đồng bộ và quan sát sự thay đổi của các luồng dữ liệu không đồng bộ đó, khi có sự thay đổi sẽ có hành động xử lý phù hợp. Vì đây là luồng dữ liệu không đồng bộ nên các module code cùng lúc chạy trên các thread khác nhau từ đó rút ngắn thời gian thực thi mà không làm block main thread.

II. RxJava

RxJava cơ bản là một thư viện cung cấp các sự kiện không đồng bộ được phát triển theo Observer Pattern. Bạn có thể tạo luồng dữ liệu không đồng bộ trên bất kỳ thread nào, thay đổi dữ liệu và sử dụng dữ liệu bằng Observer. Thư viện RxJava cung cấp nhiều loại Operator tuyệt vời như map, combine, merge , filter và nhiều thứ khác có thể được áp dụng cho luồng dữ liệu.

III. RxAndroid

RxAndroid được đặc biệt sử dụng cho nền tảng Android được phát triển dựa trên RxJava. Đặc biệt Schedulers được bổ sung cho RxAndroid nhằm hỗ trợ cho đa luồng trong ứng dụng Android. Schedulers sẽ giúp bạn phân chia luồng chạy cho từng module code sao cho phù hợp. Một vài luồng chạy phổ biến được sử dụng qua Schedulers.

AndroidSchedulers.mainThread () Cung cấp quyền truy cập vào Android Main Thread / UI Thread.Schedulers.newThread () Thread mới sẽ được tạo ra mỗi khi một nhiệm vụ được tạo.

IV. Những thành phần quan trọng trong RxJava:

Về cơ bản RxJava có hai thành phần chính: Observable và Observer. Thêm vào đó, có những thứ khác như Schedulers, Operators và Subscription là các thành phần đóng vai trò như đa luồng, thao tác dữ liệu, và kết nối. Chúng ta sẽ cùng làm quen với từng thành phần: Observable: Là luồng dữ liệu thực hiện một số công việc và phát ra dữ liệu.Observer : Là thành phần đi kèm không thể thiếu của Observable. Nó nhận dữ liệu được phát ra bởi Observable. Subcription: Là mối liên kết giữa Observable và Observer. Có thể có nhiều Observer đăng ký một Observable duy nhất. Operator: Hỗ trợ cho việc sửa đổi dữ liệu được phát ra bởi Observable trước khi observer nhận chúng. Schedulers: Scheduler quyết định thread mà Observable sẽ phát ra dữ liệu và trên thread nào Observer sẽ nhận dữ liệu.

1. Cách tạo Observable

Chúng ta có 5 loại Observable đi kèm là 5 loại Observer tương ứng. Mỗi loại Observable được sử dụng trong các trường hợp khác nhau dựa vào số lượng và loại phần tử được Observable phát ra.

*

Đầu tiên chúng ta sẽ điểm qua một vài phương pháp phổ biến để tạo ra Observable:just:Available: Flowable, Observable, Maybe, Single Tạo một Observable phát ra một item cụ thể.defer:Available: Flowable, Observable, Maybe, Single, Completable không tạo ra Observable cho đến khi có Observer đăng ký, và tạo một Observable mới mỗi khi có Observer mới đăng ký.from:Available: Flowable, Observable Chuyển đổi các đối tượng và kiểu dữ liệu khác thành Observablesinterval:Available: Flowable, Observable. Định kỳ tạo ra một số vô hạn (Long), ngày càng tăng.

Xem thêm: Bảo Vệ Môi Trường Là Gì, Bảo Vệ Môi Trường

fromCallable:Available: Flowable, Observable, Maybe, Single, Completable Khi có observer đăng ký, Callable đã cho được gọi và giá trị trả về của nó (hoặc ném ngoại lệ) được chuyển tiếp đến Observer.

2. Cách tạo Observer

Đối với mỗi loại Observer khác nhau chúng ta có cách tạo và thực thi khác nhau nhưng đều khá đơn giản. Đây là ví dụ điển hình nhất để tạo ra Observer:

private Observer getAnimalsObserver() { return new Observer() {
Override public void onComplete() { Log.d(TAG, “All items are emitted!”); } }; }onSubscribe(): Phương thức sẽ được gọi khi một Observer đăng ký vào Observable. onNext(): Phương thức này sẽ được gọi khi Observable bắt đầu phát ra dữ liệu. onError(): Trong trường hợp có lỗi, phương thức onError() sẽ được gọi.onComplete(): Khi một Observable hoàn thành việc phát ra dữ liệu, onComplete() sẽ được gọi.

3. Tạo Observer theo dõi Observable

Đây là các phương thức cơ bản để khiến cho Observer đăng ký theo dõi Observable.

animalsObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(animalsObserver);subscribeOn(Schedulers.io ()): Báo cho Observable chạy nhiệm vụ trên một chuỗi nền.observOn(AndroidSchedulers.mainThread ()): Yêu cầu Observer nhận dữ liệu trên luồng chính để bạn có thể thực hiện các hành động liên quan đến giao diện.

4. Disposable

Disposable được sử dụng để hủy sự kết nối của Subserver với Subsevable khi không còn cần thiết việc này rất hữu dụng để tránh việc rò rỉ bộ nhớ. Khi Observer kết nối được với Observable trong onSubcribe() ta sẽ nhận được Disposable. Để hủy sự kết nối trong onDestroy() của Activity bạn nên gọi hàm dispose() của Disposable.

5. Operator

RxJava cung cấp tập hợp lớn các operator hỗ trợ cho việc thao tác với dữ liệu vậy nên operators được phân chia dựa trên loại công việc chúng làm. Ví dụ như nhóm tạo Observable: create, just, fromArray,… Nhóm lọc dữ liệu: filter, skip, last, take, … Nhóm tạo Observable từ dữ iệu của Observable khác như: buffer, map, flatmap,…Lưu ý khi sử dụng nhiều Operator thì kết quả của Operator trước sẽ truyền cho Operator sau.Bạn có thể tìm hiểu thêm tại đây

V. Ví dụ:

Sau đây là ví dụ cụ thể cho từng loại Observable được đề cập phía trên:Trong các ví dụ mình sử dung Custom object Note:

public class Note { int id; String note; // getters an setters}

1. Observable & Observer:

Được sử dụng nhiều nhất trong số tất cả. Observable có thể phát ra không hoặc nhiều phần tử.

public class ObserverActivity extends AppCompatActivity { private static final String TAG = ObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Simple Observable emitting multiple Notes * – * Observable : Observer */
Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_observer); Observable notesObservable = getNotesObservable(); Observer notesObserver = getNotesObserver(); notesObservable.observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribeWith(notesObserver); } private Observer getNotesObserver() { return new Observer() {
Override public void onComplete() { Log.d(TAG, “onComplete”); } }; } private Observable getNotesObservable() { final List notes = prepareNotes(); return Observable.create(new ObservableOnSubscribe() {
Override public void subscribe(ObservableEmitter emitter) throws Exception { for (Note note : notes) { if (!emitter.isDisposed()) { emitter.onNext(note); } } // all notes are emitted if (!emitter.isDisposed()) { emitter.onComplete(); } } }); } private List prepareNotes() { List notes = new ArrayList(); notes.add(new Note(1, “Buy tooth paste!”)); notes.add(new Note(2, “Call brother!”)); notes.add(new Note(3, “Watch Narcos tonight!”)); notes.add(new Note(4, “Pay power bill!”)); return notes; }
Override protected void onDestroy() { super.onDestroy(); disposable.dispose(); }}Output:

onSubscribeonNext: Buy tooth paste!onNext: Call brother!onNext: Watch Narcos tonight!onNext: Pay power bill!onComplete

2. Single & SingleObsever

Single luôn phát ra chỉ một giá trị hoặc một lỗi. Observable có thể thực hiện được công việc này nhưng Single luôn luôn đảm bảo rằng luôn luôn có 1 phần tử được trả về. Chính vì chỉ có 1 phần tử nên SingleObserver không có onNext() mà chỉ có onSuccess() để nhận dữ liệu trả về.

public class SingleObserverActivity extends AppCompatActivity { private static final String TAG = SingleObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Single Observable emitting single Note * Single Observable is more useful in making network calls * where you expect a single response object to be emitted * – * Single : SingleObserver */ // TODO – link to Retrofit tutorial
Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_single_observer); Single noteObservable = getNoteObservable(); SingleObserver singleObserver = getSingleObserver(); noteObservable .observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(singleObserver); } private SingleObserver getSingleObserver() { return new SingleObserver() {
Override public void onError(Throwable e) { Log.d(TAG, “onError: ” + e.getMessage()); } }; } private Single getNoteObservable() { return Single.create(new SingleOnSubscribe() {
Override public void subscribe(SingleEmitter emitter) throws Exception { Note note = new Note(1, “Buy milk!”); emitter.onSuccess(note); } }); }
Override protected void onDestroy() { super.onDestroy(); disposable.dispose(); }}Output

onSubscribeonSuccess: Buy milk!

3. Maybe & MaybeObserver

Maybe có thể hoặc không phát ra một giá trị. Maybe được sử dụng khi bạn đang mong đợi một phần tử được phát ra tùy theo từng trường hợp xảy ra. Như khi chúng ta query note by Id trong database nó có thể có hoặc cũng có thể không.

public class MaybeObserverActivity extends AppCompatActivity { private static final String TAG = MaybeObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Consider an example getting a note from db using ID * There is possibility of not finding the note by ID in the db * In this situation, MayBe can be used * – * Maybe : MaybeObserver */
Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_maybe_observer); Maybe noteObservable = getNoteObservable(); MaybeObserver noteObserver = getNoteObserver(); noteObservable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(noteObserver); } private MaybeObserver getNoteObserver() { return new MaybeObserver() {
Override public void onComplete() { Log.e(TAG, “onComplete”); } }; } /** * Emits optional data (0 or 1 emission) * But for now it emits 1 Note always */ private Maybe getNoteObservable() { return Maybe.create(new MaybeOnSubscribe() {
Override public void subscribe(MaybeEmitter emitter) throws Exception { Note note = new Note(1, “Call brother!”); if (!emitter.isDisposed()) { emitter.onSuccess(note); } } }); }
Override protected void onDestroy() { super.onDestroy(); disposable.dispose(); }}

4.Completable & CompletableObserver

Completable không phát ra bất kỳ dữ liệu nào thay vào đó nó thông báo trạng thái của tác vụ thành công hay thất bại. Được sử dụng khi bạn muốn thực hiện một số nhiệm vụ và không mong đợi bất kỳ giá trị nào trả về. Một trường hợp Completable thường được sử dụng là cập nhật một số dữ liệu trên máy chủ bằng cách thực hiện yêu cầu PUT.

Xem thêm: vị trí tiếng anh là gì

public class CompletableObserverActivity extends AppCompatActivity { private static final String TAG = CompletableObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Completable won”t emit any item, instead it returns * Success or failure state * Consider an example of making a PUT request to server to update * something where you are not expecting any response but the * success status * – * Completable : CompletableObserver */ // TODO – link to Retrofit tutorial
Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_completable_observer); Note note = new Note(1, “Home work!”); Completable completableObservable = updateNote(note); CompletableObserver completableObserver = completableObserver(); completableObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(completableObserver); } /** * Assume this making PUT request to server to update the Note */ private Completable updateNote(Note note) { return Completable.create(new CompletableOnSubscribe() {
Override public void subscribe(CompletableEmitter emitter) throws Exception { if (!emitter.isDisposed()) { Thread.sleep(1000); emitter.onComplete(); } } }); } private CompletableObserver completableObserver() { return new CompletableObserver() {
Override protected void onDestroy() { super.onDestroy(); disposable.dispose(); }}Output

onSubscribeonComplete: Note updated successfully!

5. Flowable & Observer

Được sử dụng khi một Observable tạo ra số lượng lớn các sự kiện / dữ liệu mà Observer có thể xử lý. Flowable có thể được sử dụng khi nguồn tạo ra 10k+ sự kiện và Onserver không thể tiêu thụ tất cả.Flowable sử dụng phương pháp Backpressure để xử lý dữ liệu tránh lỗi MissingBackpressureException và OutOfMemoryError.

public class FlowableObserverActivity extends AppCompatActivity { private static final String TAG = FlowableObserverActivity.class.getSimpleName(); private Disposable disposable; /** * Simple example of Flowable just to show the syntax * the use of Flowable is best explained when used with BackPressure * Read the below link to know the best use cases to use Flowable operator * https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-2.0#when-to-use-flowable * – * Flowable : SingleObserver */
Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_flowable_observer); Flowable flowableObservable = getFlowableObservable(); SingleObserver observer = getFlowableObserver(); flowableObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .reduce(0, new BiFunction() {
Override public Integer apply(Integer result, Integer number) { //Log.e(TAG, “Result: ” + result + “, new number: ” + number); return result + number; } }) .subscribe(observer); } private SingleObserver getFlowableObserver() { return new SingleObserver() {
Override public void onError(Throwable e) { Log.e(TAG, “onError: ” + e.getMessage()); } }; } private Flowable getFlowableObservable() { return Flowable.range(1, 100); }
Override protected void onDestroy() { super.onDestroy(); disposable.dispose(); }}Output

Chuyên mục: Hỏi Đáp