Monday , April 12 2021

How to implement reactive polling in Java

There are no comments

Suppose it is necessary to stream data from an external source, but that the data source does not support push natively, so it is necessary to resort to periodic polling. How to implement this in Java, in the simplest way possible, while remaining responsive?

General idea

Before we immerse ourselves in the code, we first discuss the requirements and the general idea. What we need to do is basically to activate a recovery operation at a fixed rate, blocking the client until new data are available. In addition, suppose we want to remain responsive, so we should not lock indefinitely, but unlock after a certain maximum time has elapsed, after which the client can react accordingly (try again, stop or do something else) .

To meet these requirements, we will implement a variant of the Token Bucket algorithm, which is commonly used for Traffic Shaping. In this algorithm, a fixed number of tokens is periodically placed in a virtual bucket of a specified capacity. At the same time, another thread waiting to perform some operations (for example, sending a data packet on the network) checks the contents of the bucket and if there are enough tokens in it, remove them from the bucket and run the & # 39; operation. In this article, we will simplify the algorithm by simulating a bucket with a capacity of one and using only one consumption thread.


Since our bucket has the capacity of one, it will only have two states (full and empty). This can be represented by a single Boolean value, true empty meaning and full empty meaning:

private Boolean going to take = true; // let's start to recover immediately

Furthermore, we need to plan an activity that will "fill the bucket" periodically at a fixed speed. This is done using a ScheduledExecutor service:

empty beginning() {
    ScheduledExecutorService es = Executors.newScheduledThreadPool(1);
    es.scheduleAtFixedRate(This::scheduleFetch, FETCH_INTERVAL, FETCH_INTERVAL, TimeUnit.MILLISECONDS);

What does the scheduleFetch appearance? Simply set the fetch variable to true (fills the bucket) and notify another (recovery) thread, which at that time might wait for the status of our bucket to be changed. For a discussion of why the next two methods need to be synchronized, see this stack overflow question.

synchronized empty scheduleFetch() {
    going to take = true;

Next, we will provide an operation that will be returned immediately if the bucket is full or blocked for a set period of time, waiting for it to become full, returning the most recent bucket status and emptying it at the end:

synchronized Boolean awaitFetch() throws InterruptedException {
    Self (!going to take)
    try {
        return going to take;
    } finally {
        going to take = false;

Since we will not stop WAIT_LIMIT any longer, this method is guaranteed to return no more than WAIT_LIMIT. We need this guarantee to ensure reactivity, as we will see shortly. In total, the operation reports to the caller whether a recovery is allowed, returning no more than WAIT_LIMIT milliseconds.

With this post, and assuming that the actual recovery operation (sending a request on the network, interpreting the answer etc.) be implemented in the doFetch method, we can finally implement our survey blocking method:

List poll() throws InterruptedException {
    return awaitFetch() ? doFetch() : null;

Here, there are no signals to the customer that new data are not yet available. In fact, this is the exact protocol needed by the source connectors in Kafka Connect and the described implementation is used in the PLC4X source connector.


There are two main parameters in this program: WAIT_LIMIT and FETCH_INTERVAL. The former controls the responsiveness of the client – the longer WAIT_LIMIT is low, the faster the control is returned to the client in the event that new data is not available.

The second parameter controls the maximum request speed (sampling). It is in fact a higher limit because the actual sampling rate may be lower, ie when the recovery operation takes longer than FETCH_INTERVAL.


Although the proposed solution works, there are alternatives. One of these alternatives is to directly retrieve data in the scheduled periodic activity instead of notifying the recovery thread (client). However, since it is necessary to block the client thread waiting for new data, it is necessary to transfer the retrieved results from the periodic task to the client, for example through a blocking queue.

Another alternative is to use a utility class already prepared for this type of activity, such as RateLimiter from the Google Guava library. This would further simplify the implementation. However, you will need to add another library dependency to your project, which, depending on the circumstances, may be appropriate for you or not.


Simple reactive polling can be implemented in a surprisingly simple way using a variant of the Token Bucket algorithm, using two low-level synchronization primitives of the Java platform: waiting and notification. Although common knowledge implies that you should never create problems with basic synchronization primitives and use abstractions in java.util.concurrent, this example shows that sometimes it is correct to break the rules if the work is done.

Source link


  1. replacement mercedes key

    I wanted to check up and let you know how really I liked discovering your blog today.

  2. My spouse and i got quite happy that John managed to do his preliminary research through the entire precious recommendations he grabbed out of your web pages. It is now and again perplexing to just continually be giving out points which others may have been selling. So we grasp we have got the writer to give thanks to for that. The main illustrations you have made, the simple web site navigation, the relationships you will help foster – it’s got many impressive, and it’s really helping our son in addition to the family know that the matter is excellent, which is quite mandatory. Thank you for everything!

  3. I’m commenting to let you be aware of of the beneficial discovery my child developed browsing the blog. She came to find a lot of details, including how it is like to have a great coaching character to get certain people completely comprehend a number of complex issues. You actually did more than her desires. I appreciate you for coming up with such effective, dependable, edifying and cool guidance on that topic to Ethel.

  4. over the counter hiv drugs

    I am continuously browsing online for ideas that can benefit me. Thanks! over the counter hiv drugs

  5. over the counter atherosclerosis drugs

    You got a very superb website, Sword lily I discovered it through google. over the counter atherosclerosis drugs

Leave a Reply

Your email address will not be published.