
Reactive Streams

Last Updated on August 11, 2024 by KnownSense

Reactive programming on its own is a set of principles; it does not prescribe coding standards or implementations. Those standards are defined by an initiative called Reactive Streams. As stated on the reactive‑streams.org website, its purpose is to provide a standard for asynchronous stream processing with non‑blocking backpressure.

Understanding the Four Key Components

Reactive Streams consists of four components, each with a distinct role in the reactive world.
Publisher: The component that provides data in the form of a data stream. The Publisher has one method – subscribe().
Subscriber: Since the publisher provides the data, we also need something to consume and process it. This is the role of a subscriber, which we can think of as a sink. It has four methods that must be implemented – onSubscribe(), onNext(), onError(), and onComplete().
Subscription: The link between one publisher and one subscriber. A subscriber that wants to receive events must subscribe to the given Publisher; it is then handed a Subscription, through which it requests more items with request() or stops the flow with cancel().
Processor: A combination of a subscriber and a publisher. The processor uses a subscription to receive data from an upstream data stream. After it has processed the data, one or more downstream subscribers can consume the result through their own subscriptions.
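To make the Processor role more concrete, here is a minimal sketch built on the JDK's java.util.concurrent.Flow API. The names ProcessorSketch, UpperCaseProcessor, CollectingSubscriber, and run are hypothetical and exist only for this illustration; extending SubmissionPublisher is one convenient way to get the publishing half, not the only one.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class ProcessorSketch {

    // Hypothetical processor: a Subscriber towards its upstream and,
    // by extending SubmissionPublisher, a Publisher towards its downstream.
    static class UpperCaseProcessor extends SubmissionPublisher<String>
            implements Flow.Processor<String, String> {
        private Flow.Subscription upstream;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.upstream = subscription;
            upstream.request(1);
        }

        @Override
        public void onNext(String item) {
            submit(item.toUpperCase()); // republish the transformed item
            upstream.request(1);        // then ask upstream for one more
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable); // forward the failure downstream
        }

        @Override
        public void onComplete() {
            close(); // forward completion downstream
        }
    }

    // Hypothetical sink that records everything it receives.
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Wires source -> processor -> sink and returns what the sink received.
    static List<String> run(List<String> input) throws InterruptedException {
        CollectingSubscriber sink = new CollectingSubscriber();
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        UpperCaseProcessor processor = new UpperCaseProcessor();

        processor.subscribe(sink);   // the processor acts as a Publisher
        source.subscribe(processor); // the processor acts as a Subscriber

        input.forEach(source::submit);
        source.close(); // completion travels through the processor to the sink

        sink.done.await(5, TimeUnit.SECONDS);
        return List.copyOf(sink.received);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(List.of("item1", "item2")));
    }
}
```

Only the source is closed explicitly here. The processor closes itself when it receives onComplete(), so items still in flight are not cut off.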

Concept of Backpressure


One very important thing to remember about this standard is that it uses a push model to transfer data: the publisher pushes events to the subscriber. There is also a feedback loop from the subscriber back to the publisher. The subscriber must be able to control the rate at which the publisher pushes events towards it, and it does so through backpressure. Using the feedback loop, the subscriber tells the publisher how many events it is ready to receive, which in effect slows the publisher down or lets it speed up.

The backpressure mechanism also works across a longer chain, such as one that includes a processor. If the final subscriber cannot keep up with the rate at which the processor sends events, it can tell the processor to slow down. In turn, the processor can do the same to its own publisher.

All of these concepts, however, are only a standard without a concrete implementation. Since writing an implementation on your own is rarely worthwhile, it is good to know that someone has already done it. In fact, there are many implementations of Reactive Streams, such as Akka Streams, Vert.x, and RxJava.
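The feedback loop described above can be sketched with the JDK's Flow API. In this minimal example the subscriber signals demand for a fixed number of items and never asks for more, so the publisher delivers only that many and keeps the rest buffered. The names DemandSketch, BoundedSubscriber, and run are hypothetical, and the sketch assumes the number of published items stays below the publisher's default buffer capacity, since submit() would otherwise block.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class DemandSketch {

    // Hypothetical subscriber that requests a fixed number of items up front
    // and never requests more. The demand must be a positive number.
    static class BoundedSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch gotAll;
        private final int demand;

        BoundedSubscriber(int demand) {
            this.demand = demand;
            this.gotAll = new CountDownLatch(demand);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // This call is the feedback signal: "I can handle this many items".
            subscription.request(demand);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            gotAll.countDown();
            // No further request(), so the publisher must not push more.
        }

        @Override
        public void onError(Throwable throwable) {
            // Not expected in this sketch.
        }

        @Override
        public void onComplete() {
            // Nothing to do in this sketch.
        }
    }

    // Publishes the numbers 1..published and returns what the subscriber saw.
    // Assumption: published is smaller than the default buffer capacity.
    static List<Integer> run(int demand, int published) throws InterruptedException {
        BoundedSubscriber subscriber = new BoundedSubscriber(demand);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= published; i++) {
                publisher.submit(i);
            }
            subscriber.gotAll.await(5, TimeUnit.SECONDS);
            Thread.sleep(200); // brief pause to show that nothing else arrives
            return List.copyOf(subscriber.received);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3, 10));
    }
}
```

Calling run(3, 10) publishes ten items, but the subscriber receives only the three it asked for.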

Implementation

Let’s create a simple flow in Java with a Publisher and a Subscriber. We will use the java.util.concurrent.Flow API introduced in Java 9, which provides the basic building blocks of Reactive Streams as the interfaces Publisher, Subscriber, Subscription, and Processor.

Step 1: Create the Subscriber

import java.util.concurrent.Flow;

public class SimpleSubscriber implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("Subscribed");
        // Request the first item 
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.println("Received: " + item);
        // Process the item (in this case, just print it)
        // Request the next item
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error occurred: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("All items processed");
    }
}

Step 2: Create the Publisher

import java.util.concurrent.SubmissionPublisher;

public class SimplePublisher {
    public static void main(String[] args) throws InterruptedException {
        // Create a Publisher
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // Create a Subscriber
        SimpleSubscriber subscriber = new SimpleSubscriber();

        // Register the Subscriber with the Publisher
        publisher.subscribe(subscriber);

        // Publish items
        System.out.println("Publishing items...");
        String[] items = {"item1", "item2", "item3", "item4"};
        for (String item : items) {
            publisher.submit(item);
            Thread.sleep(500); // Simulate delay
        }

        // Close the Publisher
        publisher.close();

        // Wait for the subscriber to complete processing
        Thread.sleep(1000);
    }
}

Explanation:
  1. SubmissionPublisher:
    • This is a concrete implementation of the Publisher interface provided by the JDK. It allows you to publish items to one or more subscribers.
    • In the example, items are submitted one at a time using the submit method.
  2. SimpleSubscriber:
    • The SimpleSubscriber implements the Flow.Subscriber interface and overrides its methods.
    • The onSubscribe method is called once the subscriber has been registered with the publisher. It requests the first item with subscription.request(1).
    • The onNext method is called for each published item. After processing the item, the subscriber requests the next one.
    • The onError method is called when the publisher or the subscription fails; no further items are delivered afterwards.
    • The onComplete method is called after the publisher has been closed and all submitted items have been delivered.
  3. Flow Control:
    • The subscriber controls the flow of data by requesting items one at a time (subscription.request(1);), which is a simple form of backpressure. This ensures that the subscriber processes items at its own pace.
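
The example above never exercises onError. As a minimal sketch of that path, the snippet below closes a SubmissionPublisher with closeExceptionally(), which signals onError to the current subscribers. The class name ErrorSignalSketch, the method run, and the message text are hypothetical and used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class ErrorSignalSketch {

    // Returns the message of the error seen by the subscriber,
    // or null if no error was signalled within the timeout.
    static String run() throws InterruptedException {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // No items are published in this sketch.
            }

            @Override
            public void onError(Throwable throwable) {
                failure.set(throwable); // terminal signal: the stream failed
                done.countDown();
            }

            @Override
            public void onComplete() {
                done.countDown(); // terminal signal: the stream ended normally
            }
        });

        // Simulate an upstream failure instead of a normal close().
        publisher.closeExceptionally(new IllegalStateException("upstream failed"));

        done.await(5, TimeUnit.SECONDS);
        Throwable seen = failure.get();
        return seen == null ? null : seen.getMessage();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Error seen by subscriber: " + run());
    }
}
```

A subscriber receives either onError or onComplete as its final signal, never both, which is why a single latch is enough here.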

Step 3: Running the Example

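When you run the SimplePublisher class, you should see output similar to the following, with items published by the Publisher and consumed by the Subscriber one at a time. Because onSubscribe() runs asynchronously on a different thread from main, the first two lines may appear in either order.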

OUTPUT
Publishing items...
Subscribed
Received: item1
Received: item2
Received: item3
Received: item4
All items processed

Conclusion

In this article we looked at Reactive Streams and its key components. We also saw an example that illustrates the basic use of Java’s Flow API to create a simple reactive stream in which a Publisher produces data and a Subscriber consumes it one item at a time.
