Версия для печати темы
Нажмите сюда для просмотра этой темы в оригинальном формате
Форум программистов > Java EE (J2EE) и Spring > RabbitMq гарантированная доставка сообщения


Автор: 4epT 18.12.2012, 13:46
Всем привет. В проекте два модуля общаются между собой через очередь (RabbitMq). Использую spring amqp для работы с RabbitMq.

Собственно вопрос, как осуществить проверку что сообщение ушло именно туда куда нужно и что его вторая сторона сможет прочитать ?

Автор: MisterCleric 18.12.2012, 14:07
Привет.
Цитата

 ушло именно туда куда нужно

Это можно узнать с помощью RabbitTemplate#returnCallback & RabbitTemplate#mandatory = true.
Что означает, что твои сообщения являются важными и ты получишь отбой, если брокер не смог отроутить твое сообщение.
Цитата

то его вторая сторона сможет прочитать 

Это уже не есть проблема твоего сендера. С твоей стороны важно доставить сообщение в конкретную очередь(очереди) по указанному routingKey.
Если ты сообщение доставил, то уже никак не сможешь понять сможет ли его листененр прочитать.
В этом и есть принцип сообщений: доставили и забыли.
сендер и ресивер абсолютно не должны знать о друг друге и они развязаны через посредника брокера: такой себе loosely-coupled.

С другой стороны у RabbitTemplate есть метод sendAndReceive, с помощью которого ты можешь организовать шаблон gateway, но тогда и твой ресивер, тоже должен обеспечить возврат ответа.
Но это все-равно тебя не спасает от ошибки "no route", что успешно можно поймать returnCallback'ом.

http://static.springsource.org/spring-amqp/reference/html/amqp.html#amqp-template

Автор: 4epT 18.12.2012, 15:05
Спасибо большое за ответ. Я что то не пойму где установить mandatory в true и установить returnCallback.

Использую RabbitTemplate (org.springframework.amqp.rabbit.core), сообщения отправляю вот так:

Код

template.convertAndSend(text);


Так же в CachingConnectionFactory, нету property publisherReturns.

Можно какой нибудь пример ?)

Автор: MisterCleric 18.12.2012, 15:59
Цитата

где установить mandatory в true и установить returnCallback

Все ж зависит от того, какую версию Spring-AMQP ты используешь. Эта фича появилась в версии 1.1: https://jira.springsource.org/browse/AMQP-213

Автор: 4epT 18.12.2012, 16:22
Да, точно. Использовал 1.0.

Тогда на солько я понял:

1) делаем
Код

connectionFactory.setPublisherReturns(true);

2) делаем
Код

template.setMandatory(true);

3) делаем
Код

template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                //...
            }
        });


И если сообщение не получилось положить, то свалиться эксепшен ?

Все ли эти три пункта обязательны ?

Когда сработает returnedMessage и для что нужно писать в реализации ?

Автор: MisterCleric 18.12.2012, 16:40
Цитата

И если сообщение не получилось положить, то свалиться эксепшен ?

Это зависит от реализации
Код

 public void returnedMessage(Message message, int i, String s, String s1, String s2) {

Цитата

Все ли эти три пункта обязательны ?

Да
Цитата

Когда сработает returnedMessage 

См. исходники RabbitTemplate. И вообще рекомендую читать исходники библиотек, которые используете: много чего полезного можно для себя найти

Все остальное правильно.
Здесь подобный нашему дискашн: http://forum.springsource.org/showthread.php?127065

Автор: 4epT 19.12.2012, 11:34
Спасибо большое за помощь. Как только разберусь, опишу в этой теме кратко что и как... может кому нибудь будет полезно в будущем.

Автор: 4epT 19.12.2012, 16:53
Поразбирался.. посмотрел исходники, даже получилось кое что набросать, но есть пару не понятных вещей.

returnedMessage - словит сообщение, которое не удалось отроутить.

У меня есть 1 Exchange, 1 Routing key и 2 очереди. В одну очередь пишу (Queue_for_send), а другую слушаю (Queue_for_listen). Вот так делаю биндинг:

Код

rabbitAdmin.declareBinding(BindingBuilder.bind(queueForSend).to(exchange).with("defaultRoutingKey").noargs());


1) Слушателя биндить никуда не нужно ?

2) exchange это и есть брокер? Или я не так себе представляю это ?)

3) Конфигурация rabbitmq происходит в момент создания бина RabbitAdmin ?

Автор: MisterCleric 21.12.2012, 20:58
Привет, сорри, что поздно возвращаюсь: болел.
Давай с самого начала:
Цитата

У меня есть 1 Exchange, 1 Routing key и 2 очереди

Это допустимо, если твой Exchange is Topic: http://www.rabbitmq.com/tutorials/tutorial-five-java.html
Цитата

В одну очередь пишу (Queue_for_send), а другую слушаю (Queue_for_listen)

Тут вообще не понял: если есть очередь, то кто-то в нее пишет, а кто-то из нее читает.
Опиши, плз, свою задачу: чего ты хочешь достигнуть с точки зрения архитектуры?
Да, и в очередь никто не пишет, по AMQP: есть Exchange и есть RoutingKey, по которому данный Exchange и решит, куда передавать сообщение.

Тепрь по твоим вопросам:
Цитата

1) Слушателя биндить никуда не нужно ?

Нет, тебе достаточно сконфигурить на клиенте (слушателе) листенер-контейнер, ну и все, что он попросит.
Цитата

2) exchange это и есть брокер? Или я не так себе представляю это ?)

Брокер, это и есть сервер AMQP, ну или если идти более широко, то это тот сервер, где размещаются очереди и хранилище сообщений. Это из теории.
Цитата

3) Конфигурация rabbitmq происходит в момент создания бина RabbitAdmin ?

Примерно так, но это происходит только один раз: если на брокере все уже сконфигурировано, то твой RabbitAdmin тихонько эти все пропустит.
Но если ты как-то радикально поменял настройки очередей или обменников(e.g. добавил аргумент в очередь 'x-dead-letter-exchange'), то может даже отругаться.

Да, мы обсуждаем вопросы Spring-AMQP здесь: http://forum.springsource.org/forumdisplay.php?74-AMQP
Там получишь рекомендацию из первых рук. Это я тебе говорю, как один из разработчиков Spring.
Да-а-а, совсем себя не прорекламировал...

Автор: 4epT 24.12.2012, 12:53
Цитата

Это допустимо, если твой Exchange is Topic: http://www.rabbitmq.com/tutorials/tutorial-five-java.html


Да, мой Exchange - Topic.

Цитата

Тут вообще не понял: если есть очередь, то кто-то в нее пишет, а кто-то из нее читает.
Опиши, плз, свою задачу: чего ты хочешь достигнуть с точки зрения архитектуры?


Делаю асинхронную обработку некоторых событий. То есть есть сервис для приема сообщений (заявок), на этой стороне сообщения обрабатываются и кладутся в очередь (Queue_for_send) на обработку. Другое приложение обрабатывает поступившие сообщения и кладет ответ в другую очередь (Queue_for_listen), которую я слушаю.

Цитата

Нет, тебе достаточно сконфигурить на клиенте (слушателе) листенер-контейнер, ну и все, что он попросит.


Понятно, спасибо  smile 

Цитата

Брокер, это и есть сервер AMQP, ну или если идти более широко, то это тот сервер, где размещаются очереди и хранилище сообщений. Это из теории.


Понятно, спасибо  smile 

Цитата

Примерно так, но это происходит только один раз: если на брокере все уже сконфигурировано, то твой RabbitAdmin тихонько эти все пропустит.
Но если ты как-то радикально поменял настройки очередей или обменников(e.g. добавил аргумент в очередь 'x-dead-letter-exchange'), то может даже отругаться.


Тут вот в чем дело, допустим приложение нормально работает и в какой то момент времени падает очередь. Нужно ее переподнять (или переключить на другую), но нужно сконфигурировать настройки очередей и обменников, то есть связать exchange с очередью и роутинг кей. Но в таком случае RabbitAdmin уже проинициализирован, как быть в такой ситуации ?

Цитата

Да, мы обсуждаем вопросы Spring-AMQP здесь: http://forum.springsource.org/forumdisplay.php?74-AMQP
Там получишь рекомендацию из первых рук.


Я бы с радостью, но боюсь что с моим англ. вы меня слабо поймете )

p.s. Выздоравливай!)))
 

Автор: MisterCleric 24.12.2012, 21:01
Так я не понял: у тебя все рабюотает, как ожидалось или что-то еще осталось?
Цитата

но нужно сконфигурировать настройки очередей и обменников, то есть связать exchange с очередью и роутинг кей

Счас исходников нет под рукой, но кажись ты это спокойно можешь делать используя RabbitAdmin бин напрямую.
Это с одной стороны.
С другой всегда все эти конфигурационные вещи можно делать прямо на сервере RabbitMQ или через удобный web-интерфейс: http://www.rabbitmq.com/management.html

Автор: 4epT 25.12.2012, 00:06
Сейчас уже все работает, это я просто интересуюсь в тех моментах которые не совсем понятны.

Да, можно и напрямую через веб интерфейс, но там каждая минута на счету ...

Автор: 4epT 25.12.2012, 16:35
И такой еще вопрос. Я могу как то сделать "гарантированную обработку" ? То есть, слушатель получил сообщение из очереди, начал его обработку, если все ок, то он помечает сообщение что оно отоработано, если упал эксепшен, то сообщение остается в очереди на обработке.

Автор: MisterCleric 25.12.2012, 16:51
Цитата

Я могу как то сделать "гарантированную обработку" ? 

Сделай свой листенер транзакционным: http://static.springsource.org/spring-amqp/reference/htmlsingle/#d4e415

Автор: 4epT 25.12.2012, 18:33
 smile 

Класс!!! Спасибо )

И последний вопрос не по теме.

Я могу как то спрингом обернуть некотороый метод в транзакцию ? в методе происходит цепочка событий (работа с mongodb, redis, rabbitmq и т..д), и в случае эксепшена на одном из событий, откат всех изменений (подчистка redis, подчистка БД и т.п.)  smile  или такое делается только в ручную ?

Автор: MisterCleric 25.12.2012, 20:17
Цитата

как то спрингом обернуть некотороый метод в транзакцию 

Не вопрос: @Transactional тебе в руки.
http://static.springsource.org/spring/docs/3.2.x/spring-framework-reference/html/transaction.html
Если будут дальше вопросы по транзакциям, то лучше открывай новую тему.

Да, и раз ты говоришь, что также юзаешь mongodb, redis, то посмотри в мощный Framework, который предоставляет высокую абстракцию по интеграции всего этого в одном месте:
http://www.springsource.org/spring-integration

Автор: 4epT 26.12.2012, 11:37
Цитата

Да, и раз ты говоришь, что также юзаешь mongodb, redis, то посмотри в мощный Framework, который предоставляет высокую абстракцию по интеграции всего этого в одном месте:
http://www.springsource.org/spring-integration 


Уже использую)) спасибо!

Powered by Invision Power Board (http://www.invisionboard.com)
© Invision Power Services (http://www.invisionpower.com)