Wednesday, July 25, 2012

How to Get Started with ActiveMQ

http://www.openlogic.com/wazi/bid/188010


As your company grows, you may need an enterprise solution that facilitates passing information between applications and the employees who control them. A message broker is a program that routes, transforms, aggregates, or stores messages between applications based on rules you specify. In a way, what email does for employees, message queuing software does for applications. Proprietary message brokers include Microsoft BizTalk Server, Oracle Message Broker, and SAP NetWeaver Process Integration. If you prefer open source software (and we know you do), consider Apache ActiveMQ.
ActiveMQ is closely tied to Java and fully implements the Java Message Service (JMS). JMS, a part of Java Enterprise Edition, is a standard API that lets distributed application components create, send, receive, and read messages. It supports two messaging models: point-to-point, where a message placed on a queue is delivered to a single consumer, and publish/subscribe, where one side publishes a message to a topic and the other, if interested, subscribes to the message flow. In both models, messages between components are passed indirectly, with the message queuing software acting as the broker. This addresses some of the shortcomings of passing messages directly, such as the sender having to know who (if anyone) is receiving a sent message.
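To make the broker's role concrete, here is a minimal in-memory sketch in plain Java. It is not ActiveMQ or JMS code: the class `ToyBroker` and all of its method names are hypothetical, invented only to illustrate how a queue hands each message to a single receiver while a topic hands each message to every current subscriber, and how the sender never needs to know who is on the other side.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a broker; NOT the ActiveMQ or JMS API.
public class ToyBroker {
    // Queue name -> pending messages (point-to-point delivery).
    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
    // Topic name -> current subscribers (publish/subscribe delivery).
    private final Map<String, List<Consumer<String>>> topics = new ConcurrentHashMap<>();

    // Store a message on a queue; the sender never learns who will read it.
    public void sendToQueue(String queue, String message) {
        queues.computeIfAbsent(queue, k -> new LinkedBlockingQueue<>()).add(message);
    }

    // Remove and return the oldest message on the queue, or null if it is empty.
    // Each message is handed to exactly one caller.
    public String receiveFromQueue(String queue) {
        BlockingQueue<String> pending = queues.get(queue);
        return pending == null ? null : pending.poll();
    }

    // Register a subscriber for all messages published to the topic from now on.
    public void subscribe(String topic, Consumer<String> subscriber) {
        topics.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    // Deliver the message to every current subscriber and return how many received it.
    public int publish(String topic, String message) {
        List<Consumer<String>> subscribers =
            topics.getOrDefault(topic, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();

        // Queue: the message is consumed once, then it is gone.
        broker.sendToQueue("TESTQUEUE", "hello");
        System.out.println(broker.receiveFromQueue("TESTQUEUE")); // prints "hello"
        System.out.println(broker.receiveFromQueue("TESTQUEUE")); // prints "null"

        // Topic: both subscribers see the same message.
        List<String> seenByA = new ArrayList<>();
        List<String> seenByB = new ArrayList<>();
        broker.subscribe("NEWS", seenByA::add);
        broker.subscribe("NEWS", seenByB::add);
        int delivered = broker.publish("NEWS", "update");
        System.out.println(delivered + " " + seenByA + " " + seenByB); // prints "2 [update] [update]"
    }
}
```

A real broker adds persistence, acknowledgements, network transport, and much more; the sketch only shows the indirection between sender and receiver.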
Though developers typically use ActiveMQ to create enterprise messaging software with the Java classes JMS provides, you are not limited to Java. Clients are also available for .NET, C/C++, Delphi, and scripting languages such as Ruby, Perl, Python, and PHP.
For this article, we installed ActiveMQ on a CentOS 6.2 64-bit server with OpenJDK 1.6.0 installed. Even though the ActiveMQ docs refer to version 4.x, the latest stable release is 5.6.0. Download and unpack it; use the binary distribution, not the source, to keep things simple. (There also seems to be a CentOS 5.x repository, but since we are running CentOS 6.2, we did not test it.) If you want to make the installation directory available to others, you can move it to /opt or another globally available location. Change to the resulting directory, then to bin/linux-x86-64, and run ./activemq start. To test whether everything is running smoothly, check the netstat output for 61616, the default port ActiveMQ listens on (netstat -an | grep 61616). You should also be able to reach ActiveMQ's admin page from a browser at localhost:8161/admin.
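If you would rather run the same check from Java than from netstat, a plain TCP connection attempt is enough. The following is a minimal sketch that uses only the standard library; the class name `BrokerPortCheck` and the method `isListening` are made up for this illustration, and the one-second timeout is an arbitrary choice. It only confirms that something accepts connections on the port, not that the listener is actually ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for this article; not part of ActiveMQ.
public class BrokerPortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 61616 is the default broker port and 8161 the admin console port
        // mentioned in the article; adjust them if you changed the configuration.
        System.out.println("broker port 61616 open: " + isListening("localhost", 61616, 1000));
        System.out.println("admin port 8161 open:   " + isListening("localhost", 8161, 1000));
    }
}
```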

Configuration via XML

Before turning to the part aimed at developers writing ActiveMQ clients, a few words about configuration are in order. ActiveMQ uses XML files for many configuration tasks. If you look at the right side of the admin page, you will notice Queue, Topic, and Subscribers views, along with their corresponding XML files, which are empty for now. The XML configuration files are in the conf/ directory, and you should (almost) certainly take a peek, as they are very well commented and will get you started. The project's examples page shows you how to compile the examples; doing so requires the Ant build tool.
The nice fellows at Javablogging have a straightforward example of how to create and listen to messages. The following code, for instance, effectively creates a queue, session, and message:
import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    // URL of the JMS server. DEFAULT_BROKER_URL will just mean
    // that JMS server is on localhost
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the queue we will be sending messages to
    private static String subject = "TESTQUEUE";

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server and starting it
        ConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // JMS messages are sent and received using a Session. We will
        // create here a non-transactional session object. If you want
        // to use transactions you should set the first parameter to 'true'
        Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);

        // Destination represents here our queue 'TESTQUEUE' on the
        // JMS server. You don't have to do anything special on the
        // server to create it, it will be created automatically.
        Destination destination = session.createQueue(subject);

        // MessageProducer is used for sending messages (as opposed
        // to MessageConsumer which is used for receiving them)
        MessageProducer producer = session.createProducer(destination);

        // We will send a small text message saying 'Hello' in Japanese
        TextMessage message = session.createTextMessage("こんにちは");

        // Here we are sending the message!
        producer.send(message);
        System.out.println("Sent message '" + message.getText() + "'");

        connection.close();
    }
}
As you can see, it's simple to get started, even if you're not a Java master, though you should know Java if you want to do any serious work. The receiving component implementation looks like this:
import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
    // URL of the JMS server
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the queue we will receive messages from
    private static String subject = "TESTQUEUE";

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server
        ConnectionFactory connectionFactory
            = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Creating session for receiving messages
        Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);

        // Getting the queue 'TESTQUEUE'
        Destination destination = session.createQueue(subject);

        // MessageConsumer is used for receiving (consuming) messages
        MessageConsumer consumer = session.createConsumer(destination);

        // Here we receive the message.
        // By default this call is blocking, which means it will wait
        // for a message to arrive on the queue.
        Message message = consumer.receive();

        // There are many types of Message and TextMessage
        // is just one of them. Producer sent us a TextMessage
        // so we must cast to it to get access to its .getText()
        // method.
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Received message '"
                + textMessage.getText() + "'");
        }
        connection.close();
    }
}
If you set up everything right, you should be able to go to the Queues submenu in the admin panel and see TESTQUEUE in the Name column.
I won't show you examples from every language ActiveMQ supports, but I will share a few words about writing C++ code for ActiveMQ. Probably one of the best places to start is the Apache ActiveMQ source tree. The excerpts below come from the two files that matter right now: producers/SimpleProducer.cpp for the sending part and consumers/SimpleAsyncConsumer.cpp for the receiving part.
//...the sending part...


// Create a Connection, after creating a connection factory
//...
// Create a Session
if( clientAck ) {
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}

// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}

// Create a MessageProducer from the Session to the Topic or Queue
//.......
// Create the Thread Id String
//.....
// Create a message
string text = (string)"Hello world! from thread " + threadIdStr;

for( unsigned int ix=0; ix<numMessages; ++ix ){
TextMessage* message = session->createTextMessage( text );

message->setIntProperty( "Integer", ix );

// Tell the producer to send the message
printf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );
producer->send( message );
}
The code in the source tree is commented well enough that reading it, then compiling and running it, gives you a pretty good understanding of how to write C++ code with ActiveMQ. Here is the other side:
//...includes and fun stuff go here...
//declare variables...
//...
void runConsumer() {

try {
// Create a Connection, after creating a connection factory
//...
// Create a Session
//...
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}

// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );

} catch (CMSException& e) {
e.printStackTrace();
}
}
//...inside the listener's onMessage( const Message* message ) callback...
static int count = 0;

try
{
count++;
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = "";

if( textMessage != NULL ) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
//...
//Exception handling and cleanups go here...

// Create the consumer
SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );

// Start it up and it will listen forever.
consumer.runConsumer();
}

Conclusion

For more information on writing code in your favorite programming language, look no further than ActiveMQ's source tree. You can browse it online or check out the repository to your machine with Subversion, using a command like this: svn co https://svn.apache.org/repos/asf/activemq/trunk activemq. Be sure to have a Subversion client installed on your machine; the checkout runs over HTTPS, so SSH is not required. And be sure to read the excellent documentation.
