Consumo asincrono di messaggi

Aggiungiamo un altro client al nostro sistema. Questo consuma messaggi in maniera asincrona. In pratica lo lanciamo, mettendolo in ascolto su una Queue o Topic e lo lasciamo correre indefinitamente.

TextListener

Gran parte del codice é uguale a quello che abbiamo scritto per il client sincrono, il principale cambiamento é che la gestione dei messaggi avviene per mezzo di un MessageListener.

A noi interessa gesitire solo i messaggi di testo, quindi lo chiamiamo TextListener:

package asynchConsumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class TextListener implements MessageListener {
public void onMessage(Message message) {
System.out.println("Enter TextListener.onMessage()");
TextMessage msg = null;

try {
if(message instanceof TextMessage) {
msg = (TextMessage) message;
System.out.println("Reading message: " + msg.getText());
}
else {
System.err.println("Message is not a TextMessage");
}
}
catch(JMSException e) {
System.err.println("JMSException in onMessage(): " + e.toString());
}
catch(Throwable t) {
System.err.println("Exception in onMessage():" + t.getMessage());
}

System.out.println("Exit TextListener.onMessage()");
}
}

Consumer

La classe Consumer di asynchConsumer si differenzia da quella di synchConsumer praticamente nel solo metodo getMessage(). In questo caso ci si limita ad istanziare il Listener e associarlo al consumer:

public void getMessages() throws JMSException, NamingException {
Destination dest = getDestination();

MessageConsumer consumer = session.createConsumer(dest);
TextListener listener = new TextListener();
consumer.setMessageListener(listener);
connection.start();

System.out.println("Waiting for messages");
}

Main

Il cambiamento rilevante nella classe Main é che aggiungiamo la gestione dell'interrupt. L'idea é che il thread iniziale che esegue la funzione main termina dopo aver creato il consumer. Restano vivi i thread generati da questo, che termineremo esplicitamente su richiesta dell'utente, mediante un control-c.

La gestione dello shutdown é affidata a questa classe:

package asynchConsumer;

import javax.jms.JMSException;

public class Shutdown implements Runnable {
Consumer consumer;

public Shutdown(Consumer consumer) {
this.consumer = consumer;
}

public void run() {
System.out.println("Shutting down...");

if(consumer != null)
try { consumer.close();} catch(JMSException ex) {}
}
}

Tiene al suo interno un riferimento al consumer e, quando arriva il segnale di terminazione lo chiude.

La comunicazione alla Java Virtual Machine che va chiamato il metodo run() della nostra classe Shutdown quando arriva il momento di terminare l'esecuzione, si compie per mezzo di una chiamata a addShutdownHook() a cui passiamo un nuovo thread che ha il compito di eseguire la nostra classe.

Vediamo dunque la funzione main() della nostra applicazione con questa variazione, per il resto praticamente uguale al main per il consumer sincrono:

public static void main(String[] args) {
boolean isTopic = (args.length == 1 && args[0].startsWith("t")) ? true : false;

Consumer consumer = null;
try {
consumer = new Consumer();

// default: wait for queue messages
if(isTopic)
consumer.setDestination(Consumer.Type.TOPIC);
consumer.getMessages();
}
catch(Exception ex) {
System.err.println("Consuming failed: " + ex.toString());
ex.printStackTrace();
System.exit(1);
}

Runtime.getRuntime().addShutdownHook(new Thread(new Shutdown(consumer), "Shutdown"));
System.out.println("Control-C to terminate");
}

Nessun commento:

Posta un commento