Messaging

JMS Spring-ben

A Spring a JMS támogatását template-en keresztül adja, melyet JmsTemplate-nek hívnak. A JMS-t támogató üzenetküldő rendszerek közül most az Apache ActiveMQ-t fogjuk használni, melynek két változata létezik

  • Apache ActiveMQ
  • Apache ActiveMQ Artemis

Az utóbbi egy újra implementálása az elsőnek, így ezt célszerű használni új projektekben:

Feladat

Készítsünk egy egyszerű point-to-point üzenet alapú rendszert, melyben Artemis-t használunk! Bár az üzenetküldés lényege pont az, hogy a küldő és a fogadó külön-külön projektben van, hogy növeljük a lazán csatoltságot, ugyanakkor az egyszerűség kedvéért most készítsük el ezt egy alkalmazáson belül!

1. lépés - Artemis telepítés

Ebben a lépésben az Artemis-t kell beszereznünk és telepítenünk. Az Artemis rendszer lesz az üzenetküldő rendszerünk, azaz ő adja broker-t. A szoftver letölthető a https://activemq.apache.org/components/artemis/download/ URL-ről. Töltsük le és csomagoljuk ki egy számunkra megfelelő helyre, majd próbáljuk is ki! Ehhez a következőket kell tennünk:

1
cd <ARTEMIS_INSTALL_DIR>\bin\

Ezután nyissunk egy parancssort/terminált, majd adjuk ki a következő parancsot!

1
artemis create test-broker

Ezzel a paranccsal létrehozunk egy új message broker-t, mely az üzenetküldést megvalósítja majd. A fenti parancs kiadása után a rendszer bekéri a felhasználónevet és jelszót, illetve azt, hogy az anonymous hozzáférés engedélyezett-e. Mi a következőket használtuk:

  • felhasználónév: artemis
  • jelszó: artemis
  • anonymous hozzáférések tiltása

A felhasználónév és jelszó megadására használhatjuk a következő parancssori paraméterek megadását is:

1
artemis create test-broker --user=artemis --password=artemis

Megjegyzés

A broker-t a számítógépen bárhova telepíthetjük, nem kell az artemis mappájába lennie.

A létrehozás után az artemis ki is írja, hogy hogyan lehet a broker elindítani:

1
<ARTEMIS_INSTALL_DIR>\test-broker\bin\artemis run

Amennyiben a háttérben futó win service-ként szeretnénk futtatni a broker-t, arra is lehetőségünk van a következőkkel:

1
2
<ARTEMIS_INSTALL_DIR>\test-broker\bin\artemis-service.exe install
<ARTEMIS_INSTALL_DIR>\test-broker\bin\artemis-service.exe start

Leállítani a

1
<ARTEMIS_INSTALL_DIR>\test-broker\bin\artemis-service.exe stop

utasítással állíthatjuk le a service-t.

Most a sima command line futtatást válasszuk, mivel így láthatunk sok-sok hasznos infot!

Az egyik hasznos információ a log alján olvasható:

  • HTTP Server started at http://localhost:8161

Látogassunk is el ide!

artemis_http_server

Itt menjünk a Management Console menüponthoz! Itt kéri majd be a rendszer az általunk a telepítéskor megadott felhasználónevet és jelszót. Amennyiben be tudtunk lépni, akkor kicsit ismerkedhetünk a felülettel, de most ezt félretesszük egy kicsit és megírjuk magát az alkalmazásunkat, amely ezt a broker-t fogja használni.

2. lépés - JMS alkalmazás megírása

Készítsünk egy új alkalmazást a Spring Initializr segítségével! A függőségek között adjuk meg az Artemis-t és a Web-et (továbbá a lombok-ot amennyiben ezt is használni szeretnénk)!

A pom.xml így a következőképpen fog kinézni (már ami ezt a két függőséget illeti):

1
2
3
4
5
6
7
8
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

Amennyiben a classpath-ba bekerül az artemis függőség, akkor alapértelmezetten localhost:61616-on fog futni az Artemis broker. Fejlesztés során ez így rendben is van, de production-ben jó ha ezeket tudjuk konfigurálni:

  • spring.artemis.host
  • spring.artemis.port
  • spring.artemis.user
  • spring.artemis.password

Az egyszerűség kedvéért az üzenetküldést és az üzenet kiolvasást is ugyanebben a projektben végezzük még most.

Elsőként készítsük el az üzenet küldéséért felelős komponenst!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@Service
@Slf4j
public class SimpleMessageSender {

    private JmsTemplate jmsTemplate;

    public SimpleMessageSender(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    @Value("${jms.destination}")
    private String destination;

    public void send(String message) throws JmsException {
        jmsTemplate.convertAndSend(destination, message);
        log.info("Message sent: " + message);
    }
} 

Vegyük szépen sorjában, hogy mi történik itt. Az üzeneteink küldéséhez szükségünk lesz egy JmsTemplate-re, melyen keresztül a tényleges üzenetküldést tehetjük meg. A JmsTemplate a központi eleme a JMS integrációnak, mely nélkül saját magunknak kellene létrehoznunk a kapcsolatot és a session-t is. A fenti példában a convertAndSend() metódust használjuk. Emellett megtalálható egy másik metódus is, mely a send(). Mindkét üzenetküldő metódusnak különböző paraméterezése létezik. A send()-nek több esetben egy MessageCreator-ra van szüksége, mely a Message objektum előállításáért felelős.

A convertAndSend() metódus egy Object objektumot vár, melyet automatikusan átkonvertál egy Message objektummá. A convertAndSend()-nek megadhatunk egy MessagePostProcessor-t, melyet még a küldés előtt hív meg a rendszer, így tovább formálhatjuk az üzenetet olyan formájúra, amilyenre csak szeretnénk.

A fenti két metódusnak különböző változatai léteznek, melyek attól függően alkalmazhatóak, hogy hogyan adjuk meg az üzenet célpontját (queue vagy topic attól függően, hogy milyen üzenetküldési modellt alkalmazunk). Ezek alapján a következő 3 lehetőségünk van:

  • destination nélküli paraméterezés: ilyenkor a default destination-re küldjük az üzenetet
  • Destination objektum megadása
  • String alapú megadás: A célpontot a neve alapján azonosítjuk

Lássunk egy-két további példát az üzenetküldések megadására! Default queue-ba küldjük az üzenetet:

1
2
3
4
5
6
jmsTemplate.send(new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
        return session.createObjectMessage(message);
    }
});

Itt egy anonymous inner class-ban adjuk meg a MessageCreator-t, melynek egyetlen createMessage() metódusa megkapja a Session-t. A megvalósításban a Session-ön keresztül hozzuk létre a Message objektumot, melybe belecsomagoljuk az egyszerű String üzenetünket. Mivel a MessageCreator egy Functional interface, így használhatunk lambdát is:

1
jmsTemplate.send(session -> session.createObjectMessage(message));

Mivel nincs megadva a queue, amibe az üzenetet szeretnénk küldeni, így szükséges megadnunk a default destination-t, melyet az application.properties állományban tehetünk meg:

1
spring.jms.template.default-destination=Q.Test

Amennyiben a Destination interface-t szeretnénk használni a küldések során, akkor azt talán a legegyszerűbb módon egy bean formájában tudjuk megtenni:

1
2
3
4
@Bean
public Destination orderQueue() {
    return new ActiveMQQueue("Q.Test");
}

Ezután ezt szabadon injektálhatjuk oda, ahova csak szeretnénk.

A fenti példák alapján látszódik, hogy a send() metódus használata is egyszerű, azonban a MessageCreator megadása sokszor csak boilerplate kódot jelent. Egyszerű esetben nem szeretnénk ezzel foglalkozni, csak a megadott üzenetet elküldeni.

Az első esetben használt convertAndSend() pontosan ezt oldja meg, de hogyan is működik ez? A színfalak mögött egy MessageConverter végzi el a piszkos munkát (ez a Spring része, nem pedig az artemis-é), mely interface a következőképpen néz ki:

1
2
3
4
5
public interface MessageConverter {
    Message toMessage(Object object, Session session) throws JMSException MessageConversionException;

    Object fromMessage(Message message) throws JMSException, MessageConversionException;
}

Elég egyértelmű ez az interface, azonban a legtöbb esetben van beépített megvalósítása is:

  • MappingJackson2MessageConverter: Üzenetek JSON-ből és JSON-ba (Jackson lib segítségével)
  • MarshallingMessageConverter: Üzenetek XML-ből és XML-be (JAXB segítségével)
  • SimpleMessageConverter: az alábbi konverziók lehetségesek vele
    • String \(\leftrightarrow\) TextMessage
    • byte array \(\leftrightarrow\) BytesMessage
    • Map \(\leftrightarrow\) MapMessage
    • Serializable \(\leftrightarrow\) ObjectMessage

Ez utóbbi az alapértelmezett, de szükséges, hogy a küldeni kívánt objektum implementálja a Serializable interface-t.

Megjegyzés

Példa másik MessageConverter használatára:

1
2
3
4
5
6
@Bean
public MappingJackson2MessageConverter messageConverter() {
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
    messageConverter.setTypeIdPropertyName("_typeId");
    return messageConverter;
}

A setTypeIdPropertyName()-ra azért van szükség, hogy a fogadó tudja, hogy a megkapott üzenetet milyen típusúra kell alakítania. Ez alapértelmezetten az elküldött objektum fully qualified neve (package-elt osztálynév). Ütközések esetén használhatjuk a setTypeIdMappings() metódust, hogy mesterségesen előállított neveket használjunk. Például:

1
2
3
Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>();
typeIdMappings.put("myclass", MyClass.class);
messageConverter.setTypeIdMappings(typeIdMappings);

Ilyen módon a fogadó a saját értelmezésében fogadhatja a myclass típusú üzeneteket (más package, más osztálynév, csak bizonyos property-ket kell átkonvertálni, stb.).

Amennyiben az üzenethez el szeretnénk tárolni további információkat, akkor használhatjuk a message post processor-t. Például egy étteremnél szeretnénk, ha tudnánk, hogy a rendelések weben történtek-e vagy az éttermen belül, helyben rendeltek.

1
2
3
4
jms.convertAndSend(order, message -> {
    message.setStringProperty("SOURCE", "LOCAL");
        return message;
    });

Most, hogy megismertük a különféle küldési lehetőségeket, az üzenet küldésének triggereléséhez készítsünk egy egyszerű REST API végpontot!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
@RestController
@Slf4j
public class MessageController {

    private SimpleMessageSender simpleMessageSender;

    public MessageController(SimpleMessageSender simpleMessageSender) {
        this.simpleMessageSender = simpleMessageSender;
    }

    @PostMapping("/send")
    public HttpEntity<String> send(@RequestBody String message){
        try{
            simpleMessageSender.send(message);
            return ResponseEntity.ok("Message sent: " + message);
        } catch (JmsException e){
            log.error("Could not send message", e);
            return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }
}

A végpont egy egyszerű String-et kap, melyet a korábban ismertetett SimpleMessageSender segítségével küldünk el az üzenet sorba (queue).

Üzenetek fogadása

A JmsTemplate lehetőséget ad arra is, hogy üzeneteket fogadjunk. A küldés mintájára a fogadásra a receive és a receiveAndConvert metódusok szolgálnak.

Lássunk egy megvalósítást:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@Service
@Slf4j
public class SimpleReceiver {

    private JmsTemplate jmsTemplate;

    @Value("${jms.destination}")
    private String destination;

    @Autowired
    public SimpleReceiver(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void receiveMessage() throws JMSException {
        String msg = (String)jmsTemplate.receiveAndConvert(destination);
        log.info("receiveAndConvert: " + msg);
    }
}

A példa egy úgynevezett pull típusú fogadást végez, amihez meg kell hívnunk a receiveMessage() metódust, mely egészen addig blokkolja a szálat, ameddig nem kapunk üzenetet. A példában csak a payload-ot használjuk fel. Arra is lehetőség van, hogy a header-öket is feldolgozzuk. Ilyen esetben a receive metódust kell használnunk, illetve szükséges egy MessageConverter is.

A pull típus mellett létezik a push típusú üzenetfogadás is, mely nem blokkolja a szálat, hanem egy listener-t használ. Ez a listener megadása a @JmsListener használatával történik, melyet az adott komponens tetszőleges metódusára aggathatunk fel. Ebben az esetben az üzenet fogadóját a következőképpen kell elkészítenünk:

1
2
3
4
5
6
7
8
9
@Service
@Slf4j
public class SimpleReceiver {

    @JmsListener(destination = "${jms.destination}")
    public void receive(String message){
        log.info("Message received: " + message);
    }
}

A használt megközelítés kicsit hasonlít a @RequestMapping megadásokra.

RabbitMQ

Telepítési lépések:

  1. Erlang telepítése
  2. RabbitMQ telepítése

Miután telepítettük a RabbitMQ-t, azt service-ként el tudjuk indítani a Start menüből.

A pom.xml-be a következőt kell hozzáadnunk!

1
2
3
4
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

A fenti dependency jelenléte a classpath-ban triggereli az automatikus konfigurációt, mely így létrehoz egy connection factory és egy RabbitTemplate bean-t is (de erről lesz szó hamarosan).

Az application.properties fájlban a következő hasznos property-ket adhatjuk meg:

  • spring.rabbitmq.addresses: Vesszővel elválasztott lista, mely a RabbitMQ brokerek címeit tartalmazza
  • spring.rabbitmq.host: Broker host (default: localhost)
  • spring.rabbitmq.port: Broker port (default: 5672)
  • spring.rabbitmq.username: Broker-hez használt felhasználónév
  • spring.rabbitmq.password: Broker-hez használt jelszó

Ezután lássuk, hogyan küldhetünk üzeneteket! A folyamat lelkét a RabbitTemplate osztály adja, mely nagyban hasonlít a JmsTemplate-hez. Például használhatóak a következő metódusok:

  • void send(String exchange, String routingKey, Message message)
  • void convertAndSend(String exchange, String routingKey, Object message)
  • void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)

Ezen metódusok a legbővebb paraméterezéssel vannak jelen, ugyanakkor ezek változatai is használhatóak, ahol rendre elhagyjuk az exchange, vagy az exchange és a routingKey paramétereket.

A send egy raw Message objektumot tud elküldeni, így a saját objektumainkat át kell alakítani üzenetté (MessageConverter). A sendAndConvert() hasonlóan egyedi objektumokon is működik és a konverziót automatikusan, a színfalak mögött végzi a rendszer. Rendre megtalálható a MessagePostProcessor is, mellyel az üzenet elküldése előtt manipulálhatjuk a tartalmat.

A fő különbség (mely magából a protokollból jön), hogy destination helyett itt az exchange és a routing key elemeket adhatjuk meg. Amikor nem adjuk meg az exchange-t, akkor az default exchange-be továbbítódik, illetve ugyanez vonatkozik a routingKey-re is.

Lássunk az üzenetküldő service osztályt:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@Service
public class RabbitSenderService {

    private final RabbitTemplate rabbitTemplate;

    @Value("${hu.suaf.rabbit.routing-key}")
    private String routingKey;

    public RabbitSenderService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void send(String msg){
        MessageConverter converter = rabbitTemplate.getMessageConverter();
        MessageProperties props = new MessageProperties();
        Message message = converter.toMessage(msg, props);
        rabbitTemplate.send(routingKey, message);
    }
}

A fenti példában az egyszerű send metódust használjuk, melyhez elő kell állítanunk az üzenetet. A RabbitTemplate bean-t automatikusan létrehozza a rendszer (Spring Boot autokonfiguráció miatt), melytől el tudjuk kérni az általa használt MessageConverter objektumot. A konverter-el egyszerűen előállíthatjuk tetszőleges objektum Message változatát, melyet a send-el már el is küldünk. Az üzenetre vonatkozó property-ket nyilván a MessageProperties-ben lehet megadni, ha erre szükségünk van (a fenti esetben ez nem kellett, így csak egy üreset adunk át). A send-nél nem adtunk meg exchange-et, így a küldés során az a default-nak lesz továbbítva. A routing key-t a megadott property névvel megadhatjuk az application.properties-ben.

Kapcsolódó linkek


Utolsó frissítés: 2020-11-18 12:05:12