Using HTML5 Server-Sent Events (SSE)

Eric J. Bruno
September 2016

You’ve probably heard of HTML5 WebSockets , which is a powerful method to support full duplex, reliable messaging over HTTP/S. But have you explored HTML5’s Server-Sent Events (SSE), which is a simpler (albeit not as full featured) method of sending dynamic updates from an HTTP server to code in a browser? It works outside the browser as well, between applications written in any language. I like SSE because it doesn’t require a separate WebSockets server, it works over HTTP and HTTPS, it’s firewall friendly, and it’s simple.

In this article, we’ll explore simple messaging with HTML5's SSE, using a Java Servlet as the server and some JavaScript code in a web page as the client. We’ll also implement a general purpose server in Java, create Java clients (that don’t need to run in a browser) to send and receive messages through an SSE server implemented as a Java Servlet. The server implementation presented supports both topic-based publish-and-subscribe, and queue-based point-to-point messaging.

What Are Server-Sent Events?

HTML5 SSE messaging is based on two main components: The text/event-stream MIME type where text-based messages are sent according to a simple protocol, and the EventSource interface with event listeners to receive messages. Let’s examine the protocol first.

SSE message data is simple: it’s text based, formatted as name-value pairs where the text contains a field name, a colon, and the field’s value (or data). Comments, denoted as a line that starts with a colon followed by optional text, are ignored. These are useful as “keep alive” heartbeats to keep the HTTP connection open. A single message is made up of all the fields and data parsed until an empty message (made up of just a carriage return or line feed) is received. Here are some examples:

event: message\n
data: this is an important message\n
\n
Listing 1 - A simple HTML5 SSE event

This HTTP message results in a MessageEvent object that’s created with a data field set to “this is an important message”. The second example is similar:

event: message\n
data: this is an important message\n
data: that has two lines\n
\n
Listing 2 – A single SSE event with two lines of data

This message results in a MessageEvent object with a data field set to “this is an important message\nthat has two lines”. Simply split the text String by a carriage return (or newline) character in your application to retrieve the individual lines of data.

SSE Field Names

Each SSE event message contains three components, or fields: an event field, a data field, and a carriage return (indicating end of message). The complete set of pre-defined field names include:

The following example results in two MessageEvent objects:

:First name
event: message\n
data: Eric\n
\n
:Last name
data: Bruno\n
\n
Listing 3 - Two discrete SSE event messages (separated by an empty line)

The first Message object’s data field will contain the text “Eric”, while the second Message object’s data field (delivered after the first), will contain the text “Bruno”. Both message streams contain comment fields—“First name” for the first message, “Last name” for the second—but are ignored.

Note that although the second message stream doesn’t specifically contain the event type, it’s implied to be of type “message” by default unless otherwise specified. You can create custom event types and then provide handlers to receive only the MessageObjects with data sent for the applicable event type. To explore this in detail, let’s look at the EventSource interface, along with some sample JavaScript code to illustrate how to receive and parse SSE event messages in a browser.

Using the EventSource Interface

HTML5 defines an EventSource interface (see Listing 4), which you implement to receive SSE events as MessageEvent objects. This interface extends the DOM EventTarget interface, which defines methods to add and remove event listeners, and also to dispatch messages accordingly. These EventTarget functions are implemented by the browser.

[Constructor(DOMString url, optional EventSourceInit eventSourceInitDict)]
interface EventSource : EventTarget {
  readonly attribute DOMString url;
  readonly attribute boolean withCredentials;

  // ready state
  const unsigned short CONNECTING = 0;
  const unsigned short OPEN = 1;
  const unsigned short CLOSED = 2;
  readonly attribute unsigned short readyState;

  // networking
  attribute EventHandler onopen;
  attribute EventHandler onmessage;
  attribute EventHandler onerror;
  void close();
};

dictionary EventSourceInit {
  boolean withCredentials = false;
};
Listing 4 - The IDL that defines the HTML5 EventSource interface.

The EventSource constructor requires a server URL (where the events are sent from) and an optional parameter that specifies whether or not security credentials are used. Let’s look at the code in Listing 5 as an example.

var people = new EventSource("http://my.example.com/chat");
people.onmessage = function (event) {
  var data = event.data.split('\n');
  var name = data[0];
  var action = data[1];
  console.log( name + " has “ +  action + “ the chat!");
};                
people.onerror = function(event) {
    console.log("SSE onerror " + event);
}
people.onopen = function() {
    console.log("SSE onopen");
}
Listing 5 - JavaScript code to set up an SSE EventSource

The JavaScript code in this example is set to receive SSE events sent as people join an online chat session. The EventSource object is created with the URL for the chat server. Additionally, the onmessage, onerror, and onopen functions are defined. You don’t need to define onerror or onopen, but you do need to define onmessage to receive the events. The code in the onmessage() function is implemented to handle SSE events sent to comply with the format originally shown in Listing 2. Here’s a specific example:

:Person joined chat
event: message\n
data: Eric Bruno\n
data: joined\n
\n

When this SSE event is received by the code in Listing 5, the following text will be logged to the console:

Eric Bruno has joined the chat!

Subsequently, the following SSE event will be sent when Eric Bruno leaves the chat:

:Person joined chat
event: message\n
data: Eric Bruno\n
data: left\n
\n

However, since “joined” and “left” are effectively two different events in this example, you can handle them separately by defining two new SSE event types along with JavaScript handlers for each, as shown in Listing 6.

var chat = new EventSource('http://my.example.com/chat');
chat.addEventListener('joined', joinedChatHandler, false);
chat.addEventListener('left', leftChatHandler, false);
Listing 6 - Setting up custom SSE event type listeners.

The optimized SSE message, when a person joins the chat, now looks like this:

:Person joined chat
event: joined\n
data: Eric Bruno\n
\n

The handler functions also need to be defined, as shown in Listing 7.

function joinedChatHandler(event) {
    console.log( event.data + " has joined the chat!");                
}
function leftChatHandler(event) {
    console.log( event.data + " has left the chat!");                
}
Listing 7 - The custom SSE event handler functions.

Note that the MessageEvent object’s data field no longer needs to contain anything other than the name of the person. The “action” data (“joined” or “left”) has been removed since it’s now indicated by the event type.

Each SSE connection has a state with the following possible values:

By default, when an EventSource object is created, its readyState is set to CONNECTING. It will then transition to OPEN when connected, or CLOSED if it fails entirely. Once open, the onopen event handler will be called, if defined in your code. If an error occurs at any time, the onerror event handler will be called (again, if defined). When a message is dispatched, it’s delivered via the onmessage event handler.

Connectionless Push Notifications

Given the volume of HTML requests that come from mobile devices, the HTML5 SSE specification has explicit support for connectionless push notifications. This design pattern helps to conserve power on mobile devices via a proxy server that maintains the connection to the web server on behalf of the device. This allows the device to sleep and resume as user activity dictates, while the proxy maintains the connection. For SSE, here’s how it works:

  1. The mobile browser loads an HTML5 web page that contains an EventSource request
  2. The mobile device connects to the server as referenced within the EventSource object
  3. An HTML5 SSE connection is established, ready to send and receive messages
  4. To conserve power when idle, the mobile device decides to go into low-power mode, and takes the following steps:

  5. a. The mobile browser contacts a carrier network service to request that a proxy server maintain the HTTP connection
    b. The carrier proxy server connects to the same server in step 2 above and sets up an HTML5 SSE connection with it
    c. The mobile browser disconnects from the server and allows the device to enter low power mode
  6. Some time later, the server sends an SSE message to the proxy server
  7. The proxy server sends a mobile push message, as defined by the Open Mobile Alliance (OMA), to wake up the mobile device to handle the SSE message

Remote Patient Monitoring IoT Demo

For the first example of how to use SSE, I’ve created a demonstration application that represents a remote patient monitoring system. In this scenario, there are three simulated medical devices that, when used, transmit their data. These include a blood pressure cuff, pulse oximeter (for heart rate and blood oxygenation percentage), and weight scale. The readings from these devices are simulated, and are strictly to make the demo more realistic. When the patient takes a reading (triggered by clicking on a button in the demo user interface), the data is transmitted from the simulated devices to the HTML5 code over SSE.

In the web page, shown in Figure 1, you’ll see three main sections: a data section for each medical device, instructional text, and a toolbar along the top. The HTML5 UI is meant to resemble a tablet form factor, to be used by a remotely monitored patient. The HTML5 code for this UI is available for download here .

Figure 1 - The HTML5 remote patient monitoring application UI

When the page loads, an HTML5 DOM page-load event listener is set to call the onPageLoad() function (see Listing 8), which in turn calls setupEventSource() function.

<html>
  <head>
    <script>
      var source; 
      ...

      function onPageLoad() {
        setupEventSource();
      }

      window.addEventListener("load", onPageLoad, true);
    </script>
    ...
  <head>
<html>
Listing 8 - Handling the HTML5 page load event

A JavaScript SSE Client

The setupEventSource() JavaScript function handles all of the SSE messaging, from subscribing to messages on the server, to handling the message events, as shown in Listing 9.

function setupEventSource() {
    if (typeof(EventSource) == "undefined") {
      Alert("Error: Server-Sent Events are not supported in your browser");
      return;
    }

    // Check if already connected to avoid getting duplicate messages
    if (typeof(source) !== "undefined") {
        source.close();
    }
    
    <b>console.log("Setting up new event source");
    source = new EventSource("sse?msg=healthcare");</b>
    
    source.onopen = function() {
        console.log("SSE onopen");
    }
    
    source.onmessage = function(event) {
        console.log("SSE onmessage: " + event.data);
        ...
    }

    source.onerror = function(event) {
        console.log("SSE onerror " + event);
        ...
    }
}
Listing 9 - Setting up the SSE event listener

First, recall that variable 'source' is defined globally (see Listing 8 above). Next, two things are checked up front: first, that the browser supports SSE by checking for the presence of the EventSource interface. Second, that the SSE event source hasn’t already been set up. If so, the previous event source is closed, and a new one created. This can occur if, say, the user refreshes the browser.

When the EventSource object is created, it’s given the URL of the server that sources the events. In this case, it’s the same host that served up the web page (which is http://localhost if you run this locally), and then a path to a Servlet named 'sse' that handles the requests and sends the SSE event messages. A parameter is sent to indicate the type of messages that the client is interested in. This gives me the ability in the future to support subscriptions to different sets of SSE event messages.

Once connected, the onopen() method will be called. Subsequently, when messages are available, the onmessage() method will be called (see Listing 10). If an error occurs for any reason (i.e. the network connection to the HTTP server is lost), the onerror() function will be called with a parameter to indicate what happened.

source.onmessage = function(event) {
    console.log("SSE onmessage: " + event.data);

    if ( event.data==='start' ) {
        // safely ignore this
        return;
    }
    
    // Parse the message and update the values
    var obj = JSON.parse(event.data);
    if ( obj.id == lastMsgId ) {
        // ignore duplicate
        console.log("ignoring duplicate message <b></b>");
        return;
    }

    lastMsgId = obj.id
    
    if ( obj.type === 'devicedata' ) {
      setSystolic(obj.systolic);
      setDiastolic(obj.diastolic);
      setSPO2(obj.spo);
      setHeartrate(obj.heartrate);
      setWeight(obj.weight);
    }
}
Listing 10 - The onmessage function is called when an SSE event is received

When the connection is first created, the code in this example sends a “start” message to indicate success. This isn’t required, but I find it’s useful when debugging, and the overhead is negligent. The data sent by the SSE server in this example is in JSON format. Therefore, the next step is to convert it to an object via the JSON.parse() function, built into the JavaScript engine of most browsers.

It’s a good idea to compare the message’s ID to that of the previous message, then store it. This ensures you don’t process a duplicate message if, for some reason, it’s redelivered. Finally, the appropriate HTML5 UI components are updated with the received medical device data. This processing occurs for each new SSE event message received.

Next, let’s look at the server side, where SSE subscription requests are handled, and event messages are sent.

A Java Servlet SSE Server

For this example, I chose to implement the SSE server as a Java EE Servlet. Conceptually, HTML5 SSE processing is straightforward, but in practice there are a few tricks to ensure it works smoothly. First, SSE message subscription begins when a request arrives, handled by the Servlet processRequest method, shown in Listing 11.

protected void processRequest(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
  try {
      response.setContentType("text/html;charset=UTF-8");
      String param = "";
      ...

      // Is this a message subsciption request?
      param = request.getParameter("msg");
      if ( param != null && param.length() > 0 ) {
          // <b> Set up new event listener </b>

          // Important: set content type and header
          response.setContentType("text/event-stream");
          response.setCharacterEncoding("UTF-8");
          response.setHeader("Connection", "keep-alive");

          // Store the listener's writer to send messages
          PrintWriter out = response.getWriter();
          synchronized ( listeners ) {
              listeners.add(out);
          }
          out.write("event: message\n");
          out.write("retry: 300000\n");
          out.write("data: " + "start" + "\n\n");

          // Send heartbeats continuously
          while ( true ) {
              // Sending SSE heartbeat
              out.write(": \n\n");
              if ( out.checkError() ) { 
                  // Subscriber error, break out of loop
                  break;
              }

              Utility.delay(HEARTBEAT_INTERVAL);
          }
          
          // Remove listener and return to stop sending messages
          removeListener(out);
          return;
      }
    }
    catch ( Exception e ) {
        e.printStackTrace();
    }
}

Listing 11 - The Java Servlet handles SSE subscription requests

The following steps are taken:

  1. Check for the presence of the HTTP request parameter named “msg”. This is an indication that a client is subscribing for SSE messages. If your code supports different sets of messages, you could switch off the value of the “msg” parameter.
  2. Set the content type to “text/event-stream”, the encoding to “UTF-8”, and the Connection header parameter to “keep-alive”.
  3. Store the client’s writer object (which is used to send back messages) to a list so that healthcare device messages can be broadcast to it.
  4. Set the connection retry interval to 5 minutes (300,000 milliseconds) in case of connection timeout. This is done as part of the “start” message that’s sent.

Finally, the code enters a loop to continuously send empty messages as part of a connection heartbeat in order to keep the HTML5 SSE connection active. If the code returns from the processRequest method here, or fails to send heartbeats, the connection will be lost and the client will need to reconnect. Notice this loop, and hence the request itself, doesn’t exit unless an SSE error occurs, which could be a result of the browser closing.

Medical device data is sent to the Servlet asynchronously via code written to drive the demo (or from Bluetooth, for example, if this were a real implementation), which is then broadcast to all SSE listeners using the PrintWriter objects (stored in the list in step 3 above). This starts in the onDeviceData method (see Listing 12), which takes the simulated device readings and packages the update formatted as a JSON string.

public void onDeviceData(String device, String data) {
    // parse the device data
    ...

    // Format the JSON data (just a String)
    String msg = "{ " +
        "  \"id\":" + (++id) + "," +
        "  \"demomessagecount\":" + demoMessageCount + "," +
        "  \"demomessage\":" + "\""+ demoMessage + "\"," + 
        "  \"diastolic\":" + diastolic + "," +
        "  \"systolic\":" + systolic + "," +
        "  \"spo\":" + spo + "," +
        "  \"heartrate\":" + heartrate + "," +
        "  \"weight\":" + weight + "," +
        "  \"type\":" + "\""+ "devicedata" + "\"" +
        " }";

    // Send of the SSE event message with data
    sendUpdate("devicedata", msg);
}
Listing 12 - Handle device data and package it as part of a JSON string

The sendUpdate method sends the data as part of an actual SSE event, as shown in Listing 13. Most of the code is error and safety checking.

private void sendUpdate(String type, String msg) {
    if ( listeners == null || listeners.isEmpty() )
        return;

    try {
        // Clone the list of listeners to safely iterate
        //
        Vector<PrintWriter> toSend;
        synchronized ( listeners ) {
            toSend = (Vector<PrintWriter>)listeners.clone();
        }

        // Send update to all listeners
        //
        Iterator<PrintWriter> iter = toSend.iterator();
        while ( iter.hasNext() ) {
            PrintWriter out = iter.next();
            if ( out == null ) {
                continue;
            }
            
            try {
                // Send SSE update
                //
                out.write("event: message\n");
                out.write("data: " + msg + "\n\n");
                out.flush();
            } 
            catch( Exception e ) {
                // Bad listener. Remove from original list and move on
                e.printStackTrace();

                try { 
                    removeListener(out);
                } 
                catch ( Exception e1 ) { }
            }
        }
    }
    catch ( Exception e ) {
        e.printStackTrace();
    }
}
Listing 13 - The SSE message sending code

The bulk of the processing is in cloning the list of listeners, and then iterating over it. For each listener, the SSE event is sent as a data message, as expected. If an error occurs while writing the data out to a listener, it’s handled in the Exception’s catch handler, where the error is logged and the listener is removed from the original list. The assumption here is that the listener's connection has been closed.

Running the Healthcare SSE Demo Application

To start the demo, click the “START” link in the upper right of the UI toolbar, next to the simulated battery icon. This text is defined as an HTML text link, shown in Listing 14.

<a href="#" 
   id="liveId" 
   style="font-family:'Arial'; 
   font-size:18px; 
   color: graytext;" 
   onclick="onClickText()">
START
</a>
Listing 14 - HTML5 definition of the START text link

When clicked, the onclick attribute defines the onClickText() JavaScript function to be called. This function (see Listing 15), starts the simulated readings. It also changes the link’s START text to read STOP, which you click to stop the simulation.

function onClickText() {
    var live= document.getElementById('liveId'); 
    if ( live.innerHTML === 'START' ) {
        live.innerHTML = 'STOP';
        startSimulation();
    }
    else {
        live.innerHTML = 'START';
        stopSimulation();
    }
}
Listing 15 - The JavaScript code to start and stop the healthcare simulation

The startSimulation() function sends an HTTP POST message to the server, handled by the Java Servlet, along with data indicating the simulation should start (see Listing 16).

function startSimulation() {
    var request = new XMLHttpRequest();
    request.open('POST', 'sse?SIM=start', true);
    request.setRequestHeader('X-Requested-With', 'XMLHttpRequest');  
    request.send(null);
}
Listing 16 - JavaScript code for a POST request to start the demo

When the demo is started, the numbers for the healthcare devices will continually update (see Figure 2) with simulated values.

Figure 2 - When started, the demo updates the simulated healthcare device readings

Next, let’s look at a more general SSE server-side implementation, that you can use to send any messages, along with client applications written in Java and JavaScript.

Publish-Subscribe and Queue Messaging with SSE

For the next phase of SSE messaging, I set out to implement generic publish-and-subscribe (or topic-based) messaging, as well as point-to-point (or queue-based) messaging. You can read more about these forms of messaging here but in summary:

First, two Java Servlets were implemented: one for queue-based messaging named QueueServlet, and one for publish-and-subscribe named TopicServlet. Next, the base class, Destination, was defined (see Listing 17) to contain the data relevant to both queues and topics.

public class Destination {
    String name;
    final Vector<AsyncContext> listeners = new Vector<>();
}
Listing 17 - The Destination base class

TopicServlet contains a class Topic that extends Destination, merely adding a constructor with no variables. QueueServlet, howevers, contains a class Queue (see Listing 18) that extends Destination with data relevant to queue-based messaging, such as a queue data structure to hold undelivered messages in FIFO order.

class Queue extends Destination {
    int listenerCount = 0;
    final ConcurrentLinkedQueue<String> messageQ =
        new ConcurrentLinkedQueue<String>();
    Runnable runnable; 
    
    public Queue(String name) {
        this.name = name;
    }
    private Queue() { }
}
Listing 18 - The Queue Destination class

Both Servlets share some common code, such as SSE connection heartbeat code (similar to the healthcare Servlet example earlier in the article), code to send SSE event messages, subscriber management code, and so on. This code has been put into a base class call Messenger – see Listing 19.

public abstract class Messenger extends HttpServlet {
    static final int HEARTBEAT_INTERVAL = 5000; // milliseconds 
    @Resource private ManagedExecutorService managedExecutorService;
    Logger logger = null;
    final ConcurrentHashMap<String, Destination> destinations = new ConcurrentHashMap<>();
    
    protected Runnable _heartbeat(AsyncContext ac) {
        ac.setTimeout(0);
        
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    // Continually send heartbeats or connection will time out
                    //
                    while ( true ) {
                        out.write(": \n\n"); 
                        if ( out.checkError() ) {  // checkError calls flush()
                            System.out.println("Subscriber error");
                            break;
                        }

                        try {
                            Thread.sleep(HEARTBEAT_INTERVAL);
                        } catch ( Exception e ) { }
                    }
                }
                catch ( Exception e1 ) { }

                removeListener(ac);
                ac.complete();
            }
        };

        managedExecutorService.submit(runnable);
        return runnable;
    }
    
    protected boolean sendUpdate(String data, Vector<AsyncContext> listeners) {
        if ( listeners != null ) {
            // Clone the list of listeners for safety
            Vector<AsyncContext> toSend = null;
            synchronized ( listeners ) {
                toSend = (Vector<AsyncContext>)listeners.clone();
            }

            // Send update to each listnner of this destination
            for ( AsyncContext ac: toSend ) {
                try {
                    sendSSEMessage( ac.getResponse().getWriter(), data );
                    return true;
                }
                catch( Exception e ) {
                    logger.log(Level.WARNING, "Listener error: " + e.toString() + ". Removing from list" );
                    removeListener(ac);
                }
            }
        }
        return false;
    }

    protected boolean sendSSEMessage(PrintWriter out, String data) {
        out.write("event: message\n");
        out.write("data: " + data + "\n\n");
        return ! out.checkError(); // no need to call flush()
    }

    protected void removeListener(AsyncContext ac) {
        ...
    }    
}
Listing 19 - The SSE Message Servlet base class

The first difference between this SSE implementation and the one in the healthcare sample is the use of asynchronous Servlet contexts. This allows the Servlet to return when a requestor subscribes or publishes a message, and heartbeat processing is be performed in a Runnable handled by a thread pool. The code in method _heartbeat creates a Runnable per caller, provides its reference to a ManagedExectutorService object, and returns. This Java Executor handles the mechanics of the thread pool.

The sendUpdate and sendSSEMessage methods work together to send messages as SSE event messages to the proper subscribers for the appropriate topic or queue. Next, let’s look specifically at the Topic-based messaging implementation.

Topic-based SSE Messaging Implementation

The TopicServlet implementation is straightforward, with only three methods of its own (remember it inherits some functionality from the Messenger base class described above). First, the processRequest method (see Listing 20) allows HTTP-based callers to either publish or subscribe to a topic by name.

protected void processRequest(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {

    response.setContentType("text/html;charset=UTF-8");

    // Check for publish and subscribe requsts
    String publisher = request.getParameter("publish");
    String subscriber = request.getParameter("subscribe");
    String topicName = request.getParameter("name");

    if ( subscriber != null ) {
        onSubscribe(topicName, request, response);
    }
    else if ( publisher != null ) {
        String data = request.getParameter("data");
        onPublish(topicName, data, request, response);
    }
    else {
        try (PrintWriter out = response.getWriter()) {
            // output some informational HTML here…
        }
    }
}
Listing 20 - The TopicServlet code to process HTTP requests

An HTTP client cannot be both a publisher and subscriber in the same call – this requires two separate calls. To subscribe, the HTTP parameter “subscribe” must be specified, as well as the topic name as a value to the “name” parameter, as in this URL:

http://server/topicservlet?subscribe&name=MyTopic

Likewise, to publish, the URL will look similar with the addition of the actual data (as text), as in:

http://server/topicservlet?subscribe&name=MyTopic&data=MyData

A wider range of data types can be sent using HTTP posts. The code to handle subscriptions is similar to earlier examples on SSE processing, with some additional code (see Listing 21).

private void onSubscribe(   String topicName, 
                            HttpServletRequest request,
                            HttpServletResponse response) {
    try {
        if ( topicName == null || topicName.length() == 0 ) {
            response.getWriter().println("Error: no topic name provided");
            return;
        }

        // Set content type
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("UTF-8");
        response.setHeader("Connection", "keep-alive");
        response.setHeader("Cache-Control", "no-cache");

        // Store to send updates
        final AsyncContext ac;
        synchronized ( destinations ) {
            Topic topic = (Topic)destinations.get(topicName);
            if ( topic == null ) {
                topic = new Topic(topicName);
                destinations.put(topicName, topic);
            }

            // Add new subscriber for this topic
            ac = request.startAsync(request, response);
            topic.listeners.add(ac);
        }

        _heartbeat(ac);
    }
    catch ( Exception e ) {
        e.printStackTrace();
    }
}
Listing 21 - Topic subscriber handling code

First, the code sets the SSE content type, encoding and so on, as seen earlier. Next, the subscriber is added as a listener to the specified Topic object, which is created if it’s a new topic name, and the asynchronous Servlet Context processing—along with SSE heartbeats—is started via calls to request.startAsynch and _heartbeat, respectively.

Message publishing is handled by the onPublish method—shown in Listing 22—which aside from some basic error checking simply calls the base class’ sendUpdate method.

private void onPublish( String topicName, String data, 
                        HttpServletRequest request, 
                        HttpServletResponse response) {
    try {
        PrintWriter out = response.getWriter();
        ...
        
        Topic topic = (Topic)destinations.get(topicName);
        if ( topic != null ) {
            sendUpdate(data, topic.listeners);
        }
    }
    catch ( Exception e ) {
        e.printStackTrace();
    }
}
Listing 22 - Publishing an SSE message to a topic

That’s it for topic-based SSE messaging. Let’s look a queue-based messaging now, which is a little more involved.

Queue-based SSE Messaging Implementation

The QueueServlet implementation is similar to TopicServlet, except it has to more carefully manage message delivery. Specifically, it needs to store messages for future delivery when there are no active listeners for a queue, and it also needs to ensure only one listener receives each message even when there are multiple listeners for the same queue. To begin, the processRequest method (see Listing 23), where HTTP Servlet requests are handled, is similar to the publish-and-subscribe implementation.

protected void processRequest(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    
    response.setContentType("text/html;charset=UTF-8");

    // Check for send and listen requests
    String sender = request.getParameter("send");
    String listener = request.getParameter("listen");
    String queueName = request.getParameter("name");
    
    if ( listener != null ) {
        onListen(queueName, request, response);
    }
    else if ( sender != null ) {
        String data = request.getParameter("data");
        onSend(queueName, data, request, response);
    }
    else {
        // output informational HTML...
    }
}
Listing 23 - The QueueServlet HTTP request handler

Just as with the TopicServlet implementation, requests can be made to send or listen to a queue, exclusively, and a queue name must be provided. The onListen method is almost identical to TopicServlet’s onSubscribe method in terms of setting the response content type, setting up the AsynchContext processing, SSE heartbeat processing, and storing the listener. The onSend method is where things begin to vary (see Listing 24).

private void onSend(String queueName, String data,
                    HttpServletRequest request, 
                    HttpServletResponse response) {
    try {
        PrintWriter out = response.getWriter();
        ...

        Queue queue = getQueue(queueName);

        // First, attempt delivery
        boolean delivered = sendUpdate(data, queue.listeners);
        if ( ! delivered ) {
            // Message not delivered, enqueue message
            queue.messageQ.add(data);
        }
    }
    catch ( Exception e ) {
        e.printStackTrace();
    }
}
Listing 24 - QueueServlet message delivery

First, message delivery is attempted (the details on this are described below). However, unlike with TopicServlet, if there are no current listeners, the message is placed in an internal FIFO queue for future delivery. As we’ll see later, this internal queue is serviced when new listeners connect, and during heartbeat processing for those listeners after they connect.

Next, let’s look at the differences in the sendUpdate method (see Listing 25), which is overridden from the Messenger base class implementation.

@Override
protected boolean sendUpdate(String data, Vector<AsyncContext> listeners) {
    // Find one listener to send the message to:
    // -Temporarily remove the listener from the list of listeners
    // -Remove the message from the internal list of messages
    // -Deliver message
    // -If successful, replace listener in list
    // -If failure, remove listener and replace message in list
    // 
    AsyncContext listenerAC = null;
    try {
        if ( listeners != null ) {
            synchronized ( listeners ) {
                listenerAC = listeners.remove(0);
            }
        }
    }
    catch ( Exception e ) { }
        
    if ( listenerAC == null ) {
         return false; // No listeners
    }
    
    try {
        // Attempt message delivery
        boolean delivered = sendSSEMessage( listenerAC.getResponse().getWriter(), data );
        if ( delivered ) {
            // Replace listener at head of queue
            synchronized ( listeners ) {
                listeners.add(0, listenerAC);
            }

            return true;
        }
        else {
            return false;
        }
    }
    catch( Exception e ) {
        // Bad listener. Remove from original list and move on
        //
        logger.log(Level.WARNING, "Listener error: " + e.toString() + ". Removing from list" );
        removeListener(listenerAC);
        return sendUpdate(data, listeners);
    }
}
Listing 25 - The QueueServlet's overridden sendUpdate implementation

Recall that each queued message must be delivered to, at most, one listener, even when there are multiple listeners for the specified queue. To guarantee this, and to allow parallel message delivery processing of the remainder of the queued messages to other listeners, the first listener for this queue is removed from the Queue object’s internal list. Next, the message is attempted to be delivered to this listener. If successful, the listener is placed back into the list of listeners, and the method returns true.

If message delivery fails, this listener is not added back into the list, and the sendUpdate method calls itself recursively. This call will perform the same processing using the updated list, and will return true if the message is delivered successfully, or false otherwise (leaving the message in the queue for future delivery).

The Messenger base class defines an empty method, processDestination, which gets called periodically during heartbeat processing for the purpose of optional message maintenance. In the TopicServlet implementation, this method isn’t needed and is left empty. QueueServlet, however, overrides so it can check the applicable queue to see if there are undelivered messages. If there are, then message delivery is resumed for that queue. As a result, periodic heartbeat processing also serves as a message delivery pump.

Next, let take a look at the topic and queue-based messaging in action.

SSE Java Clients

The code for the publisher, whether to a topic or a queue, is nearly identical. The only difference is in the URL. For instance, this URL is used to publish to a topic:

http://server/topicservlet?publish&name=MyTopic&data=MyData

The following URL is used to send a message to a queue:

http://server/queueservlet?send&name=MyQueue&data=MyData

The code to send the message to the Servlet (either one) is shown in Listing 26. In fact, in the sample code (available for download here) the same code is used to send to either servlet.

public void sendMessage(String serverURL, String data) {
    try {
        // Construct the request
        data = URLEncoder.encode(data, "UTF-8");
        
        serverURL += "&data=" + data;

        // Send the request
        URL url= new URL(serverURL);
        URI uri = new URI(
                url.getProtocol(), url.getUserInfo(), 
                url.getHost(), url.getPort(), 
                url.getPath(), url.getQuery(), url.getRef() );
        
        url = new URL( uri.toASCIIString() );

        URLConnection conn = url.openConnection();
        conn.setDoOutput(true);
        BufferedReader rd = 
            new BufferedReader(
                new InputStreamReader(
                        conn.getInputStream()));
        rd.close();
    }
    catch (Exception e) {
        e.printStackTrace();
    }
}
Listing 26 - Sending an SSE message to a queue or topic

First, the data is URL encoded since it’s included as an HTTP request parameter. Next, the Servlet is connected as with any HTTP server. Finally, opening the connection and reading from the InputStream triggers the request to complete, and the message is sent to the server.

Listening for data, either on a queue or topic, requires basic Java HTTP processing code (see Listing 27).

public void listen(String serverURl) {
    try {
        String data = URLEncoder.encode("", "UTF-8");

        URL url = new URL(serverURl);
        URLConnection conn = url.openConnection();
        conn.setDoOutput(true);
        conn.setConnectTimeout(0);

        BufferedReader rd = 
            new BufferedReader(
                new InputStreamReader(
                    conn.getInputStream()));

        // Begin InputReader loop waiting for messages 
        String line;
        while ((line = rd.readLine()) != null) {
            if ( line == null || line.length() <= 0 ) 
                continue;

            // Did we get a heartbeat or useful data?
            if ( line.startsWith("data:") ) {
                System.out.println(line.substring("data:".length()));
            }
        }

        rd.close();
    }
    catch (Exception e) {
        e.printStackTrace();
    }
}
Listing 27 - Java code to listen for an SSE message

As with the message sending code, an HTTP connection is made to the Servlet. Next, an InputReader is opened on the connection, a loop is entered waiting for data to arrive from the InputReader. When it does, the fact that a BufferedReader is used ensures it gets entire lines of text, which contains the data. This is convenient since SSE messages each end in a line-break.

You can build publishers and subscribers in any programming language. For instance, the following HTML5 page (see Listing 28) contains JavaScript code that subscribes to a queue named “q1”. The Java code from above can be used to send messages to that queue, or another JavaScript client, or one written in Python, and so on. HTML5 SSE messaging is language and platform independent.

<html>
<head>
    <title>SSE Test Page</title>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <script>
        var source;

        function setupEventSource() {
            if (typeof(EventSource) !== "undefined") {
              // Close  already connected to avoid duplicated messages
              if (typeof(source) !== "undefined") {
                  source.close();
              }
              
              source = new EventSource("queue?listen&name=q1");
              
              source.onerror = function(event) {
                  console.log("SSE onerror " + event);
                  if ( event.target.readyState == 2 ) {
                      // Clean up and reestablish connection
                      try { 
                          source.close(); 
                      } catch ( err ) { }

                      setTimeout(function() { setupEventSource(); }, 1000); 
                  }
              }
              
              source.onopen = function() {
                  console.log("SSE onopen");
              }
              
              source.onmessage = function(event) {
                  console.log("SSE onmessage: " + event.data);
              }
          } 
          else {
            Alert("Error: Server-Sent Events are not supported in your browser");
          }
        }

        function onPageLoad() {
            setupEventSource();
        }
        
        window.addEventListener("load", onPageLoad, true);
    </script>
</head>
<body>
    <div>SSE Test Page</div>
</body>
</html>
Listing 28 - HTML5 JavaScript code that subscribes to an SSE queue

You can download the code for QueueServlet, TopicServlet, both Java clients, and the JavaScript sample here

Conclusion: HTML5 SSE Rocks!

Hopefully the code included in this article servers as a good starting point for your HTML5 projects. The SSE based queue and topic Servlets are quite usable as-is. But to be truly reliable, you’ll need to add some sort of persistence to queue-based messages to ensure they’re not lost of the server goes down.

As shown throughout this article, HTML5 SSE is versatile, powerful, and easy to use for firewall friendly, reliable distributed software message delivery. It works well across platforms and languages, and doesn’t require much to set up.

Happy coding!