Home kellton

Main navigation

  • Services
    • Digital Business Services
      • Digital Experience
        • Product Strategy & Consulting
        • Product Design
        • Product Management
      • Product Engineering
        • Digital Application Development
        • Mobile Engineering
        • IoT & Wearables Solutions
        • Quality Engineering
      • Data Engineering & AI
        • Data Engineering
        • Data Science & ML
        • Generative AI & ChatGPT
        • Visualisation & Analytics
        • Integration & API
        • RPA
      • Cloud Engineering
        • Cloud Consulting
        • Cloud Migration
        • Cloud Managed Services
        • DevSecOps
      • NextGen Services
        • Blockchain
        • Web3
        • Metaverse
    • SAP
      • SAP Services
        • S/4HANA Implementations
        • SAP AMS Support
        • SAP Automation
        • SAP Security & GRC
        • SAP Value Added Solutions
        • Other SAP Implementations
  • Platforms & Products
    • Kellton4Health
    • Kellton4NFT
    • Kellton4Commerce
    • KLGAME
    • tHRive
    • Optima
  • Industries
    • Fintech, Banking, Financial Services & Insurance
    • Retail, E-Commerce & Distribution
    • Pharma, Healthcare & Life Sciences
    • Non-Profit, Government & Education
    • Travel, Logistics & Hospitality
    • HiTech, SaaS, ISV & Communications
    • Manufacturing, Automotive & Chemicals
    • Oil,Gas & Mining
    • Energy & Utilities
  • Insights
    • Blogs
    • Brochures
    • Success Stories
    • News / Announcements
    • Webinars
    • White papers
  • Careers
    • Life At Kellton
    • Jobs
  • About
    • About Us
    • Our Partners
    • Our Leadership
    • Testimonials
    • Investors
    • Privacy-Policy
    • Contact Us
Search

Breadcrumb

  1. Home
  2. Blogs
  3. How to Integrate RabbitMQ with webMethods?

How to Integrate RabbitMQ with webMethods?

Data Engineering & AI
API Driven Development
January 14th , 2019
Posted By:
Nageswara Reddy Chintakuntla
linkedin
How to Integrate RabbitMQ with webMethods?

Related Post

Apple Vision Pro
Apple Vision Pro: A leap into the future of Spatial Computing
07 Jun, 2023
Thumbnail - Why should you run SAP on AWS?
Why should you run SAP on AWS?
02 Jun, 2023
Mobile App Development Cost
How Much Does Mobile App Development Cost in 2023?
31 May, 2023

RabbitMQ, a message broker, is enterprise-level message-queuing software. It’s equipped with multiple features for reliable delivery, routing, and federation to cater to extensible business requirements beyond the throughput. RabbitMQ currently powers 35000+ projects for startups and large enterprises. The fact that it can implement AMQP, an open wire protocol for messaging with powerful routing features, is what makes RabbitMQ highly popular for an open-source messaging queuing broker. It’s one of the earliest enterprise-grade messaging software to achieve quality compliance in terms of features, dev tools, client libraries, and quality documentation.

Java has always had messaging standards like JMS. However, it was a pain to find the right message broker for non-Java applications, which had distributed messaging, but limited to integration scenarios, monolithic or microservices. With the advent of AMQP, cross-language flexibility has become feasible for open-source message brokers.

Guide: RabbitMQ Message Broker Integration with Software AG's webMethods

RabbitMQ can be integrated into webMethods using Java Client programs. Here is a step-by-step guide to performing smooth integration.
 

Technologies used

  • webMethods Integration Server 9.X
  • RabbitMQ Server Version 3.7.4 with Erlang 20.3
  • Jar files to place in Integration Server classpath (rabbitmq-client.jar & amqp-client-5.5.1.jar)
     

RabbitMQ Webmethods Integration1


Once both webMethods Integration Server and RabbitMQ Server are installed, consumers on the Queue need to be created to listen to the messages from RabbitMQ and invoke flow service by passing data received in message payload to perform business logic in the flow service.

Receiving messages from RabbitMQ

Following are the steps to perform successful RabbitMQ integration with Java service:

Step 1: Create RabbitMQ Connection
We need to create connection to RabbitMQ for receiving messages from RabbitMQ to webMethods.

com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();

factory.setHost("localhost");
factory.setUsername("Guest");
factory.setPassword("Guest");
factory.setPort(5672);
Connection connection = factory.newConnection();

Step 2: Create Channel
Channel channel = connection.createChannel();
channel.queueDeclare("RMQ_Out_Queue", true, false, false, null);

Step 3: Create Consumer
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
String headers = properties.getHeaders().toString();
};
channel.basicConsume("RMQ_Out_Queue",true,consumer);

Step 4: Invoke flow service by passing inputs from Receive Client code
NSName nsName = NSName.create( "RabbitMQTest.services:printMessage" );
com.wm.app.b2b.server.User user = com.wm.app.b2b.server.UserManager.getUser("Administrator");
Session s = StateManager.createContext(0x7fffffffL, "system", user);
s.setUser(user);
s.clearModified();
Service.doThreadInvoke("RabbitMQTest.services","printMessage", s, idata);
StateManager.deleteContext(s.getSessionID());

Step 5: Implement flow service 
The flow service can perform actual business logic with the inputs passed from the received java client program.

RabbitMQ Webmethods Integration

Step 6: Publish message from RabbitMQ

Once the consumer is created for the queue from the Java client program, publish message on the same queue. 

RabbitMQ Webmethods Integration 1RabbitMQ Message Help

Step 7: Once the message is published successfully in RabbitMQ, the consumer created by the received java client program gets the message and passes it to the flow service. You can see the debug message printed in Server logs of Integration Server as shown below.

RabbitMQ Webmethods Integration 4

Send messages to RabbitMQ

Step 1: Create RabbitMQ Connection
We need to create connection to RabbitMQ for receiving messages from webMethods to RabbitMQ.

com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
                        
factory.setHost("localhost");
factory.setUsername("Guest");
factory.setPassword("Guest");
factory.setPort(5672);
Connection connection = factory.newConnection();

Step 2: Create Channel
Channel channel = connection.createChannel();
channel.queueDeclare("RMQ_In_Queue", true, false, false, null);

Step 3: Publish Message
String message = "Hello!! this message is from webMethods.";
channel.basicPublish("", "RMQ_In_Queue", new AMQP.BasicProperties.Builder().headers(headers).build(), message.getBytes("UTF-8"));
Once you run the Send client program successfully, you can see the message being sent to queue in RabbitMQ.

RabbitMQ Webmethods Integration Message

Complete Java Client code 

Receiver Client Code:

package RabbitMQTest.client;

import com.wm.data.*;
import com.wm.util.Values;
import com.wm.app.b2b.server.Service;
import com.wm.app.b2b.server.ServiceException;
import com.wm.app.b2b.server.Session;
import com.wm.app.b2b.server.StateManager;
import com.wm.lang.ns.NSName;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.Connection;
import com.rabbitmq.client.Channel;
import com.wm.util.JournalLogger;

public final class receiveMessage_SVC

{

    /** 
     * The primary method for the Java service
     *
     * @param pipeline
     *            The IData pipeline
     * @throws ServiceException
     */
    public static final void receiveMessage(IData pipeline) throws ServiceException {
        String log="";
        String MsgFromQueue = "";
        String queueName = "RMQ_Out_Queue";
        try{
            com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
        
            factory.setHost("localhost");
            factory.setUsername("Guest");
            factory.setPassword("Guest");
            factory.setPort(5672);
            com.rabbitmq.client.Connection connection = factory.newConnection();
        
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, true, false, false, null);
        
            JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"function", "[**] Waiting for messages [**]");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    String headers = properties.getHeaders().toString();
        
                    JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"function", message); 
                    IData idata = new IDataFactory().create();
                    IDataCursor idc = idata.getCursor();
                    IDataUtil.put(idc, "jsonString", headers+"::"+message);
                    IData pipelineIn = IDataUtil.clone(idata);
                    JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"@@@@", "Before calling service.doInvoke()");
                    try {
                        NSName nsName = NSName.create( "RabbitMQTest.services:printMessage" );
                        com.wm.app.b2b.server.User user = com.wm.app.b2b.server.UserManager.getUser("Administrator");
                        Session s = StateManager.createContext(0x7fffffffL, "system", user);
                        s.setUser(user);
                        s.clearModified();
        
                        Service.doThreadInvoke("RabbitMQTest.services","printMessage", s, idata);
                        StateManager.deleteContext(s.getSessionID());
                        JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"@@@@", "After calling Service.doInvoke()");
                    } catch (Exception e) 
                    {
                        JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"*** EXCEPTION***", e.toString());
                        e.printStackTrace();
                    }
                }
            };
            channel.basicConsume(queueName,true,consumer);            
            log = "Consumer created Successfully for RabbitMQ Queue";
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "message",log );
            pipelineCursor.destroy();
        }
        catch(Exception e){
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "Error", e.getStackTrace());
            IDataUtil.put( pipelineCursor, "log-levl", log);
            pipelineCursor.destroy();
        }
            
    }
    
    // --- <<IS-BEGIN-SHARED-SOURCE-AREA>> ---
    
    
    
    // --- <<IS-END-SHARED-SOURCE-AREA>> ---

    /**
     * The service implementations given below are read-only and show only the
     * method definitions and not the complete implementation.
     */
    public static final void sendMessage(IData pipeline) throws ServiceException {
    }

    final static receiveMessage_SVC _instance = new receiveMessage_SVC();

    static receiveMessage_SVC _newInstance() { return new receiveMessage_SVC(); }

    static receiveMessage_SVC _cast(Object o) { return (receiveMessage_SVC)o; }

Sender Client Code:

package RabbitMQTest.client;

import com.wm.data.*;
import com.wm.util.Values;
import com.wm.app.b2b.server.Service;
import com.wm.app.b2b.server.ServiceException;
import com.wm.app.b2b.server.Session;
import com.wm.app.b2b.server.StateManager;
import com.wm.lang.ns.NSName;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.Connection;
import com.rabbitmq.client.Channel;
import com.wm.util.JournalLogger;

public final class sendMessage_SVC

{

    /** 
     * The primary method for the Java service
     *
     * @param pipeline
     *            The IData pipeline
     * @throws ServiceException
     */
    public static final void sendMessage(IData pipeline) throws ServiceException {
        String queueName = "RMQ_In_Queue";
        try{
            com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("Guest");
            factory.setPassword("Guest");
            factory.setPort(5672);
            com.rabbitmq.client.Connection connection = factory.newConnection();
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("ID",  12345);
            headers.put("Name", "Kellton");
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, true, false, false, null);
            String message = "Hello!! this message is from webMethods.";
            channel.basicPublish("", queueName, new AMQP.BasicProperties.Builder().headers(headers).build(), message.getBytes("UTF-8"));
            channel.close();
            connection.close();
        
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "message", "Message Sent To RabbitMQ Successfully !" );
            pipelineCursor.destroy();
        }
        catch(Exception e){
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "Error", e.getLocalizedMessage() );
            IDataUtil.put( pipelineCursor, "StackTrace", e.getStackTrace() );
            IDataUtil.put( pipelineCursor, "ErrorString", e.toString());
            pipelineCursor.destroy();
        }
            
    }
    
    // --- <<IS-BEGIN-SHARED-SOURCE-AREA>> ---
    
    
    
    // --- <<IS-END-SHARED-SOURCE-AREA>> ---

    /**
     * The service implementations given below are read-only and show only the
     * method definitions and not the complete implementation.
     */
    public static final void receiveMessage(IData pipeline) throws ServiceException {
    }

    final static sendMessage_SVC _instance = new sendMessage_SVC();

    static sendMessage_SVC _newInstance() { return new sendMessage_SVC(); }

    static sendMessage_SVC _cast(Object o) { return (sendMessage_SVC)o; }

}

Posted By:
Nageswara Reddy Chintakuntla
linkedin

Want to know more?

Generative AI and ChatGPT
Blog
Redefining the game: Generative AI and ChatGPT unlock new possibilities across industries
11 May, 2023
Thumbnail - AI in action: 5 ways it’s improving software development
Blog
AI in action: 5 ways it’s improving Software Development
03 May, 2023
Banner - How to unlock customer loyalty as an insurer
Blog
How to unlock customer loyalty as an insurer?
01 May, 2023

Leading you through Digital Transformation journey

North America: +1.844.469.8900

Asia: +91.124.469.8900

Europe: +44.203.807.6911

Email: ask@kellton.com

Footer menu right

  • Digital Experience
  • Data Engineering & AI
  • Nextgen Services
  • About
  • Contact

Footer Menu Left

  • Product Engineering
  • Cloud Engineering
  • SAP Services
  • Careers
  • Success Stories
clutch Badge
GoodFirms Badge

© 2023 Kellton