rxjava2 producer-consumer example with backpressure implementation
This post is a small example how to implement a producer and consumer using rxjava2 . The producer produces a sequence of Integer objects which are consumed by the consumer. The producer only produces as much items as are requested by the reactive stream. The reader of this post should be familiar with the reactive streams concept .
project setup
I used a maven project, the only dependency needed for this example besides logging is rxjava:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.8</version>
</dependency>
the producer
The producer is initialized with the integer values defining the range that is to be produced. It extends the Flowable<Integer>
class and therefore must implement the subscribeActual
method. In this method a new Subscription
object is created which keeps track of the current value to be produced and which emits the values to the subscriber on request. By implementing the Subscription
as an anonymous class it is easily possible to access the subscriber that the subscription belongs to. Care must be taken as the call to the subscribers onNext()
method can result in a new recursive call to request()
by the subscriber.
/**
* class to produce Integer values using a Flowable.
*/
class TestProducer extends Flowable<Integer> {
static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
final int from, to;
public TestProducer(int from, int to) {
this.from = from;
this.to = to;
}
@Override
protected void subscribeActual(Subscriber super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
/** the next value. */
public int next = from;
/** cancellation flag. */
private volatile boolean cancelled = false;
private volatile boolean isProducing = false;
private AtomicLong outStandingRequests = new AtomicLong(0);
@Override
public void request(long n) {
if (!cancelled) {
outStandingRequests.addAndGet(n);
// check if already fulfilling request to prevent call between request() an subscriber .onNext()
if (isProducing) {
return;
}
// start producing
isProducing = true;
while (outStandingRequests.get() > 0) {
if (next > to) {
logger.info("producer finished");
subscriber.onComplete();
break;
}
subscriber.onNext(next++);
outStandingRequests.decrementAndGet();
}
isProducing = false;
}
}
@Override
public void cancel() {
cancelled = true;
}
});
}
}
the consumer
The consumer extends DefaultSubscriber<Integer>. On start, it requests one item, and after processing the next item, it requests the next one. To build up a little backpressure on the producer, there’s a little delay every 5 items.
/**
* TestConsumer derived from DefaultSubscriber. requests one item at a time.
*/
class TestConsumer extends DefaultSubscriber<Integer> {
private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Integer i) {
logger.info("consuming {}", i);
if (0 == (i % 5)) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
// can be ignored, just used for pausing
}
}
request(1);
}
@Override
public void onError(Throwable throwable) {
logger.error("error received", throwable);
}
@Override
public void onComplete() {
logger.info("consumer finished");
}
}
putting it together
In the main method of the test program, a producer and a consumer are created. Then the producer is set to run on the Schedulers.computation()
threadpool and emit its value on the Schedulers.single()
thread to make producing and consuming parallel. The consumer does a blocking subscribe to keep the main thread running until all items are processed.
public static void main(String[] args) {
try {
final TestProducer testProducer = new TestProducer(1, 1_000);
final TestConsumer testConsumer = new TestConsumer();
testProducer
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.blockingSubscribe(testConsumer);
} catch (Throwable t) {
t.printStackTrace();
}
}
Basically that is all that is needed for a simple reactive producer-consumer implementation.