A .NET class library for accessing the HornetQ REST interface

This is the third article in my series about the HornetQ REST interface. In the first post I wrote about building and deploying a customized version of the Web application that implements the interface. In the second post I talked about developing a Java class library for accessing the HornetQ REST interface. This time I want to develop a .NET API with the same goal.

I was an enthusiastic VB6 programmer a long, long time ago. I also took part in some projects that used Visual Basic .NET and I even did some work in ASP.NET but, to be perfectly honest, I’ve been focused on Java-based technologies in recent years, so I’ve had to bring myself up to date on .NET!

The first task was to set up a development environment. I decided to use the free tool Microsoft Visual Web Developer 2010 Express and I also installed the NuGet Package Manager extension, which made it easier to set up the Microsoft ASP.NET Web API Client Libraries. These libraries helped me access the HornetQ REST interface, just as RESTEasy helped me in Java. This post by Mike Wasson was a very useful quick introduction, although he programs in C#!

My idea was to rewrite the library I created in Java using Visual Basic .NET, as simple as that. The first thing I realized is that nowadays both languages have similar capabilities. As a quick proof, this is the messaging interface in Java:


public interface MessagingInterface {

    void start() throws MessagingException;

    <T> void sendMessage(T message) throws MessagingException;

    <T> T receiveNextMessage(Class<T> type) throws MessagingException;

    void ackLastMessageReceived() throws MessagingException;

    void stop() throws MessagingException;
}

And this is the .NET version, where the type parameter is declared on the interface rather than on each method:


Public Interface MessagingInterface(Of T)

    Sub ClientStart()

    Sub SendMessage(ByRef Mensaje As T)

    Function ReceiveNextMessage() As T

    Sub AckLastMessageReceived()

    Sub ClientStop()

End Interface

The HTTP messages to send in each case are determined by the HornetQ REST Interface user’s manual, so the algorithms are the same in Java and in Visual Basic .NET. I just had to adapt the code to the language and the special features of .NET.

Another interesting comparison: this is the Java method to send a message:


@Override
public <T> void sendMessage(T message) throws MessagingException {
    ClientResponse response;

    try {
        if (!this.isStarted) {
            throw new IllegalStateException("The client is not started");
        }

        response = this.msgCreateLink.request().body(MediaType.APPLICATION_XML, message).post();

        if (response.getStatus() == 307) {
            this.msgCreateLink = response.getLocation();

            response = this.msgCreateLink.request().body(MediaType.APPLICATION_XML, message).post();
        }

        if (response.getResponseStatus().equals(Response.Status.CREATED)) {
            this.msgCreateLink = response.getHeaderAsLink("msg-create-next");
        } else {
            throw new MessagingException("Response code  " + response.getStatus() + " not supported");
        }
    } catch (MessagingException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new MessagingException(ex);
    }
}

And this is the .NET one:


Public Sub SendMessage(ByRef Message As T) Implements MessagingInterface(Of T).SendMessage
    Dim Response As HttpResponseMessage
    Dim XmlFormatter As XmlMediaTypeFormatter

    Try
        If Not Me.IsStarted Then
            Throw New InvalidOperationException("The client is not started")
        End If

        XmlFormatter = New XmlMediaTypeFormatter
        XmlFormatter.UseXmlSerializer = True

        Response = Me.HornetQHttpClient.PostAsync(Me.MsgCreateUri, 
                                                  Message, 
                                                  XmlFormatter).Result

        If Response.StatusCode = HttpStatusCode.RedirectKeepVerb Then
            Me.MsgCreateUri = Response.Headers.GetValues("Location").First
            Response = Me.HornetQHttpClient.PostAsync(Me.MsgCreateUri, 
                                                      Message, 
                                                      XmlFormatter).Result
        End If

        If Response.StatusCode = HttpStatusCode.Created Then
            Me.MsgCreateUri = Response.Headers.GetValues("msg-create-next").First
        Else
            Throw New MessagingException("Response code " & CInt(Response.StatusCode) & " not supported")
        End If
    Catch ex As MessagingException
        Throw
    Catch ex As Exception
        Throw New MessagingException(ex.Message, ex)
    End Try
End Sub


A Java class library for accessing the HornetQ REST interface

This is the second post of the series about the HornetQ REST interface, which I started writing last month. This time I’ve developed a Java class library that aims to simplify access to the HornetQ REST interface. You may be wondering: why not use JMS if you’re programming in Java? The main reason is that it’s difficult to communicate through firewalls using JMS in JBoss 4.2.1, which is a requirement in my case. You could implement HTTP tunnelling, but this could be inefficient and not scalable, so I think that an architecture that mixes JMS and REST may be a good solution when you have to exchange messages with multiple partners. For example, a provider can send you a message through the REST interface and you can consume it with JMS, and vice versa.

First of all, I’d like to point out two additional requirements of my case:

  • All messages must be persistent.
  • Messages must not be duplicated.

These goals can be achieved by setting the elements default-durable-send and dups-ok in the HornetQ REST Interface Web Application configuration file (rest-config.xml) to true and false respectively.

The HornetQ REST Interface user’s manual defines a protocol for sending and receiving messages, in which you have to send several HTTP messages, read different HTTP headers, interpret return codes and so on. The idea is to write a class library that hides this complexity and offers a simple interface for sending and receiving messages:


public interface MessagingInterface {

    void start() throws MessagingException;

    <T> void sendMessage(T message) throws MessagingException;

    <T> T receiveNextMessage(Class<T> type) throws MessagingException;

    void ackLastMessageReceived() throws MessagingException;

    void stop() throws MessagingException;
}

The class HornetQRESTClient implements this interface. It uses the RESTEasy library to simplify the REST calls and has three main properties:

  • serverURL: the URL where the HornetQ REST Interface Web Application is deployed.
  • queue: the queue or topic you want to send messages to or receive messages from.
  • msgSubscriberId: the id of the subscriber, used only for topics.

The start method sets up the environment and sends an HTTP HEAD request, which returns the headers you need for sending and receiving messages. Here is an excerpt of the most interesting code:


    ...
    request = new ClientRequest(this.serverURL + (this.isTopic ? "/topics/" : "/queues/") + this.queue);

    response = request.head();
    if (response.getResponseStatus().equals(Response.Status.OK)) {
        this.msgCreateLink = response.getHeaderAsLink("msg-create");                
        this.msgPullConsumerCreationLink = response.getHeaderAsLink(this.isTopic ? 
                                                                        "msg-pull-subscriptions" : 
                                                                        "msg-pull-consumers");
    } else if (response.getResponseStatus().equals(Response.Status.NOT_FOUND)) {
        throw new MessagingException("The queue " + queue + " is not registered");
    } else {
        throw new MessagingException("Response code  " + response.getStatus() + " not supported");
    }
    ...
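
To make it clearer what RESTEasy hides in the excerpt above, here is a minimal sketch of the same HEAD request using only the JDK. The class and method names (HeadProbe, resourceUrl, fetchLinks) are hypothetical and not part of the library, and stripping a trailing slash from the server URL is my own assumption. Only the header names come from the excerpt.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.LinkedHashMap;
import java.util.Map;

public final class HeadProbe {

    private HeadProbe() {
    }

    // Builds the URL of the queue or topic resource, as in the excerpt above.
    // Assumption: a trailing slash in serverUrl is stripped before concatenation.
    public static String resourceUrl(String serverUrl, String destination, boolean isTopic) {
        String base = serverUrl.endsWith("/")
                ? serverUrl.substring(0, serverUrl.length() - 1)
                : serverUrl;
        return base + (isTopic ? "/topics/" : "/queues/") + destination;
    }

    // Sends the HEAD request and returns the two headers the client keeps.
    // Any status other than 200 is reported as an IOException in this sketch.
    public static Map<String, String> fetchLinks(String serverUrl, String destination, boolean isTopic)
            throws IOException {
        URL url = new URL(resourceUrl(serverUrl, destination, isTopic));
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        try {
            connection.setRequestMethod("HEAD");
            int status = connection.getResponseCode();
            if (status != HttpURLConnection.HTTP_OK) {
                throw new IOException("Response code " + status + " not supported");
            }
            String consumersHeader = isTopic ? "msg-pull-subscriptions" : "msg-pull-consumers";
            Map<String, String> links = new LinkedHashMap<>();
            links.put("msg-create", connection.getHeaderField("msg-create"));
            links.put(consumersHeader, connection.getHeaderField(consumersHeader));
            return links;
        } finally {
            connection.disconnect();
        }
    }
}
```

The returned map plays the role of the msgCreateLink and msgPullConsumerCreationLink fields in the real client.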

The method sendMessage allows you to send a message wrapped in an object that can be marshalled with JAXB:


@Override
public <T> void sendMessage(T message) throws MessagingException {
    ClientResponse response;

    try {
        if (!this.isStarted) {
            throw new IllegalStateException("The client is not started");
        }

        response = this.msgCreateLink.request().body(MediaType.APPLICATION_XML, message).post();

        if (response.getStatus() == 307) {
            this.msgCreateLink = response.getLocation();

            response = this.msgCreateLink.request().body(MediaType.APPLICATION_XML, message).post();
        }

        if (response.getResponseStatus().equals(Response.Status.CREATED)) {
            this.msgCreateLink = response.getHeaderAsLink("msg-create-next");
        } else {
            throw new MessagingException("Response code  " + response.getStatus() + " not supported");
        }
    } catch (MessagingException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new MessagingException(ex);
    }
}
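
The decision logic of sendMessage can be exercised without a server. The following is a minimal sketch, not the library code: Transport and Reply are hypothetical stand-ins for the HTTP layer, and the message body is a plain string instead of a JAXB object. It follows the same steps as the method above: repeat the POST at the Location URL on a 307, then store msg-create-next on a 201.

```java
import java.util.Map;

public final class SendFlowSketch {

    // Minimal view of an HTTP response: status code plus headers.
    public static final class Reply {
        public final int status;
        public final Map<String, String> headers;

        public Reply(int status, Map<String, String> headers) {
            this.status = status;
            this.headers = headers;
        }
    }

    // Hypothetical transport: POSTs a body to a URL and returns the reply.
    public interface Transport {
        Reply post(String url, String body);
    }

    private final Transport transport;
    private String msgCreateUrl;

    public SendFlowSketch(Transport transport, String msgCreateUrl) {
        this.transport = transport;
        this.msgCreateUrl = msgCreateUrl;
    }

    // URL that the next call to send will POST to.
    public String currentCreateUrl() {
        return msgCreateUrl;
    }

    public void send(String body) {
        Reply reply = transport.post(msgCreateUrl, body);

        // 307: repeat the same POST at the URL given in the Location header.
        if (reply.status == 307) {
            msgCreateUrl = reply.headers.get("Location");
            reply = transport.post(msgCreateUrl, body);
        }

        // 201: the message was created; msg-create-next is the URL for the next send.
        if (reply.status == 201) {
            msgCreateUrl = reply.headers.get("msg-create-next");
        } else {
            throw new IllegalStateException("Response code " + reply.status + " not supported");
        }
    }
}
```

A fake Transport that answers 307 on the first URL and 201 on the second is enough to check that the client ends up holding the msg-create-next URL.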

On the other hand, I’ve decided to implement the manual acknowledgement method for receiving messages via pull, so I continuously poll the server for available messages by calling the method receiveNextMessage, and I call the method ackLastMessageReceived once I’ve processed a received message. I’ll analyse consuming messages via push in a forthcoming post. Here is the code of both methods:


@Override
public <T> T receiveNextMessage(Class<T> type) throws MessagingException {
    T message = null;
    ClientResponse response;

    try {
        if (!this.isStarted) {
            throw new IllegalStateException("The client is not started");
        }

        if (this.msgAcknowledgementLink != null) {
            throw new IllegalStateException("There is a message waiting for acknowledgement");
        }

        // Creates the consumer, if needed
        if (this.msgPullConsumerLink == null) {
            if (this.isTopic) {
                response = this.msgPullConsumerCreationLink.request()
                        .formParameter("autoAck", "false")
                        .formParameter("durable", "true")
                        .formParameter("name", this.msgSubscriberId)
                        .post();
            } else {
                response = this.msgPullConsumerCreationLink.request().formParameter("autoAck",
                                                                                    "false").post();
            }

            if (response.getResponseStatus().equals(Response.Status.CREATED)) {
                this.msgPullConsumerLink = response.getLocation();                    
                this.nextMsgLink = response.getHeaderAsLink("msg-acknowledge-next");
            } else if (response.getResponseStatus().equals(Response.Status.NO_CONTENT)) {
                this.nextMsgLink = response.getHeaderAsLink("msg-acknowledge-next");              
            } else {
                throw new MessagingException("Response code  " + response.getStatus() + " not supported");
            }
        }

        response = this.nextMsgLink.request()
                .header("Accept-Wait", SECONDS_WAITING)
                .header("Accept", MediaType.APPLICATION_XML)
                .post();

        if (response.getResponseStatus().equals(Response.Status.OK)) {
            message = (T) response.getEntity(type);
            this.msgAcknowledgementLink = response.getHeaderAsLink("msg-acknowledgement");
        } else if (response.getResponseStatus().equals(Response.Status.SERVICE_UNAVAILABLE)) {
            this.nextMsgLink = response.getHeaderAsLink("msg-acknowledge-next");
        } else if (response.getResponseStatus().equals(Response.Status.PRECONDITION_FAILED)) {
            this.nextMsgLink = response.getHeaderAsLink("msg-acknowledge-next");
        } else {
            throw new MessagingException("Response code  " + response.getStatus() + " not supported");
        }
    } catch (MessagingException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new MessagingException(ex);
    }

    return message;
}

@Override
public void ackLastMessageReceived() throws MessagingException {
    ClientResponse response;

    try {
        if (!this.isStarted) {
            throw new IllegalStateException("The client is not started");
        }

        if (this.msgAcknowledgementLink == null) {
            throw new MessagingException("No messages waiting for acknowledgement");
        }

        response = this.msgAcknowledgementLink.request().formParameter("acknowledge", "true").post();

        if (response.getResponseStatus().equals(Response.Status.NO_CONTENT)) {
            this.nextMsgLink = response.getHeaderAsLink("msg-acknowledge-next");                
            this.msgAcknowledgementLink = null;
        } else {
            throw new MessagingException("Response code  " + response.getStatus() + " not supported");
        }
    } catch (MessagingException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new MessagingException(ex);
    }
}
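
From the caller's side, the two methods are meant to be used in a receive, process, acknowledge cycle. Here is a minimal sketch of that cycle. Receiver and Handler are hypothetical, simplified stand-ins (no checked exceptions, no Class argument), and InMemoryReceiver exists only to exercise the loop. It mimics the two state checks of the real client: no second receive before an acknowledgement, and no acknowledgement without a pending message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public final class PollingSketch {

    private PollingSketch() {
    }

    // Simplified, hypothetical stand-in for the receiving half of MessagingInterface.
    public interface Receiver<T> {
        T receiveNextMessage();          // returns null when no message is available
        void ackLastMessageReceived();
    }

    // Hypothetical callback that processes one message.
    public interface Handler<T> {
        void handle(T message);
    }

    // Receives until no message is available, acknowledging each message only
    // after the handler has returned normally. Returns the number handled.
    public static <T> int drain(Receiver<T> receiver, Handler<T> handler) {
        int handled = 0;
        T message;
        while ((message = receiver.receiveNextMessage()) != null) {
            handler.handle(message);
            receiver.ackLastMessageReceived();
            handled++;
        }
        return handled;
    }

    // In-memory receiver used only to exercise the loop.
    public static final class InMemoryReceiver<T> implements Receiver<T> {
        private final Deque<T> pending;
        private T unacknowledged;
        public final List<T> acknowledged = new ArrayList<>();

        public InMemoryReceiver(List<T> messages) {
            this.pending = new ArrayDeque<>(messages);
        }

        @Override
        public T receiveNextMessage() {
            if (unacknowledged != null) {
                throw new IllegalStateException("There is a message waiting for acknowledgement");
            }
            unacknowledged = pending.poll();
            return unacknowledged;
        }

        @Override
        public void ackLastMessageReceived() {
            if (unacknowledged == null) {
                throw new IllegalStateException("No messages waiting for acknowledgement");
            }
            acknowledged.add(unacknowledged);
            unacknowledged = null;
        }
    }
}
```

If the handler throws, the message is never acknowledged in this sketch, which is the reason for acknowledging after processing rather than before.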

The method stop cleans up the consumer created for consuming messages, but only in the case of queues. I’ve decided not to do it for topics because I need durable subscriptions, and I’ve found through testing that if you delete the consumer, the subscription is deleted too. The manual says:

“…A consumer timeout for durable subscriptions will not delete the underlying durable JMS subscription though, only the server-side consumer resource (and underlying JMS session)…”

So, I just wait for the timeout. Here is the code:


@Override
public void stop() throws MessagingException {
    ClientResponse response;

    try {
        if (!this.isStarted) {
            throw new IllegalStateException("The client is not started");
        }

        if (this.msgPullConsumerLink != null && (!this.isTopic)) {
            response = this.msgPullConsumerLink.request().delete();

            if (!response.getResponseStatus().equals(Response.Status.NO_CONTENT)) {
                throw new MessagingException("Response code  " + response.getStatus() + " not supported");
            }
        }

        this.isStarted = false;
        this.initLinks();
    } catch (MessagingException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new MessagingException(ex);
    }
}

Finally, I’d like to show the code of a simple unit test:


@Test
public void queueTest() throws MessagingException {
    MessagingInterface messaging;
    Order orderToSend, orderReceived;

    messaging = RESTMessagingClientFactory.getHornetQRESTClient(SERVER_URL, QUEUE);

    messaging.start();

    // Purge the queue
    while((orderReceived = messaging.receiveNextMessage(Order.class)) != null) {
        messaging.ackLastMessageReceived();
    }

    orderToSend = new Order("Test item", 5, 10.2f);        
    messaging.sendMessage(orderToSend);

    orderReceived = messaging.receiveNextMessage(Order.class);        
    messaging.ackLastMessageReceived();

    messaging.stop();

    assertNotNull(orderReceived);
    assertEquals(orderToSend, orderReceived);                
}
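
The final assertEquals only passes if Order defines value equality. The Order class is not shown in this post, so the following is a hypothetical sketch: the field names are guesses based on the constructor call in the test, and the JAXB annotations the real class would need (such as @XmlRootElement) are left out so that the sketch compiles with the JDK alone.

```java
import java.util.Objects;

// Hypothetical Order class; field names are assumptions, not the original code.
public class Order {

    private String item;
    private int quantity;
    private float price;

    // JAXB needs a no-arg constructor to unmarshal the received message.
    public Order() {
    }

    public Order(String item, int quantity, float price) {
        this.item = item;
        this.quantity = quantity;
        this.price = price;
    }

    public String getItem() {
        return item;
    }

    public int getQuantity() {
        return quantity;
    }

    public float getPrice() {
        return price;
    }

    // Value equality: two orders are equal when all three fields match.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof Order)) {
            return false;
        }
        Order that = (Order) other;
        return quantity == that.quantity
                && Float.compare(price, that.price) == 0
                && Objects.equals(item, that.item);
    }

    // Kept consistent with equals so orders behave correctly in hash-based collections.
    @Override
    public int hashCode() {
        return Objects.hash(item, quantity, price);
    }
}
```

Without the equals override, assertEquals would fall back to reference comparison and fail, because the received order is a new object built from the XML.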

In order to run the test, I had to deploy the following queue and topic by adding this configuration to my hornetq-jms.xml file:


   <queue name="orders">
      <entry name="/queues/orders"/>
   </queue>
   <topic name="orders">
      <entry name="/topics/orders"/>
   </topic>

The source code of this library can be downloaded by following this link.

