rxjava2 producer-consumer example with backpressure implementation

This post is a small example how to implement a producer and consumer using rxjava2. The producer produces a sequence of Integer objects which are consumed by the consumer. The producer only produces as much items as are requested by the reactive stream. The reader of this post should be familiar with the reactive streams concept.

project setup

I used a maven project, the only dependency needed for this example besides logging is rxjava:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.0.8</version>
</dependency>

the producer

The producer is initialized with the integer values defining the range that is to be produced. It extends the Flowable<Integer> class and therefore must implement the subscribeActual method. In this method a new Subscription object is created which keeps track of the current value to be produced and which emits the values to the subscriber on request. By implementing the Subscription as an anonymous class it is easily possible to access the subscriber that the subscription belongs to. Care must be taken as the call to the subscribers onNext() method can result in a new recursive call to request() by the subscriber.

/**
 * class to produce Integer values using a Flowable.
 */
class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // check if already fulfilling request to prevent call  between request() an subscriber .onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

the consumer

The consumer extends DefaultSubscriber<Integer>. On start, it requests one item, and after processing the next item, it requests the next one. To build up a little backpressure on the producer, there’s a little delay every 5 items.

/**
 * TestConsumer derived from DefaultSubscriber. requests one item at a time.
 */
class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

 putting it together

In the main method of the test program, a producer and a consumer are created. Then the producer is set to run on the Schedulers.computation() threadpool and emit its value on the Schedulers.single() thread to make producing and consuming parallel. The consumer does a blocking subscribe to keep the main thread running until all items are processed.

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

Basically that is all that is needed for a simple reactive producer-consumer implementation.

Leave a Reply

Your email address will not be published. Required fields are marked *