Andrea Girardi - It's my blog!

Tag: Apache Camel

Route a message to MongoDB

The requirement is very simple. Route an XML message from rabbitMQ to MongoDB. MongoDB BSON as the data storage and network transfer format for “documents”. BSON is a binary-encoded serialization of JSON-like documents. So, the source message is in a XML format, after getting it from rabbitMQ is necessary to translate into a JSON format compatible with MongoDB.

To translate a message from XML to JSON is possible to use Marshalling function available in Camel extension XmlJson. The pom.xml file needs this new entry:

<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-xmljson</artifactId>
	<version>${org.camel.version}</version>
</dependency>

Of course, to send communicate with MongoDB another entry has to be added to pom.xml:

<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-mongodb</artifactId>
	<version>${org.camel.version}</version>
</dependency>

Now, is possible to change the route on camel ApplicationContext file:

<!-- Camel route -->    
<camelContext xmlns="http://camel.apache.org/schema/spring">
	<dataFormats>
		<xmljson id="xmljson"/>
	</dataFormats>
	<route> 
		<from uri="spring-amqp:TPDirect:TPQueue:TPRouting?type=direct&amp;autodelete=true&amp;durable=true" />
		<marshal ref="xmljson"/>
		<log message="From XML to Json: DONE!" />
		<convertBodyTo type="java.lang.String"/>
		<to uri="mongodb:myDb?database=flights&amp;collection=tickets&amp;operation=save" />
	</route>                                 
</camelContext>
 
 
<!-- Mongo DB -->
<bean id="myDb" class="com.mongodb.Mongo">
	<constructor-arg index="0" value="localhost"/>
</bean>

Bean myDB contains the information to reach MongoDB. It’s also possible to define it using the full url:

<bean id="myDb" class="com.mongodb.Mongo"> 
    <constructor-arg index="0"> 
        <bean class="com.mongodb.MongoURI"> 
            <constructor-arg index="0" value="mongodb://username:password@host:port/db" /> 
        </bean> 
    </constructor-arg> 
</bean>

To avoid this exception:

Caused by: No type converter available to convert from type: byte[] to the required type: org.apache.camel.component.mongodb.converters.MongoDbBasicConverters with value

The JSON translated message has to be converted into String.

That’s all!

Camel and RabbitMQ : Finally, how to!

Define a RabbitMQ broker endpoint in Camel is possible with the Bluelock camel-spring-amqp (https://github.com/Bluelock/camel-spring-amqp) library. It’s an Apache Camel component that allow to natively communicate with a RabbitMQ broker and it’s implemented using Spring’s AMQP.

For first, with Eclipse IDE create a new Maven Project with Artifact ID camel-arthetype-spring. This allow to use Spring DSL to configure Camel route and excute the run:camel goal of Camel Mavel Pluing (Camel Maven Plugin) in a forked JVM from Maven.

To resolve the dependencies, these entries are mandatory:

<dependency>
	<groupId>com.bluelock</groupId>
	<artifactId>camel-spring-amqp</artifactId>
	<version>1.2</version>
</dependency>
 
<!-- Camel dependencies -->
<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-test</artifactId>
	<version>${org.camel.version}</version>          
</dependency>
<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-spring</artifactId>
	<version>${org.camel.version}</version>        
	<exclusions>
		<exclusion>
			<artifactId>spring-tx</artifactId>
			<groupId>org.springframework</groupId>
		</exclusion>
		<exclusion>
			<artifactId>spring-context</artifactId>
			<groupId>org.springframework</groupId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-xstream</artifactId>
	<version>${org.camel.version}</version>            
</dependency>

At this point edit the camel-context.xml available on src/main/resource/META-INF/spring folder:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Configures the Camel Context-->
 
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
 
	<camelContext xmlns="http://camel.apache.org/schema/spring">
		<route>	
			<from uri="spring-amqp:KipcastDirect:KipcastQueue:KipcastRouting?type=direct&amp;autodelete=true&amp;durable=true" />
			<log message="Message available on a RabbitMQ Queue" />			
			<process ref="processorTest" />
		</route>
	</camelContext>
 
	<rabbit:connection-factory id="amqpConnectionFactory" />
	<rabbit:template id="amqpTemplate" connection-factory="amqpConnectionFactory" message-converter="messageConverter" exchange="KipcastBean" />
	<rabbit:admin connection-factory="amqpConnectionFactory"/>
 
	<bean id="amqpConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
	    <property name="host" value="10.211.55.20"/>
	    <property name="port" value="5672"/>
	    <property name="username" value="guest"/>
	    <property name="password" value="guest"/>
	    <property name="virtualHost" value="/"/>
	</bean>
 
    <bean id="jsonMessageConverter" class="amqp.spring.converter.XStreamConverter"/>
    <bean id="textMessageConverter" class="amqp.spring.converter.StringConverter"/>
    <bean id="messageConverter" class="amqp.spring.converter.ContentTypeConverterFactory">
        <property name="converters">
            <map>
                <entry key="application/json" value-ref="jsonMessageConverter"/>
                <entry key="application/xml" value-ref="textMessageConverter"/>
            </map>
        </property>
        <property name="fallbackConverter" ref="textMessageConverter"/>
    </bean>
 
</beans>

In this case, a route starting from a RabbitMQ queue to a system log and a listener on your queue that will be active until you terminate your maven came:run process has been created.

It’s very important to note that, on Spring XML & has to be quoted &amp;

The following Java code should be used send a message to the Exchange defined on Camel Route:

@Test
public void test() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException {
	ConnectionFactory factory = new ConnectionFactory();
	factory.setHost("10.211.55.20");
	factory.setPort(5672);
	factory.setVirtualHost("/");
	factory.setUsername("guest");
	factory.setPassword("guest");
	Connection connection = factory.newConnection();
	Channel channel = connection.createChannel();
 
	channel.exchangeDeclare("KipcastDirect", "direct", 
		   true, 	/* durable */
		   true, 	/* autodelete */
		   null); 	/* */
 
	byte[] messageBodyBytes = "Hello, world!".getBytes();
 
	AMQP.BasicProperties.Builder basic = new AMQP.BasicProperties.Builder();
	AMQP.BasicProperties minBasic = basic.build();
 
	minBasic = basic.priority(0).deliveryMode(1).build();
 
	channel.basicPublish("KipcastDirect", "KipcastRouting", minBasic, messageBodyBytes);
	System.out.println(" [x] Sent ");
 
	channel.close();
}

To test if listener works, for first run the queue listener and, after that, run the Junit class to send message. The output will be:

[pache.camel.spring.Main.main()] MainSupport                    INFO  Apache Camel 2.10.3 starting
[pache.camel.spring.Main.main()] SpringCamelContext             INFO  Apache Camel 2.10.3 (CamelContext: camel-1) is starting
[pache.camel.spring.Main.main()] ManagementStrategyFactory      INFO  JMX enabled.
[pache.camel.spring.Main.main()] DefaultTypeConverter           INFO  Loaded 177 type converters
[pache.camel.spring.Main.main()] SpringAMQPComponent            INFO  Found AMQP ConnectionFactory in registry for 10.211.55.20
[pache.camel.spring.Main.main()] SpringAMQPComponent            INFO  Found AMQP Template in registry
[pache.camel.spring.Main.main()] SpringAMQPComponent            INFO  Found AMQP Administrator in registry
[pache.camel.spring.Main.main()] SpringAMQPEndpoint             INFO  Creating endpoint for KipcastDirect:KipcastQueue:KipcastRouting
[pache.camel.spring.Main.main()] SpringAMQPConsumer             INFO  Declared exchange KipcastDirect
[pache.camel.spring.Main.main()] SpringAMQPConsumer             INFO  Declared queue KipcastQueue
[pache.camel.spring.Main.main()] SpringAMQPConsumer             INFO  Declaring binding KipcastRouting
[pache.camel.spring.Main.main()] SpringAMQPConsumer             INFO  Started AMQP Async Listeners for spring-amqp://KipcastDirect:KipcastQueue:KipcastRouting?autodelete=true&amp;durable=true&amp;type=direct
[pache.camel.spring.Main.main()] SpringCamelContext             INFO  Route: route1 started and consuming from: Endpoint[spring-amqp://KipcastDirect:KipcastQueue:KipcastRouting?autodelete=true&amp;durable=true&amp;type=direct]
[pache.camel.spring.Main.main()] ultManagementLifecycleStrategy INFO  StatisticsLevel at All so enabling load performance statistics
[pache.camel.spring.Main.main()] SpringCamelContext             INFO  Total 1 routes, of which 1 is started.
[pache.camel.spring.Main.main()] SpringCamelContext             INFO  Apache Camel 2.10.3 (CamelContext: camel-1) started in 0.505 seconds
[     SimpleAsyncTaskExecutor-1] route1                         INFO  Message available on a RabbitMQ Queue

At this point you can have fun with Camel and RabbitMQ!!!!!

NOTES:

1 – Please be carefoul: the URI (from and to) on Camel Spring DSL context and JUnit class must refer to same Exchange and Queue to prevent a reply-text=PRECONDITION_FAILED – parameters for queue ‘QUEUE’ in vhost ‘/’ not equivalen error or similar. To check the queues / exchanges configuration parameter use:

rabbitmqadmin -V / list queue
rabbitmqadmin -V test list exchanges

if you like this post, please click on the advertise :)

Copyright © 2017 Andrea Girardi – It's my blog!

Theme by Anders NorenUp ↑