Introduction to RxJava 2 Reactive Programming in Android

Published by inkskull on

Introduction to RxJava 2 Reactive Programming in Android

 

In reactive programming, the consumer reacts to the data as it comes in. This is the reason why asynchronous programming is also called reactive programming. Reactive programming allows to propagates event changes to registered observers.Reactive programming is a general programming term that is focused on reacting to changes, such as data values or events. A callback is an approach to reactive programming done imperatively.

Reactivex is a project which provides implementations for this concept for different programming languages.

Check out the Project In Github

RxJava is the Java implementation of this concept. RxJava is published under the Apache 2.0 license. RxJava provides Java API for asynchronous programming with observable streams.

Reactive Manifesto

The Reactive Manifesto is an online document that about high standard for applications within the software development industry. Simply put, reactive systems are:

  • Responsive – systems should respond in a timely manner
  • Message Driven – systems should use async message-passing between components to ensure loose coupling
  • Elastic – systems should stay responsive under high load
  • Resilient – systems should stay responsive when some components fail

 

Why doing asynchronous programming

Reactive programming provides a simple way of asynchronous programming. This allows to simplify the asynchronously processing of potential long running operations. It also provides a defined way of handling multiple events, errors and termination of the event stream. Reactive programming provides also a simplified way of running different tasks in different threads.

It is also possible to convert the stream before its received by the observers. And you can chain operations, e.g., if a API call depends on the call of another API Last but not least, reactive programming reduces the need for state variables, which can be the source of errors.

Adding RxJava 2 library  to a Java project in Android Studio

To use RxJava in a Gradle build, add the following as dependency.

compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.1.1'

For Maven, you can add RxJava via the following snippet.

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.0.4</version>
</dependency>

Building blocks for RxJava

The build blocks for RxJava code are the following:

  • observables representing sources of data
  • subscribers (or observers) listening to the observables
  • Operator Items emitted by an Observable can be transformed, modified, and filtered through Operators before notifying the subscribed Observer object(s).

Observable

Observables are the sources for the data. Usually they start providing data once a subscriber starts listening. An observable may emit any number of items (including zero items). It can terminate either successfully or with an error. Sources may never terminate, for example, an observable for a button click can potentially produce an infinite stream of events.

An Observable is the stream abstraction in RxJava. It is similar to an Iterator in that, given a sequence, it iterates through and produces those items in an orderly fashion. A consumer can then consume those items through the same interface, regardless of the underlying sequence.

Say we wanted to emit the numbers 1, 2, 3, in that order. To do so, we can use the Observable<T>#create(OnSubscribe<T>) method.

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
   @Override public void call(Subscriber<? super Integer> subscriber) {
       subscriber.onNext(1);
       subscriber.onNext(2);
       subscriber.onNext(3);
       subscriber.onCompleted();
   }
});

Invoking subscriber.onNext(Integer) emits an item in the stream and, when the stream is finished emitting, subscriber.onCompleted() is then invoked.

This approach to creating an Observable is fairly verbose. For this reason, there are convenience methods for creating Observable instances which should be preferred in almost all cases.

The simplest way to create an Observable is using Observable#just(...). As the method name suggests, it just emits the item(s) that you pass into it as method arguments.

Observable.just(1, 2, 3); // 1, 2, 3 will be emitted, respectively

Observer or Subscriber

The next component to the Observable stream is the Observer (or Observers) subscribed to it. Observers are notified whenever something “interesting” happens in the stream. Observers are notified via the following events:

  • Observer#onNext(T) – invoked when an item is emitted from the stream
  • Observable#onError(Throwable) – invoked when an error has occurred within the stream
  • Observable#onCompleted() – invoked when the stream is finished emitting items.

To subscribe to a stream, simply call Observable<T>#subscribe(...) and pass in an Observer instance.

Observable<Integer> observable = Observable.just(1, 2, 3);
observable.subscribe(new Observer<Integer>() {
   @Override public void onCompleted() {
       Log.d("Test", "In onCompleted()");
   }

   @Override public void onError(Throwable e) {
       Log.d("Test", "In onError()");
   }

   @Override public void onNext(Integer integer) {
       Log.d("Test", "In onNext():" + integer);
   }
});

The above code will emit the following in Logcat:

In onNext(): 1
In onNext(): 2
In onNext(): 3
In onNext(): 4
In onCompleted()

There may also be some instances where we are no longer interested in the emissions of an Observable. This is particularly relevant in Android when, for example, an Activity/Fragment needs to be reclaimed in memory.

To stop observing items, we simply need to call Subscription#unsubscribe() on the returned Subscription object.

Subscription subscription = someInfiniteObservable.subscribe(new Observer<Integer>() {
   @Override public void onCompleted() {
       // ...
   }

   @Override public void onError(Throwable e) {
       // ...
   }

   @Override public void onNext(Integer integer) {
       // ...
   }
});

// Call unsubscribe when appropriate
subscription.unsubscribe();

As seen in the code snippet above, upon subscribing to an Observable, we hold the reference to the returned Subscription object and later invoke subscription#unsubscribe() when necessary. In Android, this is best invoked within Activity#onDestroy() or Fragment#onDestroy().

Operator

Items emitted by an Observable can be transformed, modified, and filtered through Operators before notifying the subscribed Observer object(s). Some of the most common operations found in functional programming (such as map, filter, reduce, etc.) can also be applied to an Observable stream. Let’s look at map as an example:

Observable.just(1, 2, 3, 4, 5).map(new Func1<Integer, Integer>() {
   @Override public Integer call(Integer integer) {
       return integer * 3;
   }
}).subscribe(new Observer<Integer>() {
   @Override public void onCompleted() {
       // ...
   }

   @Override public void onError(Throwable e) {
       // ...
   }

   @Override public void onNext(Integer integer) {
       // ...
   }
});

The code snippet above would take each emission from the Observable and multiply each by 3, producing the stream 3, 6, 9, 12, 15, respectively. Applying an Operator typically returns another Observable as a result, which is convenient as this allows us to chain multiple operations to obtain a desired result.

Given the stream above, say we wanted to only receive even numbers. This can be achieved by chaining a filter operation.

Observable.just(1, 2, 3, 4, 5).map(new Func1<Integer, Integer>() {
   @Override public Integer call(Integer integer) {
       return integer * 3;
   }
}).filter(new Func1<Integer, Boolean>() {
   @Override public Boolean call(Integer integer) {
       return integer % 2 == 0;
   }
}).subscribe(new Observer<Integer>() {
   @Override public void onCompleted() {
       // ...
   }

   @Override public void onError(Throwable e) {
       // ...
   }

   @Override public void onNext(Integer integer) {
       // ...
   }
});

There are many operators built-in the RxJava toolset that modify the Observable stream; if you can think of a way to modify the stream, chances are, there’s an Operator for it.

Thanks Happy Coding 😀

 

 

 

 

 

 

 

 

Categories: RxJava2

0 Comments

Leave a Reply

Your email address will not be published. Required fields are marked *

Social Media Auto Publish Powered By : XYZScripts.com