Writing Message Driven Beans

Message Driven Beans (MDBs) are a new part of EJB 2.0. They were added because EJB 1.1 had no way to handle asynchronous invocation. The primary reason is that an EJB bean may never be invoked by other objects except through its remote interface. Therefore a bean could never set itself up as a listener for asynchronous invocation.

MDBs fill this gap. An MDB is a bean without a remote interface; the container sets itself up as a listener for asynchronous invocation and handles the invocation of the concrete bean, which otherwise follows all the usual rules for EJB beans.

Message Driven Beans are primarily focused on JMS. An MDB is either a topic or a queue subscriber. One nice feature of MDBs is that you get multithreaded subscribers (even for topics) without having to deal with the subtle difficulties of writing multithreaded code and multithreaded JMS message consumers.
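To make the multithreading point concrete, the following plain-Java sketch imitates what a container does on your behalf: it keeps a small pool of listener instances and hands each incoming message to a free instance on a worker thread. It uses only the JDK, not the EJB or JMS APIs. The names MiniContainer, Listener and MiniContainerDemo are hypothetical and chosen for illustration; the sketch says nothing about how JBoss actually implements its MDB container.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a message listener; NOT javax.jms.MessageListener.
interface Listener {
    void onMessage(String message);
}

// Hypothetical miniature "container": pools listener instances and
// dispatches messages to them on worker threads.
class MiniContainer {
    private final BlockingQueue<Listener> pool;
    private final ExecutorService threads;

    MiniContainer(List<Listener> instances) {
        this.pool = new ArrayBlockingQueue<>(instances.size(), false, instances);
        this.threads = Executors.newFixedThreadPool(instances.size());
    }

    // Called for every incoming message; returns immediately.
    void deliver(String message) {
        threads.execute(() -> {
            try {
                Listener bean = pool.take();     // borrow a free instance
                try {
                    bean.onMessage(message);     // only one thread uses an instance at a time
                } finally {
                    pool.put(bean);              // hand the instance back
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Stop accepting messages and wait for in-flight deliveries to finish.
    void shutdown() {
        threads.shutdown();
        try {
            threads.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class MiniContainerDemo {
    // Delivers `count` messages through a pool of `poolSize` listeners
    // and returns how many of them were handled.
    static int run(int poolSize, int count) {
        AtomicInteger handled = new AtomicInteger();
        List<Listener> beans = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            beans.add(msg -> handled.incrementAndGet());
        }
        MiniContainer container = new MiniContainer(beans);
        for (int i = 0; i < count; i++) {
            container.deliver("message " + i);
        }
        container.shutdown();
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled " + run(3, 10) + " messages");
    }
}
```

The listener code itself contains no threading logic at all; borrowing and returning instances is entirely the concern of the surrounding dispatcher, which is the division of labour an MDB container offers.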

What should you use your beans for, then? Basically, you would use an MDB any time you are about to create a JMS subscriber. Typical conditions for doing this are:

Here we will write some typical MDBs.

Hello World MDB

An MDB follows a typical EJB contract. It must implement the following two interfaces:

  • javax.ejb.MessageDrivenBean

  • javax.jms.MessageListener

An MDB must therefore typically contain the following four methods:

public void setMessageDrivenContext(MessageDrivenContext ctx);
public void ejbCreate();
public void ejbRemove();
public void onMessage(Message message);

The full program listing of a simple Hello World bean could look like this:


package test.bean;

import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.ejb.EJBException;

import javax.jms.MessageListener;
import javax.jms.Message;

public class MDB implements MessageDrivenBean, MessageListener{
    private MessageDrivenContext ctx = null;
    public MDB() {
	
    }
    public void setMessageDrivenContext(MessageDrivenContext ctx)
	throws EJBException {
	this.ctx = ctx;
    }
    
    public void ejbCreate() {}

    public void ejbRemove() {ctx=null;}

    public void onMessage(Message message) {
	System.err.println("Bean got message" + message.toString() );
    }
} 
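The order in which a container invokes the four callbacks on a single bean instance can be sketched in plain Java. This is a minimal illustration, not container code: FakeMessageBean, RecordingBean and LifecycleDemo are hypothetical stand-ins defined here so the example runs with the JDK alone, and a String takes the place of a real javax.jms.Message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the four MDB callbacks; the real
// interfaces are javax.ejb.MessageDrivenBean and javax.jms.MessageListener.
interface FakeMessageBean {
    void setMessageDrivenContext(Object ctx);
    void ejbCreate();
    void onMessage(String message);
    void ejbRemove();
}

// A bean that only records which callbacks were invoked, in order.
class RecordingBean implements FakeMessageBean {
    final List<String> calls = new ArrayList<>();

    public void setMessageDrivenContext(Object ctx) { calls.add("setMessageDrivenContext"); }
    public void ejbCreate() { calls.add("ejbCreate"); }
    public void onMessage(String message) { calls.add("onMessage(" + message + ")"); }
    public void ejbRemove() { calls.add("ejbRemove"); }
}

class LifecycleDemo {
    // Plays the role of the container for one bean instance.
    static List<String> runLifecycle(String... messages) {
        RecordingBean bean = new RecordingBean();    // 1. instantiate via the no-arg constructor
        bean.setMessageDrivenContext(new Object());  // 2. hand over the context
        bean.ejbCreate();                            // 3. let the bean initialise itself
        for (String m : messages) {
            bean.onMessage(m);                       // 4. deliver zero or more messages
        }
        bean.ejbRemove();                            // 5. discard the instance
        return bean.calls;
    }

    public static void main(String[] args) {
        for (String call : runLifecycle("Hello World!")) {
            System.out.println(call);
        }
    }
}
```

This is why the Hello World bean can store the context in setMessageDrivenContext() and rely on it being available in every later callback.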

To deploy this into JBoss we have to write two deployment descriptors: one standard ejb-jar.xml and one that is JBoss specific. We choose to make this bean a topic subscriber. Since the bean does not do any real work, we could choose container-managed transactions with NotSupported (although in most cases this would not be the best thing to do).

     <?xml version="1.0"?>
     <!DOCTYPE ejb-jar>
     <ejb-jar>
       <enterprise-beans>
         <message-driven>
           <ejb-name>MDB</ejb-name>
           <ejb-class>test.bean.MDB</ejb-class>
           <message-selector></message-selector>
           <transaction-type>Container</transaction-type>
           <message-driven-destination>
             <destination-type>javax.jms.Topic</destination-type>
             <subscription-durability>NonDurable</subscription-durability>
           </message-driven-destination>
         </message-driven>
       </enterprise-beans>
       <assembly-descriptor>
         <container-transaction>
           <method>
             <ejb-name>MDB</ejb-name>
             <method-name>*</method-name>
           </method>
           <trans-attribute>NotSupported</trans-attribute>
         </container-transaction>
       </assembly-descriptor>
     </ejb-jar>

We also need to write a small deployment descriptor that is specific to JBoss. The full version of this descriptor is quite big, and it makes it possible to configure the MDB container in considerable detail. Most users do not need this and may use the standard configuration. The most important part of the descriptor is the specification of the destination. We choose testTopic since it is always available in JBoss.


     <?xml version="1.0" encoding="Cp1252"?>
     <jboss>
       <enterprise-beans>
         <message-driven>
           <ejb-name>MDB</ejb-name>
           <configuration-name>Standard Message Driven Bean</configuration-name>
           <destination-jndi-name>topic/testTopic</destination-jndi-name>
         </message-driven>
       </enterprise-beans>
     </jboss>

Then you will have to pack these into a jar. Here is one way to do it:

  mkdir dist
  mkdir dist/META-INF
  mkdir -p dist/test/bean
  cp MDB.class dist/test/bean
  cp MDB-jar.xml dist/META-INF/ejb-jar.xml
  cp MDB-jboss.xml dist/META-INF/jboss.xml
  cd dist
  jar -cvf mdb.jar .
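A quick way to catch packaging mistakes before deploying is to list the jar and check that the expected entries are present. Because the Hello World class is declared in package test.bean, its entry in the jar has to be test/bean/MDB.class. The following is a minimal sketch using only java.util.jar from the JDK; the class name JarLayoutCheck and its methods are hypothetical helpers, not part of JBoss.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

class JarLayoutCheck {
    // Entries the Hello World example needs, assuming the class test.bean.MDB.
    static final List<String> REQUIRED = Arrays.asList(
            "META-INF/ejb-jar.xml",
            "META-INF/jboss.xml",
            "test/bean/MDB.class");

    // Returns the required entries that are absent from the given entry names.
    static List<String> missing(Collection<String> entryNames) {
        List<String> result = new ArrayList<>();
        for (String required : REQUIRED) {
            if (!entryNames.contains(required)) {
                result.add(required);
            }
        }
        return result;
    }

    // Reads all entry names from a jar file on disk.
    static List<String> entryNames(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.stream().map(e -> e.getName()).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: java JarLayoutCheck <path-to-jar>");
            return;
        }
        List<String> missing = missing(entryNames(args[0]));
        System.out.println(missing.isEmpty() ? "layout ok" : "missing: " + missing);
    }
}
```

Running it as java JarLayoutCheck mdb.jar reports any of the three entries that did not make it into the archive.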

Copy the bean into the deploy directory of JBoss.

To send stuff to your bean you need a JMS publisher. This is standard JMS programming, but here is one way to do it:

import javax.naming.*;

import javax.jms.*;

public class Main {
  public static void main(String arg[]) {
    try {
      // Get access to JNDI
      Context context = new InitialContext();

      // Lookup the managed connection factory for a topic
      TopicConnectionFactory topicFactory = 
         (TopicConnectionFactory)context.lookup("TopicConnectionFactory");

      // Create a connection to the JMS provider
      TopicConnection topicConnection = topicFactory.createTopicConnection();
      
      // Create a topic session
      TopicSession session = topicConnection.createTopicSession(
						   // No transaction
						   false, 
						   // Auto ack
						   Session.AUTO_ACKNOWLEDGE);

     // Lookup the destination you want to publish to
     Topic topic = (Topic)context.lookup("topic/testTopic");

     // Create a publisher
     TopicPublisher pub = session.createPublisher(topic);

     // Create a message
     TextMessage message = session.createTextMessage();
     message.setText("Hello World!");

     // Publish the message (the publisher is already bound to the topic)
     pub.publish(message);

     // Close the stuff
     session.close();
     topicConnection.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

You will typically have to include the following jars in your classpath: jbossmq.client.jar, jms.jar, jnp-client.jar and jta-spec1_0_1.jar.

MDB as a listener

Here we will look at an example that does something more realistic. One nice way to use MDBs is to have them emulate the listener pattern. Normally, listeners register themselves with the object emitting events. This is not possible with an MDB, since the bean itself can only do work when it receives an event (a message). Setting up an MDB as a listener therefore has to be done outside of the MDB. This could be as simple as defining a topic or queue and hardwiring the event generator to publish its events/messages to that destination. It is also possible to create a more generic framework for message driven callbacks, something I have done with JMS and JMX, but that is for another document. Let's instead look at the MDB side.
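The wiring idea can be sketched without any JMS at all: the event generator knows only a destination name, the listener knows nothing about the generator, and a third party connects the two. In the sketch below MiniBroker is a hypothetical in-memory stand-in for a JMS provider, and EventGenerator and ListenerWiringDemo are likewise illustrative names; in a real deployment the subscription is set up by the container from the deployment descriptors.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a JMS provider with named topics.
class MiniBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Performed by whoever deploys the listener, never by the listener itself.
    void subscribe(String destination, Consumer<String> listener) {
        subscribers.computeIfAbsent(destination, d -> new ArrayList<>()).add(listener);
    }

    // Called by the event generator; it only knows the destination name.
    void publish(String destination, String event) {
        List<Consumer<String>> listeners =
                subscribers.getOrDefault(destination, Collections.emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
    }
}

// The event generator is hardwired to a destination, not to its listeners.
class EventGenerator {
    private final MiniBroker broker;
    private final String destination;

    EventGenerator(MiniBroker broker, String destination) {
        this.broker = broker;
        this.destination = destination;
    }

    void fire(String event) {
        broker.publish(destination, event);
    }
}

class ListenerWiringDemo {
    // Wires one listener to one generator and returns what the listener received.
    static List<String> run() {
        MiniBroker broker = new MiniBroker();
        List<String> received = new ArrayList<>();

        // The wiring happens here, outside both the generator and the listener.
        broker.subscribe("topic/testTopic", received::add);

        EventGenerator generator = new EventGenerator(broker, "topic/testTopic");
        generator.fire("order created");
        generator.fire("order shipped");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```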

One way to partition the logic in EJB is to have one bean that does the real work (contains the logic), which could be a stateless session bean or an entity bean, and one bean acting as a listener. Let's write a working but simplified version of this pattern. We start with a very simple stateless session bean with just one method: doWork. To keep it simple, we let it take a String describing its task. This is straightforward. First we need a home interface:

package msgbean.interfaces;

import java.rmi.RemoteException;
import javax.ejb.*;

public interface WorkerHome  extends EJBHome {
   public Worker create() throws RemoteException, CreateException;
} // WorkerHome

We also need a remote interface:

package msgbean.interfaces;

import java.rmi.RemoteException;
import javax.ejb.EJBObject;

public interface Worker extends EJBObject {
    public void doWork(String work) throws RemoteException;
    
} // Worker

And finally we need the bean class:

package msgbean.server;
import javax.ejb.*;
import java.rmi.RemoteException;
import javax.naming.*;

public class WorkerBean implements SessionBean {
    private SessionContext ctx;
    public WorkerBean() {
	
    }
    public void setSessionContext(javax.ejb.SessionContext ctx) throws RemoteException { this.ctx = ctx; }
    public void unsetSessionContext() throws RemoteException { this.ctx = null; }
    public void ejbActivate() throws RemoteException {}
    public void ejbPassivate() throws RemoteException {}
    public void ejbRemove() throws RemoteException {}
    public void ejbCreate() throws CreateException {}
    public void doWork(String work) {
	System.out.println("WorkerBean doing work: " + work);
    }
} // WorkerBean

We will write the deployment descriptor for the bean later, since we will pack it with the listener.

The next step is to write the listener bean. Here we basically add one thing: the ability to look up the Worker bean and invoke it. We look up the bean through a JNDI reference defined via ejb-ref. This can be done in ejbCreate().

	    Context initCtx = new InitialContext();
	    workerHome = (WorkerHome)initCtx.lookup("java:comp/env/ejb/worker");

In the onMessage() method we get what we need out of the message. Here we could have sent an object of some kind, known to the listener. For simplicity we choose to send a TextMessage and pass the content of the message to the worker.

	    Worker worker = workerHome.create();
	    if (message instanceof TextMessage) {
		TextMessage m = (TextMessage)message;
		worker.doWork(m.getText());
	    }

Here is the complete listing of the class:

package msgbean.server;

import javax.ejb.*;

import javax.naming.*;

import javax.jms.*;

import msgbean.interfaces.WorkerHome;
import msgbean.interfaces.Worker;

public class ListenerBean implements MessageDrivenBean, MessageListener{
    private MessageDrivenContext ctx = null;
    private WorkerHome workerHome = null;

    public ListenerBean() {
	
    }
    public void setMessageDrivenContext(MessageDrivenContext ctx)
	throws EJBException {
	this.ctx = ctx;
    }
    
    public void ejbCreate() throws CreateException {
	try {
	    Context initCtx = new InitialContext();
	    workerHome = (WorkerHome)initCtx.lookup("java:comp/env/ejb/worker");
	}catch(Exception ex) {
	    throw new CreateException("Could not get worker: " + ex);
	}

    }

    public void ejbRemove() {ctx=null;}

    public void onMessage(Message message) {
	try {
	    Worker worker = workerHome.create();

	    // Get the message. Here we could have an ObjectMessage containing
	    // an object of a known class. We use a TextMessage for simplicity.
	    if (message instanceof TextMessage) {
		TextMessage m = (TextMessage)message;
		worker.doWork(m.getText());
	    }
	    
	}catch(Exception ex) {
	    throw new EJBException("Could not call worker " + ex);
	}

    }
} 

To deploy this into JBoss we need to write two deployment descriptors: one standard ejb-jar.xml and one for JBoss. Let's begin with the standard one. For ease of use we include both beans in one jar. For the Message Driven Bean we have to decide whether it subscribes to a topic or a queue, and what kind of transactions it should run under. In this case we choose a topic and container-managed transactions. We also have to specify an ejb-ref so that the listener can look up the home of the WorkerBean:

     <message-driven>
       <ejb-name>ListenerBean</ejb-name>
       <ejb-class>msgbean.server.ListenerBean</ejb-class>
       <message-selector></message-selector>
       <transaction-type>Container</transaction-type>
       <ejb-ref>
         <description>The Workers home</description>
         <ejb-ref-name>ejb/worker</ejb-ref-name>
         <ejb-ref-type>Session</ejb-ref-type>
         <ejb-link>WorkerBean</ejb-link>
         <home>msgbean.interfaces.WorkerHome</home>
         <remote>msgbean.interfaces.Worker</remote>
       </ejb-ref>
       <message-driven-destination>
         <destination-type>javax.jms.Topic</destination-type>
         <subscription-durability>NonDurable</subscription-durability>
       </message-driven-destination>
     </message-driven>

We also have to add an entry for the Worker bean:

     <session>
       <description>Worker bean</description>
       <display-name>Worker</display-name>
       <ejb-name>WorkerBean</ejb-name>
       <home>msgbean.interfaces.WorkerHome</home>
       <remote>msgbean.interfaces.Worker</remote>
       <ejb-class>msgbean.server.WorkerBean</ejb-class>
       <session-type>Stateless</session-type>
       <transaction-type>Container</transaction-type>
     </session>

Here is the complete deployment descriptor, including definitions of the transaction type.

     <!DOCTYPE ejb-jar>
     <ejb-jar>
       <enterprise-beans>

         <message-driven>
           <ejb-name>ListenerBean</ejb-name>
           <ejb-class>msgbean.server.ListenerBean</ejb-class>
           <message-selector></message-selector>
           <transaction-type>Container</transaction-type>
           <ejb-ref>
             <description>The Workers home</description>
             <ejb-ref-name>ejb/worker</ejb-ref-name>
             <ejb-ref-type>Session</ejb-ref-type>
             <ejb-link>WorkerBean</ejb-link>
             <home>msgbean.interfaces.WorkerHome</home>
             <remote>msgbean.interfaces.Worker</remote>
           </ejb-ref>
           <message-driven-destination>
             <destination-type>javax.jms.Topic</destination-type>
             <subscription-durability>NonDurable</subscription-durability>
           </message-driven-destination>
         </message-driven>
         <session>
           <description>Worker bean</description>
           <display-name>Worker</display-name>
           <ejb-name>WorkerBean</ejb-name>
           <home>msgbean.interfaces.WorkerHome</home>
           <remote>msgbean.interfaces.Worker</remote>
           <ejb-class>msgbean.server.WorkerBean</ejb-class>
           <session-type>Stateless</session-type>
           <transaction-type>Container</transaction-type>
         </session>
       </enterprise-beans>
       <assembly-descriptor>
         <container-transaction>
           <method>
             <ejb-name>ListenerBean</ejb-name>
             <method-name>*</method-name>
           </method>
           <trans-attribute>Required</trans-attribute>
         </container-transaction>
         <container-transaction>
           <method>
             <ejb-name>WorkerBean</ejb-name>
             <method-intf>Remote</method-intf>
             <method-name>*</method-name>
           </method>
           <trans-attribute>Required</trans-attribute>
         </container-transaction>
       </assembly-descriptor>
     </ejb-jar>

We also need to write a jboss.xml deployment descriptor. This is needed because the destination JNDI name must be defined somewhere; here we cannot rely on a default.

     <?xml version="1.0" encoding="Cp1252"?>
     <jboss>
       <enterprise-beans>
         <message-driven>
           <ejb-name>ListenerBean</ejb-name>
           <configuration-name>Standard Message Driven Bean</configuration-name>
           <destination-jndi-name>topic/testTopic</destination-jndi-name>
         </message-driven>
         <secure>false</secure>
       </enterprise-beans>
     </jboss>

We then have to compile the beans. You will have to add several jar files to your classpath for this to succeed. Most importantly, and specific to MDBs, you will need ejb2.0 from lib/ext to be able to compile. You also need to pack the beans into a jar file. Do this as described above. Deploy by copying the jar file to the deploy directory. You may use the example publisher above to publish messages to the bean.

The adapter pattern

Another way to use an MDB is as an adapter, for example between different messaging systems. This is rewarding, especially if you have a J2EE Connector resource adapter for the other system, since you then get a very efficient, multithreaded and pooled system without any advanced programming. Since I have written such a resource adapter for the messaging server XmlBlaster, we will look at one way to adapt between JMS and XmlBlaster.

The adapter uses an MDB to subscribe to a topic. It then republishes each message to an XmlBlaster server through the XmlBlasterK2 J2EE Connector adapter. The code is pretty straightforward. A J2EE Connector resource adapter must be deployed into the JBoss server. It is then referenced from within the bean the same way you would reference a JDBC resource. You would look it up this way:

factory = (BlasterConnectionFactory)new InitialContext ().lookup ("java:comp/env/xmlBlaster");

And use it this way:

  con = factory.getConnection();
  // Construct Blaster Headers
  String key ="<key oid=\"" + message.getJMSMessageID() +"\" contentMime=\"text/xml\"></key>";
  String qos = "<qos></qos>";
  con.publish( new MessageUnit(key,msg.getBytes(),qos));
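The key header above is built by plain string concatenation, so a message ID containing a quote or an ampersand would produce malformed XML. The sketch below shows one generic way to guard against that with standard XML attribute escaping. BlasterKeys, escapeAttribute and buildKey are hypothetical helpers, not part of XmlBlaster or JMS, and whether XmlBlaster places further restrictions on oid values is not covered here.

```java
class BlasterKeys {
    // Escapes the five characters that are special in an XML attribute value.
    static String escapeAttribute(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (char c : value.toCharArray()) {
            switch (c) {
                case '&':  sb.append("&amp;");  break;
                case '<':  sb.append("&lt;");   break;
                case '>':  sb.append("&gt;");   break;
                case '"':  sb.append("&quot;"); break;
                case '\'': sb.append("&apos;"); break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    // Builds a key header of the same shape as the one used in the text.
    static String buildKey(String oid, String contentMime) {
        return "<key oid=\"" + escapeAttribute(oid)
                + "\" contentMime=\"" + escapeAttribute(contentMime) + "\"></key>";
    }

    public static void main(String[] args) {
        // prints: <key oid="ID:42" contentMime="text/xml"></key>
        System.out.println(buildKey("ID:42", "text/xml"));
    }
}
```

In the bean, the call would then read buildKey(message.getJMSMessageID(), "text/xml") in place of the inline concatenation.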

The complete bean is pretty straightforward (almost the same code could be used to write directly to a database, for example):

package javaclients.j2ee.k2;

import javax.naming.InitialContext;
import javax.naming.NamingException;

import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.ejb.EJBException;

import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.JMSException;

import javax.resource.ResourceException;

import org.xmlBlaster.j2ee.k2.client.*;

import org.xmlBlaster.util.XmlBlasterException;
import org.xmlBlaster.engine.helper.MessageUnit;

public class JmsAdapter implements MessageDrivenBean, MessageListener{
    private MessageDrivenContext ctx = null;
    private BlasterConnectionFactory factory = null;
    public JmsAdapter() {
	
    }
    public void setMessageDrivenContext(MessageDrivenContext ctx)
	throws EJBException {
	this.ctx = ctx;
	try {
	    factory = (BlasterConnectionFactory)new InitialContext ().lookup ("java:comp/env/xmlBlaster");
	} catch (NamingException ex) {
	    throw new EJBException ("XmlBlaster not found: "+ex.getMessage ());
	}catch(Throwable th) {
	    System.err.println("Throwable: " +th);
	    th.printStackTrace();
	    throw new EJBException("Throwable in setContext: " +th);
	}

    }
    
    public void ejbCreate() {}

    public void ejbRemove() {ctx=null;}

    public void onMessage(Message message) {

	BlasterConnection con = null;
	try {
	    // Get message to handle
	    System.err.println("Got message: " + message);

	    if (message instanceof TextMessage) {
		 String msg = ((TextMessage)message).getText();

		 // Get connection
		 con = factory.getConnection();
		 
		 // Construct Blaster headers - how to handle the key here?
		 String key ="<key oid=\"" + message.getJMSMessageID() +"\" contentMime=\"text/xml\"></key>";
		 String qos = "<qos></qos>";
		 con.publish( new MessageUnit(key,msg.getBytes(),qos));
		 
	    } else {
		System.err.println("Got message type I can't handle");
	    }
	    
	}catch(ResourceException re) {
	    System.err.println("Resource ex: " +re);
	    re.printStackTrace();
	} catch(XmlBlasterException be) {
	    System.err.println("Blaster ex: " +be);
	    be.printStackTrace();
	}catch(JMSException je) {
	    System.err.println("JMSException ex: " +je);
	    je.printStackTrace();
	}catch(Throwable th) {
	    System.err.println("Throwable: " +th);
	    th.printStackTrace();
	    
	}finally {   
	    try {
		if (con != null)
		    con.close ();
	    }
	    catch (Exception ex) {}
	    
	}
    }
} // JmsAdapter

The deployment descriptors for this bean are written the normal way, except that you have to add entries for the resource-ref. Here is the ejb-jar.xml deployment descriptor:

     <?xml version="1.0"?>
     <!DOCTYPE ejb-jar>
     <ejb-jar>
       <enterprise-beans>
         <message-driven>
           <ejb-name>JmsAdapter</ejb-name>
           <ejb-class>javaclients.j2ee.k2.JmsAdapter</ejb-class>
           <message-selector></message-selector>
           <transaction-type>Container</transaction-type>
           <resource-ref>
             <res-ref-name>xmlBlaster</res-ref-name>
             <res-type>org.xmlBlaster.j2ee.k2.client.BlasterConnectionFactory</res-type>
             <res-auth>Container</res-auth>
           </resource-ref>
           <message-driven-destination>
             <destination-type>javax.jms.Topic</destination-type>
             <subscription-durability>NonDurable</subscription-durability>
           </message-driven-destination>
         </message-driven>
       </enterprise-beans>
       <assembly-descriptor>
         <container-transaction>
           <method>
             <ejb-name>JmsAdapter</ejb-name>
             <method-name>*</method-name>
           </method>
           <trans-attribute>Required</trans-attribute>
         </container-transaction>
       </assembly-descriptor>
     </ejb-jar>

And here is the jboss.xml descriptor:

     <?xml version="1.0" encoding="Cp1252"?>

     <jboss>
       <resource-managers>
         <resource-manager>
           <res-name>xmlBlaster</res-name>
           <res-jndi-name>java:/XmlBlasterDS</res-jndi-name>
         </resource-manager>
       </resource-managers>
       <enterprise-beans>
         <message-driven>
           <ejb-name>JmsAdapter</ejb-name>
           <configuration-name>Standard Message Driven Bean</configuration-name>

           <destination-jndi-name>topic/testTopic</destination-jndi-name>
           <resource-ref>
             <res-ref-name>xmlBlaster</res-ref-name>
             <resource-name>xmlBlaster</resource-name>
           </resource-ref>
         </message-driven>
         <secure>false</secure>
       </enterprise-beans>
     </jboss>