Monday, 13 August 2018

Java 9 Reactive Streams

Java 9 Reactive Streams let us implement non-blocking, asynchronous stream processing. This is a major step towards applying the reactive programming model to core Java programming.


Reactive Streams is about asynchronous processing of a stream of data, so there must be a Publisher and a Subscriber. The Publisher publishes the stream of data and the Subscriber consumes it.

Sometimes we have to transform the data between the Publisher and the Subscriber. A Processor is the entity that sits between the end publisher and the end subscriber and transforms the data received from the publisher so that the subscriber can understand it. We can have a chain of processors.

A Processor therefore works as both a Subscriber and a Publisher.

Java 9 Flow API


The Java 9 Flow API follows the Reactive Streams specification. It combines the Iterator and Observer patterns: Iterator works on a pull model, where the application pulls items from the source, whereas Observer works on a push model and reacts when an item is pushed from the source to the application.

A Flow API subscriber can request N items from the publisher through its subscription. The items are then pushed from the publisher to the subscriber until the requested demand is met, there are no more items left to push, or an error occurs.
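
The request-then-push behaviour can be tried out with a small sketch. This is only an illustration under assumptions: the class name RequestNDemo and the helper receiveAtMost are hypothetical, and the subscriber deliberately asks for a fixed number of items once and never asks again.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class RequestNDemo {

    // Hypothetical helper: publishes 1..total to a subscriber that asks for `demand` items only.
    static List<Integer> receiveAtMost(int demand, int total) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> demandMet = new CompletableFuture<>();

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(demand); // ask once, never again
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    if (received.size() == demand) {
                        demandMet.complete(null);
                    }
                }

                @Override
                public void onError(Throwable t) {
                    demandMet.completeExceptionally(t);
                }

                @Override
                public void onComplete() {
                    demandMet.complete(null);
                }
            });
            for (int i = 1; i <= total; i++) {
                publisher.submit(i);
            }
        }
        // Wait (at most 5 seconds) until the requested items have arrived.
        demandMet.orTimeout(5, TimeUnit.SECONDS).join();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receiveAtMost(3, 5)); // prints [1, 2, 3]
    }
}
```

Although five items are submitted, the subscriber sees only the three it asked for; the remaining two stay in the publisher's buffer because no further demand was signalled.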

Java 9 Flow API Classes and Interfaces


Let’s have a quick look at the Flow API classes and interfaces.
  • java.util.concurrent.Flow: This is the main class of the Flow API. It encapsulates all the important interfaces of the Flow API. It is a final class, so we can’t extend it.
  • java.util.concurrent.Flow.Publisher: This is a functional interface. Every publisher has to implement its subscribe method, which adds the given subscriber so that it can receive messages.
  • java.util.concurrent.Flow.Subscriber: Every subscriber has to implement this interface. The methods of a subscriber are invoked in strict sequential order. There are four methods in this interface:
    • onSubscribe: This is the first method to be invoked when the subscriber is subscribed to receive messages from a publisher. Usually we invoke subscription.request here to start receiving items from the publisher.
    • onNext: This method is invoked when an item is received from the publisher. This is where we implement our business logic to process the stream and then request more data from the publisher.
    • onError: This method is invoked when an irrecoverable error occurs. We can do cleanup tasks in this method, such as closing a database connection.
    • onComplete: This method is invoked when the publisher has no more items to produce and has been closed without an error. Unlike a finally block, it is not called after onError. We can use it to send a notification that the stream was processed successfully.
  • java.util.concurrent.Flow.Subscription: This is used to create an asynchronous, non-blocking link between publisher and subscriber. The subscriber invokes its request method to demand items from the publisher. It also has a cancel method to cancel the subscription, i.e. to close the link between publisher and subscriber.
  • java.util.concurrent.Flow.Processor: This interface extends both Publisher and Subscriber. It is used to transform messages between publisher and subscriber.
  • java.util.concurrent.SubmissionPublisher: A Publisher implementation that asynchronously issues submitted items to current subscribers until it is closed. It uses the Executor framework. We will use this class in the reactive stream examples to add subscribers and then submit items to them.
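
As a quick, hedged illustration of how these pieces fit together, the sketch below uses SubmissionPublisher's consume method, which subscribes a simple Consumer for us and returns a CompletableFuture that completes when the publisher signals onComplete. The class name ConsumeDemo and the helper collect are hypothetical names chosen for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class ConsumeDemo {

    // Hypothetical helper: publishes the given items and returns what the consumer saw.
    static List<String> collect(List<String> items) {
        List<String> seen = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done;

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // consume() registers a subscriber that passes every item to the Consumer.
            done = publisher.consume(seen::add);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the buffered items have been delivered

        // Wait (at most 5 seconds) for the onComplete signal.
        done.orTimeout(5, TimeUnit.SECONDS).join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("a", "b", "c"))); // prints [a, b, c]
    }
}
```

This shortcut is handy for quick experiments; implementing Subscriber directly gives control over how many items are requested at a time.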

Java 9 Reactive Stream Example


Let’s start with a simple example where we implement the Flow API Subscriber interface and use SubmissionPublisher to create a publisher and send messages.

Stream Data


Let’s say we have an Employee class that will be used to create the stream message to be sent from publisher to subscriber.

package com.oraclejavacentral.reactive.beans;

public class Employee {

    private int id;
    private String name;

    public Employee() {
    }

    public Employee(int i, String s) {
        this.id = i;
        this.name = s;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "[id=" + id + ",name=" + name + "]";
    }
}

We also have a utility class to create a list of employees for our example.

package com.oraclejavacentral.reactive_streams;

import java.util.ArrayList;
import java.util.List;

import com.oraclejavacentral.reactive.beans.Employee;

public class EmpHelper {

    // Builds the fixed list of employees used by the examples
    public static List<Employee> getEmps() {
        List<Employee> emps = new ArrayList<>();
        emps.add(new Employee(1, "Pankaj"));
        emps.add(new Employee(2, "David"));
        emps.add(new Employee(3, "Lisa"));
        emps.add(new Employee(4, "Ram"));
        emps.add(new Employee(5, "Anupam"));
        return emps;
    }

}

Subscriber


package com.oraclejavacentral.reactive_streams;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

import com.oraclejavacentral.reactive.beans.Employee;

public class MySubscriber implements Subscriber<Employee> {

    private Subscription subscription;
    // volatile: updated on the delivery thread, read by the main thread
    private volatile int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        this.subscription = subscription;
        this.subscription.request(1); // requesting data from publisher
        System.out.println("onSubscribe requested 1 item");
    }

    @Override
    public void onNext(Employee item) {
        System.out.println("Processing Employee " + item);
        counter++;
        this.subscription.request(1); // demand the next item
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Some error happened");
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All Processing Done");
    }

    public int getCounter() {
        return counter;
    }

}

◈ The subscription variable keeps a reference to the Subscription so that request can be called in the onNext method.
◈ The counter variable keeps count of the number of items processed; notice that its value is increased in the onNext method. It will be used in our main method to wait for execution to finish before ending the main thread.
◈ Subscription request is invoked in the onSubscribe method to start the processing. Also notice that it’s called again in the onNext method after processing the item, demanding the next item from the publisher.
◈ onError and onComplete don’t do much here, but in a real-world scenario they should be used to take corrective measures when an error occurs or to clean up resources when processing completes successfully.

Reactive Stream Test Program


We will use SubmissionPublisher as the Publisher for our examples, so let’s look at the test program for our reactive stream implementation.

package com.oraclejavacentral.reactive_streams;

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

import com.oraclejavacentral.reactive.beans.Employee;

public class MyReactiveApp {

    public static void main(String[] args) throws InterruptedException {

        // Create Publisher
        SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();

        // Register Subscriber
        MySubscriber subs = new MySubscriber();
        publisher.subscribe(subs);

        List<Employee> emps = EmpHelper.getEmps();

        // Publish items
        System.out.println("Publishing Items to Subscriber");
        emps.forEach(publisher::submit);

        // logic to wait till processing of all messages is over
        while (emps.size() != subs.getCounter()) {
            Thread.sleep(10);
        }

        // close the Publisher
        publisher.close();

        System.out.println("Exiting the app");

    }

}

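The most important parts of the above code are the calls to the publisher’s subscribe and submit methods. We should always close the publisher to avoid memory leaks.

We will get output similar to the following when the above program is executed. Because items are delivered asynchronously, the exact interleaving of the lines printed by the main thread and by the subscriber can vary between runs.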

Subscribed
Publishing Items to Subscriber
onSubscribe requested 1 item
Processing Employee [id=1,name=Pankaj]
Processing Employee [id=2,name=David]
Processing Employee [id=3,name=Lisa]
Processing Employee [id=4,name=Ram]
Processing Employee [id=5,name=Anupam]
Exiting the app
All Processing Done

Note that if the main method does not wait until all the items are processed, the program can exit before the subscriber has received them, and we will get unwanted results.
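
Polling a counter is one way to wait. Another option, sketched here under assumptions, is to close the publisher first and then wait for the subscriber's onComplete signal. The names DoneSignalDemo, CountingSubscriber and publishAndWait are hypothetical, and plain String items stand in for Employee to keep the example self-contained.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DoneSignalDemo {

    // Hypothetical subscriber: counts items and completes a future when the stream ends.
    static class CountingSubscriber implements Flow.Subscriber<String> {
        final AtomicInteger processed = new AtomicInteger();
        final CompletableFuture<Void> finished = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            processed.incrementAndGet();
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            finished.completeExceptionally(t);
        }

        @Override
        public void onComplete() {
            finished.complete(null);
        }
    }

    static int publishAndWait(List<String> items) {
        CountingSubscriber subscriber = new CountingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the buffered items are delivered

        // Block (at most 5 seconds) until onComplete or onError has been received.
        subscriber.finished.orTimeout(5, TimeUnit.SECONDS).join();
        return subscriber.processed.get();
    }

    public static void main(String[] args) {
        System.out.println(publishAndWait(List.of("a", "b", "c", "d", "e"))); // prints 5
    }
}
```

With this arrangement the main thread needs no sleep loop, and the final completion message is printed before the application exits.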

Message Transformation Example


A Processor is used to transform messages between a publisher and a subscriber. Let’s say we have another subscriber that expects a different type of message, and that this new message type is Freelancer.

package com.oraclejavacentral.reactive.beans;

public class Freelancer extends Employee {

    private int fid;

    public Freelancer(int id, int fid, String name) {
        super(id, name);
        this.fid = fid;
    }

    public int getFid() {
        return fid;
    }

    public void setFid(int fid) {
        this.fid = fid;
    }

    @Override
    public String toString() {
        return "[id=" + super.getId() + ",name=" + super.getName() + ",fid=" + fid + "]";
    }
}

We have a new subscriber to consume the Freelancer stream data.

package com.oraclejavacentral.reactive_streams;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

import com.oraclejavacentral.reactive.beans.Freelancer;

public class MyFreelancerSubscriber implements Subscriber<Freelancer> {

    private Subscription subscription;
    // volatile: updated on the delivery thread, read by the main thread
    private volatile int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("Subscribed for Freelancer");
        this.subscription = subscription;
        this.subscription.request(1); // requesting data from publisher
        System.out.println("onSubscribe requested 1 item for Freelancer");
    }

    @Override
    public void onNext(Freelancer item) {
        System.out.println("Processing Freelancer " + item);
        counter++;
        this.subscription.request(1); // demand the next item
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Some error happened in MyFreelancerSubscriber");
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All Processing Done for MyFreelancerSubscriber");
    }

    public int getCounter() {
        return counter;
    }

}

Processor


The important part is the implementation of the Processor interface. Since we want to reuse SubmissionPublisher for the publishing side, we extend it and use its methods wherever applicable.

package com.oraclejavacentral.reactive_streams;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

import com.oraclejavacentral.reactive.beans.Employee;
import com.oraclejavacentral.reactive.beans.Freelancer;

public class MyProcessor extends SubmissionPublisher<Freelancer> implements Processor<Employee, Freelancer> {

    private Subscription subscription;
    private Function<Employee, Freelancer> function;

    public MyProcessor(Function<Employee, Freelancer> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Employee emp) {
        // transform the incoming item and publish it downstream
        submit(function.apply(emp));
        subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }

}

◈ The Function is used to convert an Employee object to a Freelancer object.
◈ We convert the incoming Employee message to a Freelancer message in the onNext method and then use the SubmissionPublisher submit method to send it to the subscriber.
◈ Since a Processor works as both subscriber and publisher, we can create a chain of processors between the end publisher and the end subscriber.
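
To illustrate the idea of a chain, here is a hedged sketch with two processors in a row. TransformProcessor is a hypothetical generic variant of the processor shown above; unlike MyProcessor, it forwards completion and errors downstream by calling close and closeExceptionally, which is a design choice made for this example only.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ChainDemo {

    // Hypothetical generic processor: maps each incoming T to an R and republishes it.
    static class TransformProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {

        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription subscription;

        TransformProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item));
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            closeExceptionally(t); // pass the error on to downstream subscribers
        }

        @Override
        public void onComplete() {
            close(); // pass completion on to downstream subscribers
        }
    }

    static List<String> run(List<Integer> input) {
        List<String> out = new CopyOnWriteArrayList<>();
        TransformProcessor<Integer, Integer> doubler =
                new TransformProcessor<Integer, Integer>(x -> x * 2);
        TransformProcessor<Integer, String> labeler =
                new TransformProcessor<Integer, String>(x -> "#" + x);
        CompletableFuture<Void> done;

        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(doubler);           // publisher -> first processor
            doubler.subscribe(labeler);          // first -> second processor
            done = labeler.consume(out::add);    // second processor -> end consumer
            input.forEach(source::submit);
        }
        // Completion travels down the chain; wait at most 5 seconds for it.
        done.orTimeout(5, TimeUnit.SECONDS).join();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3))); // prints [#2, #4, #6]
    }
}
```

Because each processor closes itself in onComplete, closing the source publisher is enough to shut down the whole chain.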

Message Transformation Test


package com.oraclejavacentral.reactive_streams;

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

import com.oraclejavacentral.reactive.beans.Employee;
import com.oraclejavacentral.reactive.beans.Freelancer;

public class MyReactiveAppWithProcessor {

    public static void main(String[] args) throws InterruptedException {
        // Create End Publisher
        SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();

        // Create Processor
        MyProcessor transformProcessor = new MyProcessor(
                s -> new Freelancer(s.getId(), s.getId() + 100, s.getName()));

        // Create End Subscriber
        MyFreelancerSubscriber subs = new MyFreelancerSubscriber();

        // Create chain of publisher, processor and subscriber
        publisher.subscribe(transformProcessor); // publisher to processor
        transformProcessor.subscribe(subs); // processor to subscriber

        List<Employee> emps = EmpHelper.getEmps();

        // Publish items
        System.out.println("Publishing Items to Subscriber");
        emps.forEach(publisher::submit);

        // Logic to wait for messages processing to finish
        while (emps.size() != subs.getCounter()) {
            Thread.sleep(10);
        }

        // Closing publishers
        publisher.close();
        transformProcessor.close();

        System.out.println("Exiting the app");
    }

}

Read the comments in the program to understand it properly. The most important change is the creation of the publisher-processor-subscriber chain. We will get output similar to the following when the above program is executed.

Subscribed for Freelancer
Publishing Items to Subscriber
onSubscribe requested 1 item for Freelancer
Processing Freelancer [id=1,name=Pankaj,fid=101]
Processing Freelancer [id=2,name=David,fid=102]
Processing Freelancer [id=3,name=Lisa,fid=103]
Processing Freelancer [id=4,name=Ram,fid=104]
Processing Freelancer [id=5,name=Anupam,fid=105]
Exiting the app
All Processing Done for MyFreelancerSubscriber
Done

Cancel Subscription


We can use the Subscription cancel method to stop receiving messages in the subscriber. Note that if we cancel the subscription, the subscriber will not receive the onComplete or onError signal.

Here is sample code where the subscriber consumes only 3 messages and then cancels the subscription.

@Override
public void onNext(Employee item) {
    System.out.println("Processing Employee " + item);
    counter++;
    if (counter == 3) {
        this.subscription.cancel();
        return;
    }
    this.subscription.request(1);
}

Note that in this case, our logic that holds the main thread until all the messages are processed will go into an infinite loop, because the counter never reaches the size of the list. We can add some additional logic for this scenario, maybe a shared flag that tells the main thread that the subscriber has stopped processing or canceled the subscription.
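
One possible shape for that additional logic is sketched below, under assumptions: the subscriber exposes a CompletableFuture (here named stopped, a hypothetical field) that it completes when it cancels, completes or fails, and the main thread waits on that future instead of comparing counters. The names CancelDemo, FirstNSubscriber and takeFirst are also hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class CancelDemo {

    // Hypothetical subscriber: takes `limit` items, cancels, and signals that it has stopped.
    static class FirstNSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CompletableFuture<Void> stopped = new CompletableFuture<>();
        private final int limit;
        private Flow.Subscription subscription;

        FirstNSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() == limit) {
                subscription.cancel();
                stopped.complete(null); // tell waiters that we are done early
                return;
            }
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            stopped.completeExceptionally(t);
        }

        @Override
        public void onComplete() {
            stopped.complete(null);
        }
    }

    static List<Integer> takeFirst(int limit, int total) {
        FirstNSubscriber subscriber = new FirstNSubscriber(limit);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= total; i++) {
                publisher.submit(i);
            }
        }
        // Returns on cancel, completion or error; gives up after 5 seconds.
        subscriber.stopped.orTimeout(5, TimeUnit.SECONDS).join();
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(3, 5)); // prints [1, 2, 3]
    }
}
```

The same future also covers the normal case: if fewer items than the limit are published, onComplete releases the waiting thread.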

Back Pressure


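Back pressure builds up when a publisher produces messages much faster than the subscriber consumes them. The Flow API’s basic tool for controlling it is Subscription.request: the subscriber receives only as many items as it has asked for. What happens to the surplus is left to the publisher. SubmissionPublisher buffers items for each subscriber; its submit method blocks while a buffer is full, and its offer method can drop items instead. Beyond that, we have to devise our own strategy, such as fine-tuning the subscriber or reducing the rate at which messages are produced.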
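
The following sketch shows one such strategy, dropping items with SubmissionPublisher's offer method when the subscriber's buffer is full. It is an illustration under assumptions rather than a recommended design: the names BackPressureDemo and offerAll are hypothetical, and the subscriber is artificially held back with a gate so that the buffer is certain to fill up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BackPressureDemo {

    // Hypothetical helper: offers `total` items to a stalled subscriber.
    // Returns {delivered, dropped}.
    static int[] offerAll(int total, int bufferCapacity) {
        AtomicInteger delivered = new AtomicInteger();
        AtomicInteger dropped = new AtomicInteger();
        CompletableFuture<Void> gate = new CompletableFuture<>();     // holds the subscriber back
        CompletableFuture<Void> finished = new CompletableFuture<>(); // completed at end of stream

        try (SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferCapacity)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1);
                }

                @Override
                public void onNext(Integer item) {
                    gate.join(); // simulate a slow consumer until the gate opens
                    delivered.incrementAndGet();
                    subscription.request(1);
                }

                @Override
                public void onError(Throwable t) {
                    finished.completeExceptionally(t);
                }

                @Override
                public void onComplete() {
                    finished.complete(null);
                }
            });

            for (int i = 0; i < total; i++) {
                // offer() never blocks; the handler runs when the buffer is full.
                // Returning false means "do not retry", so the item is dropped.
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false;
                });
            }
            gate.complete(null); // let the subscriber drain what was buffered
        }
        finished.orTimeout(5, TimeUnit.SECONDS).join();
        return new int[] { delivered.get(), dropped.get() };
    }

    public static void main(String[] args) {
        int[] result = offerAll(50, 4);
        System.out.println("delivered=" + result[0] + ", dropped=" + result[1]);
    }
}
```

The exact split between delivered and dropped items depends on the buffer size the publisher actually enforces, but the two numbers always add up to the number of items offered.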
