Как стать автором
Обновить

Multi connection IBM MQ с использованием Spring

Время на прочтение 6 мин
Количество просмотров 7.5K
Приведу пример как сконфигурировать несколько endpoints для подключения к IBM MQ.

Цель:

  • читать из нескольких очередей, именованных одинаково, но находящихся на разных хостах/администраторах очередей
  • писать ответ в рандомно определенную ноду

0. Будем считать, что вы на данный момент уже развернули MQ или пользуетесь чьей-то.

1. Подгружаем зависимости в проект:

maven
<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>mq-jms-spring-boot-starter</artifactId>
    <version>2.3.3</version>
</dependency>


gradle
compile group: 'com.ibm.mq', name: 'mq-jms-spring-boot-starter', version: '2.3.3'


2. Создаем конфиг, вводим параметры подключения ваших точек (вы же их создали уже?). Используем массив, поэтому подключений может быть сколь угодно много.

mq:
  servers:
    - queueManager: QM1
      channel: DEV.ADMIN.SVRCONN
      connName: ibmmq.ru(1414)
      user: admin
      password: passw0rd
    - queueManager: QM2
      channel: DEV.ADMIN.SVRCONN
      connName: ibmmq.ru(1415)
      user: admin
      password: passw0rd
  queue1: QUEUE1
  queue2: QUEUE2

3. Создаем классы для считывания этих пропертей:

import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "mq")
@EqualsAndHashCode(callSuper = false)
@Data
public class MqConfig {
    private List<ConnectionConfiguration> servers;
    private String queue1;
    private String queue2;

}

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = false)
public class ConnectionConfiguration {
    String queueManager;
    String channel;
    String connName;
    String user;
    String password;
}

4. Создаем слушателя:

import javax.jms.MessageListener;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MqListener implements MessageListener {

    @SneakyThrows
    @Override
    public void onMessage(@Payload javax.jms.Message message) {
        log.info("Получено сообщение <" + message + ">");
        //TODO: сюда добавим отправку ответа чуть позже
    }

5. Конфигурируем! Определяем коннекшионФактори для каждого элемента массива из yml-пропертей. Создаем лист темплейтов для отправки сообщений, на вход которому скармливаем созданные коннекты. Создаем фабрики слушателей, на вход которых также используем созданные connectionFactories.


import com.fasterxml.jackson.databind.ObjectMapper;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.*;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.QosSettings;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.jms.support.converter.SimpleMessageConverter;

import javax.jms.*;
import java.util.*;

import static javax.jms.DeliveryMode.NON_PERSISTENT;
import static javax.jms.Session.CLIENT_ACKNOWLEDGE;

@Configuration
@EnableJms
@Slf4j
public class MqConfiguration {

    @Autowired
    MqConfig mqConfig;

    @Autowired
    private JmsListenerEndpointRegistry registry;

//Создаем фабрики слушателей, на вход которых также используем созданные connectionFactories
    @Bean
    public List<JmsListenerContainerFactory> myFactories(
            @Qualifier("myConnFactories") 
            List<CachingConnectionFactory> connectionFactories,
            MqListener mqListener) {
        List<JmsListenerContainerFactory> factories = new ArrayList<>();
        connectionFactories.forEach(connectionFactory -> {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE);

            QosSettings qosSettings = new QosSettings();
            qosSettings.setDeliveryMode(NON_PERSISTENT);
            factory.setReplyQosSettings(qosSettings);

            SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
            endpoint.setId("myJmsEndpoint-"+ UUID.randomUUID());
            endpoint.setDestination(mqConfig.getQueue1());
            endpoint.setMessageListener(mqListener);
            registry.registerListenerContainer(endpoint, factory);

            factories.add(factory);
        });
        return factories;
    }
//Создаем лист темплейтов для отправки сообщений, на вход которому скармливаем созданные коннекты
    @Bean
    @Qualifier("myJmsTemplates")
    public List<JmsTemplate> jmsTemplates(
            @Qualifier("myConnFactories") 
            List<CachingConnectionFactory> connectionFactories) {
        return getJmsTemplates(new ArrayList<ConnectionFactory>(connectionFactories));
    }

    public List<JmsTemplate> getJmsTemplates(List<ConnectionFactory> connectionFactories) {
        List<JmsTemplate> jmsTemplates = new ArrayList<>();
        for (ConnectionFactory connectionFactory : connectionFactories) {
            JmsTemplate jmsTemplate = new JmsTemplate();
            jmsTemplate.setConnectionFactory(connectionFactory);
            jmsTemplate.setMessageConverter(new SimpleMessageConverter());
            jmsTemplate.setDefaultDestinationName(mqConfig.getQueue2());
            jmsTemplate.setDeliveryMode(NON_PERSISTENT);
            jmsTemplate.setDeliveryPersistent(false);
            jmsTemplate.setExplicitQosEnabled(true);
            jmsTemplates.add(jmsTemplate);
        }
        return jmsTemplates;
    }

//Определяем коннекшионФактори для каждого элемента массива из yml-пропертей
    @Bean
    @Qualifier("myConnFactories")
    public List<CachingConnectionFactory> connectionFactories() throws JMSException {
        List<CachingConnectionFactory> factories = new ArrayList<>();

        for (ConnectionConfiguration server : mqConfig.getServers()) {
            CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
            MQConnectionFactory cf = new MQConnectionFactory();
            cachingConnectionFactory.setTargetConnectionFactory(cf);
            cf.setQueueManager(server.getQueueManager());
            cf.setChannel(server.getChannel());
            cf.setConnectionNameList(server.getConnName());
            cf.setStringProperty(WMQConstants.USERID, server.getUser());
            cf.setStringProperty(WMQConstants.PASSWORD, server.getPassword());
            cf.setStringProperty("XMSC_WMQ_CONNECTION_MODE", "1");

            factories.add(cachingConnectionFactory);
        }
        return factories;
    }

}

endpoint.setMessageListener(mqListener);
Здесь указываем слушателя (которого создали в п.4), чтобы определить действия при приеме сообщения.

6. Создадим сервисный слой, где допустим будет какая-то логика и после отправка ответа.

import javax.jms.TextMessage;

public interface MqService {

    void sendToMq(TextMessage msg);

}

import javax.jms.TextMessage;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MqServiceImpl implements MqService {

    @Autowired
    private MqConfig mqConfig;

    @Autowired
    @Qualifier("myJmsTemplates")
    List<JmsTemplate> jmsTemplates;


    @Override
    public void sendToMq(TextMessage msg ) {
        //какая-то логика
        //рандомным образом определяем в какую ноду/темплейт отправлять сообщение.
        int maxIndex = jmsTemplates.size()-1; // Конечное значение диапазона - "до"
        int randomNumber = (int) Math.round(Math.random() * maxIndex);
        jmsTemplates.get(randomNumber).convertAndSend(mqConfig.getQueue2(), msg);
    }
}

7. Добавляем отправку ответа в слушатель:

    @Autowired
    MqService mqService;

    @SneakyThrows
    @Override
    public void onMessage(@Payload javax.jms.Message message) {
        log.info("Получено сообщение <" + message + ">");
        mqService.sentToMq((TextMessage) message);
    }


Вуаля, готово, можно проверять.
Теги:
Хабы:
+3
Комментарии 4
Комментарии Комментарии 4

Публикации

Истории

Работа

Java разработчик
359 вакансий

Ближайшие события

Московский туристический хакатон
Дата 23 марта – 7 апреля
Место
Москва Онлайн
Геймтон «DatsEdenSpace» от DatsTeam
Дата 5 – 6 апреля
Время 17:00 – 20:00
Место
Онлайн
PG Bootcamp 2024
Дата 16 апреля
Время 09:30 – 21:00
Место
Минск Онлайн
EvaConf 2024
Дата 16 апреля
Время 11:00 – 16:00
Место
Москва Онлайн