How to create custom operator in RxJava?

 

Adding RxJava 2 library  to a Java project in Android Studio

To use RxJava in a Gradle build, add the following as dependency.

compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.1.1'

For Maven, you can add RxJava via the following snippet.


    io.reactivex.rxjava2
    rxjava
    2.0.4

What are Operators in Rx?

Operators are methods created for solving transformations and handling API calls problems. I will show you one simple example for transformation with Map, and maybe in some further articles examples of handling API calls with Retrofit, RxJava on MVP architectural pattern.

Implementing Your Own Operators

You can implement your own Observable operators. This page shows you how.

If your operator is designed to originate an Observable, rather than to transform or react to a source Observable, use the create( ) method rather than trying to implement Observablemanually. Otherwise, you can create a custom operator by following the instructions on this page.

If your operator is designed to act on the individual items emitted by a source Observable, follow the instructions under Sequence Operators below. If your operator is designed to transform the source Observable as a whole (for instance, by applying a particular set of existing RxJava operators to it) follow the instructions under Transformational Operators below.

Sequence Operators

The following example shows how you can use the lift( ) operator to chain your custom operator (in this example: myOperator) alongside standard RxJava operators like ofType and map:

fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new myOperator()).map({"transformed by myOperator: " + it});

The following section shows how you form the scaffolding of your operator so that it will work correctly with lift( ).

Implementing Your Operator

Define your operator as a public class that implements the Operator interface, like so:

public class myOperator implements Operator<T> {
  public myOperator( /* any necessary params here */ ) {
    /* any necessary initialization here */
  }

  @Override
  public SubscriberT> call(final SubscriberT> s) {
    return new Subscriber(s) {
      @Override
      public void onCompleted() {
        /* add your own onCompleted behavior here, or just pass the completed notification through: */
        if(!s.isUnsubscribed()) {
          s.onCompleted();
        }
      }

      @Override
      public void onError(Throwable t) {
        /* add your own onError behavior here, or just pass the error notification through: */
        if(!s.isUnsubscribed()) {
          s.onError(t);
        }
      }

      @Override
      public void onNext(T item) {
        /* this example performs some sort of operation on each incoming item and emits the results */
        if(!s.isUnsubscribed()) {
          transformedItem = myOperatorTransformOperation(item);
          s.onNext(transformedItem);
        }
      }
    };
  }
}

Transformational Operators

The following example shows how you can use the compose( ) operator to chain your custom operator (in this example, an operator called myTransformer that transforms an Observable that emits Integers into one that emits Strings) alongside standard RxJava operators like ofType and map:

fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new myTransformer<Integer,String>()).map({"transformed by myOperator: " + it});

The following section shows how you form the scaffolding of your operator so that it will work correctly with compose( ).

Implementing Your Transformer

Define your transforming function as a public class that implements the Transformer interface, like so:

public class myTransformer implements Transformer<Integer,String> {
  public myTransformer( /* any necessary params here */ ) {
    /* any necessary initialization here */
  }

  @Override
  public Observable<String> call(Observable<Integer> source) {
    /* 
     * this simple example Transformer applies map() to the source Observable
     * in order to transform the "source" observable from one that emits
     * integers to one that emits string representations of those integers.
     */
    return source.map( new Func1<Integer,String>() {
      @Override
      public String call(Integer t1) {
        return String.valueOf(t1);
      }
    } );
  }
}

 

 

 

Categories: RxJava2

Leave a Reply

Your email address will not be published. Required fields are marked *

Social Media Auto Publish Powered By : XYZScripts.com