Как стать автором
Обновить
823.74
OTUS
Цифровые навыки от ведущих экспертов

Микросервисы с коммуникацией через Axon

Время на прочтение 5 мин
Количество просмотров 6.7K
В этом простом туториале мы сделаем пару микросервисов на Spring Boot и организуем между ними взаимодействие через фреймворк Axon.





Допустим у нас такая задача.

Есть источник сделок на фондовом рынке. Этот источник передает нам сделки по Rest-интерфейсу.

Нам надо эти сделки получить, сохранить в базу данных и сделать удобное in-memory хранилище.

Это хранилище должны выполнять следующие функции:

  • возвращать список трейдов;
  • возвращать полную позицию, т.е. таблицу «инструмент» — «текущее кол-во бумаг»;
  • возвращать позицию по заданному инструменту.

Как мы подойдем к решению этой задачи?

По заветам микросервисной моды, нам надо разделить задачу на составляющие микросервисы:

  • получение по Rest-у сделки;
  • сохранение сделки в базу данных;
  • in-memory хранилище для представления данных по позиции.

Давайте в рамках этого туториала сделаем первый и третий сервис, а второй оставим на вторую часть (напишите в комментариях если это интересно).

Итак, у нас есть два микросервиса.

Первый получает данных извне.

Второй эти данные обрабатывает и отвечает на входящие запросы.

Мы конечно хотим получить горизонтальное масштабирование, безостановочное обновление и прочие достоинства микросервисов.

Какая, весьма непростая, задача перед нами стоит?

Вообще-то их много, но сейчас давайте поговорим, как между этими микросервисами будут идти данные. Между ними можно тоже сделать Rest, можно поставить какую-нибудь очередь, можно много чего придумать со своими плюсами и минусами.

Давайте рассмотрим один из возможных подходов – ассинхронное взаимодействие через Axon-фреймворк.

Какие плюсы такого решения?

Во-первых, ассинхронное взаимодействие увеличивает гибкость (да, есть тут и минус, но мы пока ведь только о плюсах).

Во-вторых, прямо из коробки мы получаем Event Sourcing и CQRS.
В-третьих, Axon предоставляет готовую инфраструктуру, и нам надо сосредоточиться только на разработке бизнес-логики.

Приступим.

Проект у нас будет на gradle. В нем будет три модуля:

  • common. модуль с общими структурами данных (мы не любим копипасту);
  • tradeCreator. модуль с микросервисом для приема сделок по Rest;
  • tradeQueries. модуль с микросерсисом для отображения позиции.

Возьмем Spring Boot за основу и подключим стартер Axon-а.

Axon отлично работает и без Spring, но мы будем их использовать вместе.

Тут надо остановиться и пару слов рассказать про Axon.

Это клиент-серверная система. Есть сервер – это отдельно приложение, мы его будем запускать в докере.

И есть клиенты, которые встраиваются в микросервисы.
Получается такая картина. Сначала запускается Axon-сервер (в докере), потом наши микросервисы.

При старте микросервисы ищут сервер и начинают с ним взаимодействовать. Взаимодействие условно можно разделить на два вида: техническое и бизнесовое.

Техническое – это обмен сообщениями «я живой» (такие сообщения можно увидеть в режиме логирования debug).

Бизнесовое – это обмет сообщениями вроде «новая сделка».

Важная особенность, после запуска микросервис может спросить у Axon-сервера «что произошло» и сервер передает микросервису накопленные события. Таким образом, микросервис относительно безопасно может быть перезапущен без потери данных.
При такой схеме обмена мы можем очень просто запускать много экземпляров микросервисов,
причем на разных хостах.

Да, один экземпляр Axon-сервера – это не надежно, но пока так.

Мы работаем в парадигмах Event Sourcing и CQRS. Это значит, что у нас должны быть «команды», «события» и «выборки».

У нас будет одна команда: «создать сделку», одно событие «сделка создана» и три выборки: «показать все сделки», «показать позицию», «показать позицию по инструменту».

Схема работы получается такой:

  1. Микросервис tradeCreator принимает сделку по Rest.
  2. Микросервис tradeCreator создает команду «создать сделку» и отправляет ее в Axon-сервер.
  3. Axon-сервер принимает команду и пересылает команду заинтересованному получателю, в нашем случае это микросервис tradeCreator.
  4. Микросервис tradeCreator получает команду, формирует событие «сделка создана» и отправляет его Axon-серверу.
  5. Axon-сервер принимает событие и пересылает заинтересованным подписчикам.
  6. Сейчас у нас только один заинтересованный получатель – это микросервис tradeQueries.
  7. Микросервис tradeQueries получает событие и обновляет внутренние данные.

(Важно, что в момент формирования события Микросервис tradeQueries может быть не доступен, но как только он запустится, то сразу получит событие).

Да, axon-сервер стоит в центре коммуникций, все сообщения идут через него.

Давайте переходить к кодированию.

Чтобы не загромождать пост кодом, ниже я приведу только фрагменты, ссылка на пример целиком будет ниже.

Начнем с общего модуля common.

В нем общие части – это событие (class CreatedTradeEvent). Обратите внимание на наименование, по сути, это название команды, которая породила это событие, но в прошедшем времени. В прошедшем, т.к. сначала появляется команда, которая приводит к созданию события.

К другим общим структурам относятся классы для описания позиции (class Position), сделки (class Trade) и сторона сделки (enum Side), т.е. купля или продажа.

Переходим к модулю tradeCreator.

Этот модуль имеет Rest-интерфейс (class TradeController) для приема сделок.
Из полученной сделки формируется команда «создать сделку» и отправляется в axon-сервер.

    @PostMapping("/trade")
    public ResponseEntity<String> create(@RequestBody Trade trade) {
        var createTradeCommand = CreateTradeCommand.builder()
                .tradeId(trade.getTradeId())
	...
                .build();
        var result = commandGateway.sendAndWait(createTradeCommand, 3, TimeUnit.SECONDS);
        return ResponseEntity.ok(result.get().toString());
    }

Для обработки команды используется класс class TradeAggregate.
Чтобы Axon его нашел, ставим аннотацию @Aggregate.
Метод для обработки команды выглядит так (с сокращением):

    @CommandHandler
    public TradeAggregate(CreateTradeCommand command) {
        log.info("command: {}", command);
        var event = CreatedTradeEvent.builder()
                .tradeId(command.tradeId())
		....
                .build();
        AggregateLifecycle.apply(event);
    }

Из команды формируется событие и отправляется на сервер.
Команда находится в классе CreateTradeCommand.

Теперь посмотрим на последний модуль tradeQueries.

Выборки описываются в пакете queries.
В этом модуле тоже есть Rest-интерфейс
public class TradeController.

Для примера посмотрим обработку запроса: «показать все сделки».

    @GetMapping("/trade/all")
    public List<Trade> findAllTrades() {
        return queryGateway.query(new FindAllTradesQuery(),
                ResponseTypes.multipleInstancesOf(Trade.class)).join();
    }

Создается запрос на выборку и отправляется на сервер.

Для обработки запроса на выборку используется класс TradesEventHandler.
В нем есть метод, отмеченный аннотацией

   @QueryHandler
    public List<Position> handleFindCurrentPositionQuery(FindCurrentPositionQuery query)

Именно он и отвечает за выборку данных из in-memory хранилища.

Возникает вопрос, как в этом хранилище обновляется информация.

Начнем с того, что это просто набор ConcurrentHashMap, заточенных под конкретные выборки.
Для их обновления применяется метод:

    @EventHandler
    public void on(CreatedTradeEvent event) {
        log.info("event:{}", event);

        var trade = Trade.builder()
	...
                .build();
        trades.put(event.tradeId(), trade);
        position.merge(event.shortName(), event.size(),
                (oldValue, value) -> event.side() == Side.BUY ? oldValue + value : oldValue - value);
    }

Он принимает событие «сделка создана» и обновляет Map-ы.

Это основные моменты разработки микросервисов.

Что можно сказать про недостатки Axon?

Во-первых, это усложнение инфраструктуры, появилась точка отказа – Axon-сервер, все коммуникации идут через него.

Во-вторых, весьма ярко проявляется недостаток подобных распределенных системы – временная несогласованность данных. В нашем случае между получением новой сделки и обновлением данных для выборок может пройти недопустимо много времени.

Что осталось за кадром?

Совсем ничего не сказано про Event Sourcing и CQRS, что это такое и для чего нужно.
Без раскрытия этих понятий некоторые моменты могли быть не понятны.

Возможно, отдельные фрагменты кода тоже требуют пояснение.

Об этом мы поговорили на открытом вебинаре.

Полный пример.
Теги:
Хабы:
+7
Комментарии 2
Комментарии Комментарии 2

Публикации

Информация

Сайт
otus.ru
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия
Представитель
OTUS