My experience with a JMS interchange between HornetQ and ActiveMQ on Apache ServiceMix

I recently worked on a project in which some Apache Camel routes, deployed on ServiceMix, had to connect to a HornetQ messaging system running on a JBoss Application Server.

My first idea was to wrap the HornetQ libraries as OSGi bundles and deploy them on Apache ServiceMix, but it turned into a real nightmare, with numerous class-loading problems. Torsten Mielke has written an example of a similar use case, but I couldn’t establish the connection between Apache ServiceMix 6.0.0 and JBoss EAP 6.3. There is also a HornetQ issue about making it OSGi compliant, but that didn’t work out in my case either.

My final approach was to set up HornetQ bridges between JBoss EAP 6.3 and the JMS provider of ServiceMix, Apache ActiveMQ. You can find the code of my proof of concept in one of my GitHub repositories. I honestly think that this solution undermines the concept of an ESB, because the routing is done outside it, but it works!


A .NET class library for accessing the HornetQ REST interface

This is the third article in my series about the HornetQ REST interface. In the first post I wrote about building and deploying a customized version of the Web application that implements the interface; in the second I described the development of a Java class library for accessing the HornetQ REST interface. This time I want to develop a .NET API with the same goal.

I was an enthusiastic VB6 programmer a long, long time ago. I also took part in some projects that used Visual Basic .NET and I even did some work in ASP.NET but, to be perfectly honest, I’ve been focused on Java-based technologies in recent years, so I had to bring myself up to date on .NET!

The first task was to set up a development environment. I decided to use the free tool Microsoft Visual Web Developer 2010 Express and I also installed the NuGet Package Manager extension, which made it easier to set up the Microsoft ASP.NET Web API Client Libraries. These libraries helped me access the HornetQ REST interface, just as RESTEasy did in Java. This post by Mike Wasson was a very useful quick introduction, although he programs in C#!

My idea was simply to rewrite the library I had created in Java using Visual Basic .NET. The first thing I realized is that nowadays both languages have similar capabilities. As a quick proof, this is the messaging interface in Java:


public interface MessagingInterface {

    void start() throws MessagingException;

    <T> void sendMessage(T message) throws MessagingException;

    <T> T receiveNextMessage(Class<T> type) throws MessagingException;

    void ackLastMessageReceived() throws MessagingException;

    void stop() throws MessagingException;
}

And this is the .NET one:


Public Interface MessagingInterface(Of T)

    Sub ClientStart()

    Sub SendMessage(ByRef Mensaje As T)

    Function ReceiveNextMessage() As T

    Sub AckLastMessageReceived()

    Sub ClientStop()

End Interface

The HTTP messages to send in each case are determined by the HornetQ REST Interface user’s manual, so the algorithms are the same in Java and in Visual Basic .NET; I just had to adapt the code to the language and the special features of .NET.
Another interesting comparison: this is the Java method to send a message:


@Override
public <T> void sendMessage(T message) throws MessagingException {
    ClientResponse response;

    try {
        if (!this.isStarted) {
            throw new IllegalStateException("The client is not started");
        }

        response = this.msgCreateLink.request().body(MediaType.APPLICATION_XML, message).post();

        if (response.getStatus() == 307) {
            this.msgCreateLink = response.getLocation();

            response = this.msgCreateLink.request().body(MediaType.APPLICATION_XML, message).post();
        }

        if (response.getResponseStatus().equals(Response.Status.CREATED)) {
            this.msgCreateLink = response.getHeaderAsLink("msg-create-next");
        } else {
            throw new MessagingException("Response code  " + response.getStatus() + " not supported");
        }
    } catch (MessagingException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new MessagingException(ex);
    }
}

And this is the .NET one:


Public Sub SendMessage(ByRef Message As T) Implements MessagingInterface(Of T).SendMessage
    Dim Response As HttpResponseMessage
    Dim XmlFormatter As XmlMediaTypeFormatter

    Try
        If Not Me.IsStarted Then
            Throw New InvalidOperationException("The client is not started")
        End If

        XmlFormatter = New XmlMediaTypeFormatter
        XmlFormatter.UseXmlSerializer = True

        Response = Me.HornetQHttpClient.PostAsync(Me.MsgCreateUri, 
                                                  Message, 
                                                  XmlFormatter).Result

        If Response.StatusCode = HttpStatusCode.RedirectKeepVerb Then
            Me.MsgCreateUri = Response.Headers.GetValues("Location").First
            Response = Me.HornetQHttpClient.PostAsync(Me.MsgCreateUri, 
                                                      Message, 
                                                      XmlFormatter).Result
        End If

        If Response.StatusCode = HttpStatusCode.Created Then
            Me.MsgCreateUri = Response.Headers.GetValues("msg-create-next").First
        Else
            Throw New MessagingException("Response code " & CInt(Response.StatusCode).ToString() & " not supported")
        End If
    Catch ex As MessagingException
        Throw ' Rethrow as-is, preserving the original stack trace
    Catch ex As Exception
        Throw New MessagingException(ex.Message, ex)
    End Try
End Sub


A Java class library for accessing the HornetQ REST interface

This is the second post of the series about the HornetQ REST interface, which I started last month. This time I’ve developed a Java class library that aims to simplify access to the HornetQ REST interface. You may be wondering: why not use JMS if you’re programming in Java? The main reason is that it’s difficult to communicate through firewalls using JMS in JBoss 4.2.1, which is a requirement in my case. You could implement HTTP tunnelling, but this could be inefficient and might not scale, so I think that an architecture that mixes JMS and REST may be a good solution when you have to exchange messages with multiple partners. For example, a provider can send you a message through the REST interface and you can consume it with JMS, and vice versa.

First of all, I’d like to point out two additional requirements of my case:

  • All messages must be persistent.
  • Messages must not be duplicated.

These goals can be achieved by setting the elements default-durable-send and dups-ok in the HornetQ REST Interface Web Application configuration file (rest-config.xml) to true and false, respectively.

The HornetQ REST Interface user’s manual defines a protocol for sending and receiving messages, in which you have to send several HTTP messages, read different HTTP headers, interpret return codes and so on. The idea is to write a class library that hides this complexity and offers a simple interface for sending and receiving messages:


public interface MessagingInterface {

    void start() throws MessagingException;

    <T> void sendMessage(T message) throws MessagingException;

    <T> T receiveNextMessage(Class<T> type) throws MessagingException;

    void ackLastMessageReceived() throws MessagingException;

    void stop() throws MessagingException;
}

The class HornetQRESTClient implements the interface above. It uses the RESTEasy library to simplify the REST calls and has three main properties:

  • serverURL: the URL where the HornetQ REST Interface Web Application is deployed.
  • queue: the queue or topic you want to send messages to or receive messages from.
  • msgSubscriberId: the subscriber ID, used only for topics.

The start method sets up the environment and sends an HTTP HEAD request, which returns the headers you need for sending and receiving messages. Here is an excerpt of the most interesting code:


    ...
    request = new ClientRequest(this.serverURL + (this.isTopic ? "/topics/" : "/queues/") + this.queue);

    response = request.head();
    if (response.getResponseStatus().equals(Response.Status.OK)) {
        this.msgCreateLink = response.getHeaderAsLink("msg-create");                
        this.msgPullConsumerCreationLink = response.getHeaderAsLink(this.isTopic ? 
                                                                        "msg-pull-subscriptions" : 
                                                                        "msg-pull-consumers");
    } else if (response.getResponseStatus().equals(Response.Status.NOT_FOUND)) {
        throw new MessagingException("The queue " + queue + " is not registered");
    } else {
        throw new MessagingException("Response code  " + response.getStatus() + " not supported");
    }
    ...

The sendMessage method allows you to send a message wrapped in an object that can be marshalled with JAXB:


@Override
public <T> void sendMessage(T message) throws MessagingException {
    ClientResponse response;

    try {
        if (!this.isStarted) {
            throw new IllegalStateException("The client is not started");
        }

        response = this.msgCreateLink.request().body(MediaType.APPLICATION_XML, message).post();

        if (response.getStatus() == 307) {
            this.msgCreateLink = response.getLocation();

            response = this.msgCreateLink.request().body(MediaType.APPLICATION_XML, message).post();
        }

        if (response.getResponseStatus().equals(Response.Status.CREATED)) {
            this.msgCreateLink = response.getHeaderAsLink("msg-create-next");
        } else {
            throw new MessagingException("Response code  " + response.getStatus() + " not supported");
        }
    } catch (MessagingException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new MessagingException(ex);
    }
}
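
To make the link handling easier to follow, here is a minimal sketch of the same send logic isolated from RESTEasy. It is not part of my library: the class SendProtocolSketch, the Reply holder and the Transport interface are hypothetical names assumed for illustration, and a real transport would wrap an HTTP client. It only shows how the 307 redirect and the msg-create-next header update the link used for the next POST:

```java
import java.util.Map;

// Hypothetical sketch, not part of the library: the send protocol without RESTEasy.
final class SendProtocolSketch {

    /** Minimal view of an HTTP reply: status code plus response headers. */
    static final class Reply {
        final int status;
        final Map<String, String> headers;

        Reply(int status, Map<String, String> headers) {
            this.status = status;
            this.headers = headers;
        }
    }

    /** Assumed transport abstraction; a real one would issue an HTTP POST. */
    interface Transport {
        Reply post(String url, String xmlBody);
    }

    private final Transport transport;
    private String createLink; // URL to use for the next POST

    SendProtocolSketch(Transport transport, String initialCreateLink) {
        this.transport = transport;
        this.createLink = initialCreateLink;
    }

    void send(String xmlBody) {
        Reply reply = transport.post(createLink, xmlBody);

        // 307: the server points to another URL; repeat the same POST there.
        if (reply.status == 307) {
            createLink = reply.headers.get("Location");
            reply = transport.post(createLink, xmlBody);
        }

        // 201: message accepted; the server hands out the link for the next message.
        if (reply.status != 201) {
            throw new IllegalStateException("Response code " + reply.status + " not supported");
        }
        createLink = reply.headers.get("msg-create-next");
    }

    String currentCreateLink() {
        return createLink;
    }
}
```

Because the transport is injected, the redirect path can be exercised in a unit test with a fake transport, without a running HornetQ server.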

For receiving messages, I decided to implement pull consumption with manual acknowledgement: I continuously poll the server for available messages by calling receiveNextMessage, and I call ackLastMessageReceived once I’ve processed a received message. I’ll analyse consuming messages via push in a forthcoming post. Here is the code of both methods:


@Override
public <T> T receiveNextMessage(Class<T> type) throws MessagingException {
    T message = null;
    ClientResponse response;

    try {
        if (!this.isStarted) {
            throw new IllegalStateException("The client is not started");
        }

        if (this.msgAcknowledgementLink != null) {
            throw new IllegalStateException("There is a message waiting for acknowledgement");
        }

        // Creates the consumer, if needed
        if (this.msgPullConsumerLink == null) {
            if (this.isTopic) {
                response = this.msgPullConsumerCreationLink.request()
                        .formParameter("autoAck", "false")
                        .formParameter("durable", "true")
                        .formParameter("name", this.msgSubscriberId)
                        .post();
            } else {
                response = this.msgPullConsumerCreationLink.request().formParameter("autoAck",
                                                                                    "false").post();
            }

            if (response.getResponseStatus().equals(Response.Status.CREATED)) {
                this.msgPullConsumerLink = response.getLocation();                    
                this.nextMsgLink = response.getHeaderAsLink("msg-acknowledge-next");
            } else if (response.getResponseStatus().equals(Response.Status.NO_CONTENT)) {
                this.nextMsgLink = response.getHeaderAsLink("msg-acknowledge-next");              
            } else {
                throw new MessagingException("Response code  " + response.getStatus() + " not supported");
            }
        }

        response = this.nextMsgLink.request()
                .header("Accept-Wait", SECONDS_WAITING)
                .header("Accept", MediaType.APPLICATION_XML)
                .post();

        if (response.getResponseStatus().equals(Response.Status.OK)) {
            message = (T) response.getEntity(type);
            this.msgAcknowledgementLink = response.getHeaderAsLink("msg-acknowledgement");
        } else if (response.getResponseStatus().equals(Response.Status.SERVICE_UNAVAILABLE)) {
            this.nextMsgLink = response.getHeaderAsLink("msg-acknowledge-next");
        } else if (response.getResponseStatus().equals(Response.Status.PRECONDITION_FAILED)) {
            this.nextMsgLink = response.getHeaderAsLink("msg-acknowledge-next");
        } else {
            throw new MessagingException("Response code  " + response.getStatus() + " not supported");
        }
    } catch (MessagingException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new MessagingException(ex);
    }

    return message;
}

@Override
public void ackLastMessageReceived() throws MessagingException {
    ClientResponse response;

    try {
        if (!this.isStarted) {
            throw new IllegalStateException("The client is not started");
        }

        if (this.msgAcknowledgementLink == null) {
            throw new MessagingException("No messages waiting for acknowledgement");
        }

        response = this.msgAcknowledgementLink.request().formParameter("acknowledge", "true").post();

        if (response.getResponseStatus().equals(Response.Status.NO_CONTENT)) {
            this.nextMsgLink = response.getHeaderAsLink("msg-acknowledge-next");                
            this.msgAcknowledgementLink = null;
        } else {
            throw new MessagingException("Response code  " + response.getStatus() + " not supported");
        }
    } catch (MessagingException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new MessagingException(ex);
    }
}
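
As an illustration of how a caller might combine both methods, here is a minimal sketch of a receive-process-acknowledge loop. The helper PollingConsumer and the in-memory stand-in InMemoryMessaging are hypothetical and exist only so the sketch compiles and runs without a server; the interface and exception are local copies of the ones shown in this post:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Local copies of the post's types so the sketch compiles on its own.
class MessagingException extends Exception {
    MessagingException(String message) { super(message); }
}

interface MessagingInterface {
    void start() throws MessagingException;
    <T> void sendMessage(T message) throws MessagingException;
    <T> T receiveNextMessage(Class<T> type) throws MessagingException;
    void ackLastMessageReceived() throws MessagingException;
    void stop() throws MessagingException;
}

// Hypothetical helper, not part of the library.
final class PollingConsumer {
    private PollingConsumer() { }

    /** Processes and acknowledges messages until none is available; returns the count. */
    static <T> int drain(MessagingInterface messaging, Class<T> type, Consumer<T> handler)
            throws MessagingException {
        int processed = 0;
        T message;
        while ((message = messaging.receiveNextMessage(type)) != null) {
            handler.accept(message);             // process first...
            messaging.ackLastMessageReceived();  // ...then acknowledge
            processed++;
        }
        return processed;
    }
}

// In-memory stand-in for HornetQRESTClient, only for trying the loop locally.
final class InMemoryMessaging implements MessagingInterface {
    private final Deque<Object> queue = new ArrayDeque<>();
    private Object pending; // delivered but not yet acknowledged
    private boolean started;

    public void start() { started = true; }

    public <T> void sendMessage(T message) throws MessagingException {
        requireStarted();
        queue.addLast(message);
    }

    public <T> T receiveNextMessage(Class<T> type) throws MessagingException {
        requireStarted();
        if (pending != null) {
            throw new MessagingException("There is a message waiting for acknowledgement");
        }
        pending = queue.peekFirst(); // null when the queue is empty
        return type.cast(pending);
    }

    public void ackLastMessageReceived() throws MessagingException {
        requireStarted();
        if (pending == null) {
            throw new MessagingException("No messages waiting for acknowledgement");
        }
        queue.removeFirst(); // the message leaves the queue only once acknowledged
        pending = null;
    }

    public void stop() { started = false; }

    private void requireStarted() throws MessagingException {
        if (!started) {
            throw new MessagingException("The client is not started");
        }
    }
}
```

Acknowledging only after the handler returns means that a failure during processing leaves the message unacknowledged, which is the point of choosing manual acknowledgement.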

The stop method deletes the consumer created for consuming messages only in the case of queues. I decided not to do it for topics, because I need durable subscriptions and testing showed me that if you delete the consumer, the subscription is deleted too, whereas the manual says:

“…A consumer timeout for durable subscriptions will not delete the underlying durable JMS subscription though, only the server-side consumer resource (and underlying JMS session)…”

So, for topics I just wait for the timeout. Here is the code:


@Override
public void stop() throws MessagingException {
    ClientResponse response;

    try {
        if (!this.isStarted) {
            throw new IllegalStateException("The client is not started");
        }

        if (this.msgPullConsumerLink != null && (!this.isTopic)) {
            response = this.msgPullConsumerLink.request().delete();

            if (!response.getResponseStatus().equals(Response.Status.NO_CONTENT)) {
                throw new MessagingException("Response code  " + response.getStatus() + " not supported");
            }
        }

        this.isStarted = false;
        this.initLinks();
    } catch (MessagingException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new MessagingException(ex);
    }
}

Finally, I’d like to show the code of a simple unit test:


@Test
public void queueTest() throws MessagingException {
    MessagingInterface messaging;
    Order orderToSend, orderReceived;

    messaging = RESTMessagingClientFactory.getHornetQRESTClient(SERVER_URL, QUEUE);

    messaging.start();

    // Purge the queue
    while((orderReceived = messaging.receiveNextMessage(Order.class)) != null) {
        messaging.ackLastMessageReceived();
    }

    orderToSend = new Order("Test item", 5, 10.2f);        
    messaging.sendMessage(orderToSend);

    orderReceived = messaging.receiveNextMessage(Order.class);        
    messaging.ackLastMessageReceived();

    messaging.stop();

    assertNotNull(orderReceived);
    assertEquals(orderToSend, orderReceived);                
}

In order to run the test, I had to deploy the following queue and topic by adding this configuration to my hornetq-jms.xml file:


   <queue name="orders">
      <entry name="/queues/orders"/>
   </queue>
   <topic name="orders">
      <entry name="/topics/orders"/>
   </topic>

The source code of this library can be downloaded following this link.



Researching the HornetQ REST interface

I’ve been studying the HornetQ REST interface over the last few weeks, because I consider it a very good solution if you have an application that uses JMS and you want to integrate messages from a third party that doesn’t use Java. This is the first of the posts I’ve decided to write on the subject and, in this case, the first task is to configure and install the interface.

As usual, it’s important to review the user’s manual of the technology, in this case the HornetQ REST interface one. One important restriction in my case was that I had to use version 4.2.1 GA of the JBoss Application Server, with HornetQ 2.2.5 as its messaging provider. Therefore, I had to change the pom.xml file proposed by the manual, adding the dependency on the RESTEasy libraries and setting Java 1.5 as both the source and target version:

<project xmlns="http://maven.apache.org/POM/4.0.0" 
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.wordpress.fcosfc.hornetq</groupId>
  <artifactId>HornetQRESTInterface</artifactId>
  <packaging>war</packaging>
  <version>1.0</version>
  <name>HornetQRESTInterface</name>
  <repositories>
    <repository>
      <id>jboss</id>
      <url>http://repository.jboss.org/nexus/content/groups/public/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.hornetq.rest</groupId>
      <artifactId>hornetq-rest</artifactId>
      <version>2.2.2.Final</version>
    </dependency>
    <dependency>
      <groupId>org.jboss.resteasy</groupId>
      <artifactId>resteasy-jaxrs</artifactId>
      <version>2.2.2.GA</version>
      <exclusions>
        <exclusion>
          <artifactId>commons-logging</artifactId>
          <groupId>commons-logging</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.jboss.resteasy</groupId>
      <artifactId>resteasy-jaxb-provider</artifactId>
      <version>2.2.2.GA</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.0.2</version>
        <configuration>
          <source>1.5</source>
          <target>1.5</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

Here is a link to my Maven project; I hope you find it useful.



Trying to set up a JMS bridge between HornetQ and Oracle AQ

I’ve been interested in setting up a JMS bridge between HornetQ and Oracle Advanced Queuing (AQ) for a while; I even wrote about it in a response to a question on the HornetQ user’s forum. I return to the matter in this post with some improvements although, to be perfectly honest, I am not happy with the configuration.

In theory, the bridge is possible because both systems are JMS 1.1 compliant. If you use a standalone HornetQ 2.2.5 installation, you simply have to add some managed beans to the hornetq-beans.xml file, as described in the user manual. The key point here is to use the proper connection factory and to set up the destination following the rules of the WebLogic manual: “Interoperating with Oracle AQ JMS”. Here is an excerpt of the hornetq-beans.xml file:

<bean name="JMSBridge">
  <constructor>
  ...
    <!-- concatenate JMS messageID to the target's message header -->
    <parameter>true</parameter>
  ...
</bean>
...
<!-- TargetCFF describes the ConnectionFactory used to connect to the target destination -->
<bean name="TargetCFF">
  <constructor>
    <parameter>
      <inject bean="TargetJNDI" />
    </parameter>
    <parameter>QueueConnectionFactory</parameter>
  </constructor>
</bean>
...
<!-- TargetDestinationFactory describes the Destination used as the target -->
<bean name="TargetDestinationFactory">
  <constructor>
    <parameter>
      <inject bean="TargetJNDI" />
    </parameter>
    <parameter>Queues/testQueue</parameter>
  </constructor>
</bean>
...
<!-- JNDI is a Hashtable containing the JNDI properties required -->
<!-- to connect to the *target* JMS resources                    -->
<bean name="TargetJNDI">
  <constructor>
    <map keyClass="java.lang.String" valueClass="java.lang.String">
      <entry>
        <key>java.naming.factory.initial</key>
        <value>oracle.jms.AQjmsInitialContextFactory</value>
      </entry>
      <entry>
        <key>db_url</key>
        <value>jdbc:oracle:thin:@oraclehost:1521:test</value>
      </entry>
      <entry>
        <key>java.naming.security.principal</key>
        <value>test</value>
      </entry>
      <entry>
        <key>java.naming.security.credentials</key>
        <value>secret</value>
      </entry>
    </map>
  </constructor>
</bean>

The configuration above runs properly with the DUPLICATES_OK quality of service, but when you move to ONCE_AND_ONLY_ONCE, changing the TargetCFF to XAQueueConnectionFactory, you get a NullPointerException:

  oracle.jms.AQjmsException: Error creating the db_connection
    at oracle.jms.AQjmsDBConnMgr.getConnection
    at oracle.jms.AQjmsDBConnMgr.<init>
    at oracle.jms.AQjmsXAConnection.<init>
    at oracle.jms.AQjmsXAQueueConnectionFactory.createAllXAConnection
    at oracle.jms.AQjmsXAQueueConnectionFactory.createXAQueueConnection

After some research in the Oracle Support notes, I found that the Oracle API has a bug (10102373) that will be fixed in the future release 12.1. There are workarounds to avoid receiving duplicate messages, for example saving the JMS message ID in an Oracle table with a unique index on that column. Take into account that the JMS message ID sent by HornetQ is a property that can be retrieved with the get_string_property method of the Oracle type aq$_jms_message.
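
The idea behind that workaround can be sketched in a few lines of Java. This is only an in-memory analogue, not what I deployed: the class DuplicateFilter and its method are hypothetical names, and a Set plays the role that the unique index on the message ID column plays in the database:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical in-memory analogue of the "unique index on the message ID" workaround.
final class DuplicateFilter {
    private final Set<String> seenMessageIds = ConcurrentHashMap.newKeySet();

    /**
     * Runs the handler only the first time a given message ID is seen.
     * Returns true if the message was handled, false if it was a duplicate.
     */
    boolean handleOnce(String jmsMessageId, String body, Consumer<String> handler) {
        // add() returns false when the ID is already present,
        // much like a unique-index violation would reject a second insert.
        if (!seenMessageIds.add(jmsMessageId)) {
            return false;
        }
        try {
            handler.accept(body);
            return true;
        } catch (RuntimeException ex) {
            // Processing failed: forget the ID so that a redelivery can be handled.
            seenMessageIds.remove(jmsMessageId);
            throw ex;
        }
    }
}
```

Unlike the database table, this set is lost on restart, so it only illustrates the deduplication step itself.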

But why can WebLogic interoperate with Oracle AQ in XA configurations? I suspect the reason is that it uses datasources with pre-created database connections. So, I tried to deploy the bridge within JBoss AS 6.0.0, with HornetQ as its default JMS provider. First of all, I created an Oracle XA datasource. After that, I set up the configuration file jms-bridge-jboss-beans.xml in a similar way as before; the only change is to replace the db_url parameter with the datasource parameter:

<bean name="TargetJNDI">
  <constructor>
    <map keyClass="java.lang.String" valueClass="java.lang.String">
      <entry>
        <key>java.naming.factory.initial</key>
        <value>oracle.jms.AQjmsInitialContextFactory</value>
      </entry>
      <entry>
        <key>datasource</key>
        <value>jdbc/testOracleXA</value>
      </entry>
    </map>
  </constructor>
</bean>

But when I tried to run this configuration, the bridge didn’t start. Another bug, a HornetQ one in this case, prevented me from reaching my goal. To be perfectly honest, I didn’t patch the installation; instead, I restarted the managed bean through the JMX console and got another exception, a ClassCastException, because JBoss passes a wrapped Oracle connection to the class AQjmsInitialContextFactory, which expects a native internal connection. I really felt powerless!

Finally, I adopted the solution of setting up an Oracle WebLogic server as a mediator; the guru Edwin Biemond describes this configuration in this article.


Security notes about HornetQ as a JBoss JMS provider

I’ve recently been involved in a project where my client had to share information with some partners, using HornetQ 2.2.5 queues deployed on a JBoss 4.2.1 GA application server, and I had to face a security problem. Here is the case: the first step that a remote client has to perform when it tries to send a message to a queue is to look up a connection factory and the queue in a JNDI tree. At this point, you should not simply open your JBoss JNDI port (1099 by default), even if you have configured your firewall to open the port only to your partners’ servers and you have user authentication for JNDI, because they might list all the names in your global JNDI namespace and exploit any security bug you may have. Besides, even if I had opened the port, our JBoss application server runs in a demilitarized zone (DMZ) with a standard NAT (Network Address Translation) configuration, and in this situation you can’t use RMI.

The solution to my problem was to force the partners to access JNDI over HTTP and to secure that access so that they could only perform lookup operations on a controlled read-only context. They got their connection factories and queues from that context and, later on, opened their JMS sessions through an HTTPS tunnel, after a user authentication process.

In order to run a basic test case, the first step is to create a role for the remote users (named remoteRole in this example) and another for local ones (localRole) on JBoss. I also created a key store for the SSL connection:

keytool -genkeypair -alias test -keyalg RSA -keysize 1024 -dname OU=TEST,CN=TEST,CN=LOCAL
        -keypass test1234 -keystore hornetq.test.keystore -storepass test1234

The next task is to deploy the Netty servlet that provides the HTTP transport; please follow this link, where you have the WAR file I set up. The next step is to edit the hornetq-configuration.xml file of the HornetQ installation and add a servlet connector and its corresponding acceptor (please notice that the path to the test keystore refers to the client file system, so you have to provide the client with the keystore):

<connectors>
...  
   <connector name="netty-servlet">
      <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
      <param key="host" value="${jboss.bind.address:localhost}"/>
      <param key="port" value="8443"/>
      <param key="use-servlet" value="true"/>
      <param key="servlet-path" value="/messaging/HornetQServlet"/>
      <param key="ssl-enabled" value="true"/>
      <param key="key-store-path" value="C:\\Software\\KeyStores\\hornetq.test.keystore"/>
      <param key="key-store-password" value="test1234"/>
   </connector>
...
</connectors>

<acceptors>
...
   <acceptor name="netty-invm">
      <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
      <param key="use-invm" value="true"/>
      <param key="host" value="org.hornetq"/>
   </acceptor>
...
</acceptors>

The next task is to make sure that the HTTPS connector of the JBoss 4.2.1 AS is running; if not, you have to edit the file server.xml located in the folder \deploy\jboss-web.deployer.

I also added a security constraint to the hornetq-configuration.xml file, so that remote users can only send messages to our queues:

...
<security-settings>
...
  <security-setting match="jms.queue.ReadOnly.#">
    <permission type="consume" roles="localRole"/>
    <permission type="send" roles="remoteRole"/>
  </security-setting>
...
</security-settings>
...
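
To see which queues fall under that security-setting, it helps to recall the wildcard rules documented for HornetQ addresses: words are delimited by '.', '#' matches any sequence of zero or more words and '*' matches a single word. The following sketch is a simplified re-implementation of those rules written for illustration only; it is not HornetQ’s own matching code, and the class name AddressMatchSketch is hypothetical:

```java
// Simplified, hypothetical re-implementation of the documented wildcard rules.
final class AddressMatchSketch {
    private AddressMatchSketch() { }

    /** Returns true if the address matches the pattern, word by word. */
    static boolean matches(String pattern, String address) {
        return match(pattern.split("\\."), 0, address.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] a, int j) {
        if (i == p.length) {
            return j == a.length; // pattern consumed: address must be consumed too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split.
            for (int k = j; k <= a.length; k++) {
                if (match(p, i + 1, a, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == a.length) {
            return false; // pattern still expects a word, but the address has ended
        }
        if (p[i].equals("*") || p[i].equals(a[j])) {
            return match(p, i + 1, a, j + 1); // '*' or a literal word matched
        }
        return false;
    }
}
```

With these rules, an address such as jms.queue.ReadOnly.TestQueue matches jms.queue.ReadOnly.#, whereas jms.queue.Other.TestQueue does not, so only queues whose names start with ReadOnly. are covered by the setting.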

The next task is to edit the hornetq-jms.xml file and add a connection factory and a queue, deployed under the readonly context:

...
<connection-factory name="ReadOnly.NettyConnectionFactory">
  <xa>true</xa>
  <connectors>
    <connector-ref connector-name="netty-servlet"/>
  </connectors>
  <entries>
    <entry name="readonly/XAConnectionFactory"/>
  </entries>
</connection-factory>
...
<queue name="ReadOnly.TestQueue">
  <entry name="readonly/TestQueue"/>
</queue>
...

The next step is to code a basic message producer, to be used by the remote party:

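import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class RemoteJMSProducer {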

    public void sendMessages() {
        ConnectionFactory connectionFactory;
        Connection connection = null;
        Session session = null;
        Queue queue;
        MessageProducer producer;
        TextMessage message;
        Context initialContext;
        Properties jndiConfig;

        try {
            jndiConfig = new Properties();
            jndiConfig.put(Context.INITIAL_CONTEXT_FACTORY,
                           "org.jboss.naming.HttpNamingContextFactory");
            jndiConfig.put(Context.PROVIDER_URL,
                           "http://remote-jboss-server/invoker/ReadOnlyJNDIFactory");
            initialContext = new InitialContext(jndiConfig);

            connectionFactory = (ConnectionFactory)
                                 initialContext.lookup("readonly/XAConnectionFactory");
            connection = connectionFactory.createConnection("remoteUser", "test");
            queue = (Queue) initialContext.lookup("readonly/TestQueue");

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(queue);
            message = session.createTextMessage();

            for (int i = 0; i < 10; i++) {
                message.setText("This is the test message number " + i);
                producer.send(message);
            }

            System.out.println("Messages sent!");
        } catch (NamingException ex) {
            System.err.println("JNDI exception: " + ex.getMessage());
        } catch (JMSException ex) {
            System.err.println("JMS exception: " + ex.getMessage());
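        } finally {
            // Guard against null: the lookup may fail before these are created
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ignore) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ignore) {
                }
            }
        }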
    }

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        RemoteJMSProducer remoteJMSProducer;

        remoteJMSProducer = new RemoteJMSProducer();
        remoteJMSProducer.sendMessages();
    }
}

The class above doesn’t use the typical org.jnp.interfaces.NamingContextFactory, but org.jboss.naming.HttpNamingContextFactory as the initial context factory. I’d also like to point out that the provider URL is http://remote-jboss-server/invoker/ReadOnlyJNDIFactory. In order to run the example, you need to put the following libraries on your classpath:

  • hornetq-core-client.jar
  • hornetq-jms-client.jar
  • netty.jar
  • concurrent.jar
  • jboss-client.jar
  • jboss-common-client.jar
  • jboss-j2ee.jar
  • jboss-remoting.jar
  • jboss-serialization.jar
  • jbosssx-client.jar

Please note that if you try to perform a list or a rename operation, you get an exception.



Tracking of a WebLogic JMS bridge

I’ve been talking about JMS bridges between WebLogic and HornetQ for a while. Today I’d like to deal with tracking, which is especially useful when you need to know when a message was dispatched to the destination system.

The idea is to configure WebLogic to debug messaging bridge runtime events, add a custom logging handler that captures these events and saves them into a table, and then analyze them in order to track the passage of messages through the bridge.

Therefore, the first task is to enable the debugging of messaging bridge events in WebLogic. One way to achieve this is to add the following lines to the setDomainEnv.cmd script:

set JAVA_OPTIONS=%JAVA_OPTIONS% -Dweblogic.debug.DebugMessagingBridgeStartup=true
set JAVA_OPTIONS=%JAVA_OPTIONS% -Dweblogic.debug.DebugMessagingBridgeRuntime=true

The next step is to create a table to persist the events:

CREATE TABLE wl_log
(log_id        NUMBER              CONSTRAINT wl_log_pk PRIMARY KEY,
 log_timestamp TIMESTAMP           NOT NULL,
 msg_id        VARCHAR2(25 CHAR)   NOT NULL,
 log_level     VARCHAR2(25 CHAR)   NOT NULL,
 subsystem     VARCHAR2(50 CHAR)   NOT NULL,
 message       VARCHAR2(4000 CHAR) NOT NULL);

CREATE SEQUENCE wl_log_s
INCREMENT BY 1
START WITH 1
MAXVALUE 999999999999
MINVALUE 1
CACHE 20;

The next task is to create a custom logging handler that captures the events and saves them into that table (this class reads a properties file, where I put the parameters needed to connect to the Oracle database):

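import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Properties;
import java.util.logging.ErrorManager;
import java.util.logging.Handler;
import java.util.logging.LogRecord;

import weblogic.logging.LoggingHelper;
import weblogic.logging.WLLogRecord;

public class WLCustomLoggingHandler extends Handler {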
    private Connection dbConn = null;
    private PreparedStatement psmt = null;

    public WLCustomLoggingHandler() throws IOException,
                                           ClassNotFoundException,
                                           SQLException {
        super();

        Properties dbProperties;

        dbProperties = new Properties();
        dbProperties.load(this.getClass().getResourceAsStream(
                                            "WLCustomLoggingHandlerDB.properties"));
        Class.forName(dbProperties.getProperty("DRIVER"));
        this.dbConn =
                DriverManager.getConnection(dbProperties.getProperty("URL"),
                                            dbProperties.getProperty("USER"),
                                            dbProperties.getProperty("PASS"));
        this.psmt =
                this.dbConn.prepareStatement("INSERT INTO wl_log " +
                                             "(log_id, log_timestamp, msg_id, " +
                                             "log_level, subsystem, message) " +
                                             "VALUES(wl_log_s.nextval, ?, ?, ?, ?, ?)");

        this.setErrorManager(new ErrorManager() {
            public void error(String msg, Exception ex, int code) {
                System.err.println("Error reported by WLCustomLoggingHandler: "
                                   + msg + (ex != null ? ex.getMessage() : ""));
                LoggingHelper.getServerLogger().removeHandler(WLCustomLoggingHandler.this);
            }
        });
    }

    @Override
    public void publish(LogRecord record) {
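        // Ignore anything that is not a Weblogic log record, to avoid a ClassCastException
        if (!(record instanceof WLLogRecord)) {
            return;
        }
        WLLogRecord wlLogRecord = (WLLogRecord)record;
        if (this.isLoggable(wlLogRecord)) {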
            try {
                psmt.setEscapeProcessing(true);
                psmt.setTimestamp(1, new Timestamp(wlLogRecord.getMillis()));
                psmt.setString(2, wlLogRecord.getId());
                psmt.setString(3, wlLogRecord.getLevel().getName());
                psmt.setString(4, wlLogRecord.getLoggerName());
                psmt.setString(5, wlLogRecord.getMessage());
                psmt.executeUpdate();
                this.flush();
            } catch (SQLException ex) {
                this.reportError("WLCustomLoggingHandler.publish: ",
                                 ex,
                                 ErrorManager.WRITE_FAILURE);
            }
        }
    }

    @Override
    public void flush() {
        try {
            this.dbConn.commit();
        } catch (SQLException ex) {
            this.reportError("WLCustomLoggingHandler.flush: ",
                             ex,
                             ErrorManager.FLUSH_FAILURE);
        }
    }

    @Override
    public void close() throws SecurityException {
        try {
            this.dbConn.close();
        } catch (SQLException ex) {
            this.reportError("WLCustomLoggingHandler.close: ",
                             ex,
                             ErrorManager.CLOSE_FAILURE);
        }
    }
}

We only want to analyze the events related to the messaging bridge, so we have to create a logging filter:

public class WLCustomLoggingFilter implements Filter {
    public WLCustomLoggingFilter() {
        super();
    }

    @Override
    public boolean isLoggable(LogRecord record) {
        if (record instanceof WLLogRecord) {
            WLLogRecord wlLogRecord = (WLLogRecord)record;

            // Constant-first comparison, so a record without subsystem is simply rejected
            return "MessagingBridge".equals(wlLogRecord.getSubsystem()) ||
                "MessagingBridgeRuntime".equals(wlLogRecord.getSubsystem());
        } else {
            return false;
        }
    }
}
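
To illustrate how the handler and the filter cooperate, here is a minimal sketch that uses only the standard java.util.logging classes. It is not the Weblogic code: the in-memory handler stands in for the database handler, the logger name stands in for the Weblogic subsystem, and all the names (FilteredHandlerSketch, InMemoryHandler, subsystemFilter, demo) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Filter;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;

class FilteredHandlerSketch {

    // Keeps the accepted messages in memory instead of writing them to a table
    static class InMemoryHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            // isLoggable() checks the handler level and the filter set on this handler
            if (isLoggable(record)) {
                messages.add(record.getMessage());
            }
        }

        @Override
        public void flush() {
        }

        @Override
        public void close() {
        }
    }

    // Assumption: the logger name plays the role of the Weblogic subsystem
    static Filter subsystemFilter() {
        return record -> "MessagingBridge".equals(record.getLoggerName())
                || "MessagingBridgeRuntime".equals(record.getLoggerName());
    }

    // Publishes one bridge record and one unrelated record, returns what was kept
    static List<String> demo() {
        InMemoryHandler handler = new InMemoryHandler();
        handler.setFilter(subsystemFilter());

        LogRecord bridge = new LogRecord(Level.INFO, "bridge event");
        bridge.setLoggerName("MessagingBridge");
        LogRecord other = new LogRecord(Level.INFO, "unrelated event");
        other.setLoggerName("JDBC");

        handler.publish(bridge);
        handler.publish(other);
        return handler.messages;
    }
}
```

Only the record whose name matches one of the two bridge subsystems reaches the storage step; the other one is discarded by the filter before anything is saved.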

The next step is to code a Weblogic startup class:

public class WLCustomLoggingHandlerStartupClass {
    public WLCustomLoggingHandlerStartupClass() {
        super();
    }

    public void configureLogger() {
        Logger logger = null;
        Handler handler = null;

        try {
            logger = LoggingHelper.getServerLogger();
            handler = new WLCustomLoggingHandler();
            handler.setFilter(new WLCustomLoggingFilter());
            logger.addHandler(handler);
        } catch (Exception ex) {
            System.err.println("WLCustomLoggingHandlerStartupClass.configureLogger: "
                               + ex.getMessage());
            ex.printStackTrace();
            try {
                logger.removeHandler(handler);
            } catch (Exception ignore) {
            }
        }
    }

    public static void main(String[] args) {
        WLCustomLoggingHandlerStartupClass testWLCustomLoggingHandlerStartupClass =
            new WLCustomLoggingHandlerStartupClass();
        testWLCustomLoggingHandlerStartupClass.configureLogger();
    }
}

This class has to be configured in Weblogic by going to the Environment->Startup and Shutdown Classes section of the console (obviously, the jar file containing the code above has to be available on the Weblogic classpath):

Weblogic Startup and Shutdown Classes

The last step is to analyze the log entries in order to know when each message was dispatched to its destination, when the bridge was up and running, etc. The MessagingBridge Subsystem Messages are helpful to achieve this goal, especially:

  • BEA-200028: The bridge “arg0” has started transferring messages.
  • BEA-200024: Bridge “arg0” refused to obtain connections and transfer any messages, because it has been stopped or suspended, or is shutting down.
  • BEA-200015: An error occurred in bridge “arg0” during the transfer of messages (e).

However, the passage of a message through a bridge has to be discovered by analyzing the log entries with ID BEA-000000. Of course, the sender program has to save the message and its ID on persistent storage at the time it sends it. The bridge dequeues each message from the source and enqueues it to the destination, all in the context of an XA batch transaction (10 messages by default):

03/03/12 10:12:11,859000000	"Bridge: TestBridge (processMessages()) received message:
                                 JMS Message Class: TextMessage
                                 JMSMessageID: ID:B3AD885BC54B4391976BEDA9EC00E8F4
                                 JMSCorrelationID: null
                                 JMSDeliveryMode: PERSISTENT
                                 JMSDestination: TestQueue
                                 JMSExpiration: 0
                                 JMSPriority: 8
                                 JMSRedelivered: false
                                 JMSReplyTo: null
                                 JMSTimestamp: 1330851452000 (Sun Mar 04 09:57:32 CET 2012)
                                 JMSType: null
                                 Transaction Id: BEA1-00018D19CC5B9E472B1E
                                 This is a test message: 010"
03/03/12 10:12:11,859000000	"Bridge: TestBridge (processMessages()) successfully sent message:
                                 JMS Message Class: TextMessage
                                 Old JMS MessageID: ID:B3AD885BC54B4391976BEDA9EC00E8F4
                                 New JMS MessageID: ID:2064fac7-65da-11e1-9824-005056c00008
                                 This is a test message: 010"
03/03/12 10:12:12,671000000	Bridge: TestBridge (processMessages()) committed the transaction
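
As a rough sketch of the analysis step, the following class extracts the old and the new JMS message IDs from the text of a "successfully sent message" entry like the one above, so that the ID saved by the sender can be matched with the ID assigned on the destination. The class and method names (BridgeLogParser, parseSent) are hypothetical, and the regular expressions assume the exact layout of the sample entries.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class BridgeLogParser {
    // The labels are taken from the sample log entry; the ID is the next non-blank token
    private static final Pattern OLD_ID = Pattern.compile("Old JMS MessageID:\\s*(\\S+)");
    private static final Pattern NEW_ID = Pattern.compile("New JMS MessageID:\\s*(\\S+)");

    // Returns {oldId, newId} for a "successfully sent" entry, or empty for any other entry
    static Optional<String[]> parseSent(String message) {
        if (message == null || !message.contains("successfully sent message")) {
            return Optional.empty();
        }
        Matcher oldMatcher = OLD_ID.matcher(message);
        Matcher newMatcher = NEW_ID.matcher(message);
        if (oldMatcher.find() && newMatcher.find()) {
            return Optional.of(new String[] {oldMatcher.group(1), newMatcher.group(1)});
        }
        return Optional.empty();
    }
}
```

The input would be the value of the message column of the wl_log table; the log_timestamp of the same row then tells when that message was handed over to the destination, and the following "committed the transaction" entry tells when the batch was confirmed.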

References