Sincronizzarsi su di una coda: Subscriber/2

Ultimo passo nel completare l'esempio che abbiamo cominciato a vedere un paio di post fa, sullo scambio di messaggi tra un publisher e un subscriber per mezzo di un topic, usando una coda per sincronizzarsi.

Ci resta da vedere come implementare il listener che deve effettivamente leggere e gestire i messaggi che arrivano dal publisher per conto del subscriber.

TextListener

Abbiamo visto che il metodo receive() del nostro subscriver delega a questa classe la lettura dei messaggi, e resta in attesa, chiamando il suo metodo waitTillDone(), che l'operazione venga completata. Vediamo nel dettaglio come ciò avviene:

package jmsrobust;

import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;

public class TextListener implements MessageListener {
private Monitor monitor = new Monitor();

public void waitTillDone() {
monitor.waitTillDone();
}

public void onMessage(Message message) {
if(message instanceof TextMessage) {
TextMessage msg = (TextMessage)message;

try {
System.out.println("Processing message: " + msg.getText());
}
catch (JMSException je) {
System.err.println("Exception in onMessage(): " + je.toString());
}
}
else {
// non-text message == end of the message stream
monitor.allDone();
}
}
}

Sembra tutto molto semplice, e in effetti lo é. Il metodo onMessage(), dichiarato nell'interfaccia MessageListener che la nostra classe implementa, processa i messaggi che ci vengono passati da JMS - ricordiamo che questo listener viene associato dal metodo receive() del nostro subscriber al topic usato come canale si comunicazione - finché non si vede un messaggio non testuale che abbiamo stabilito essere la convenzione che ci indica il termine delle trasmissioni. Segnaliamo questo al monitor che usiamo allo scopo di sincronizzare il TextListener con chi lo possiede.

Vediamoci dunque come implementiamo il monitor:

package jmsrobust;

public class Monitor {
private boolean done = false;

public synchronized void waitTillDone() {
while(!done) {
System.out.println("Waiting for notification");
try {
this.wait();
}
catch (InterruptedException ie) {
System.out.println("Wait interrupted");
}
}
System.out.println("Notification received");
}

public synchronized void allDone() {
done = true;
System.out.println("allDone notification");
this.notify();
}
}

In realtà mi sembra un po' sovrabbondante definire i metodi come sincronizzati, dato che non vengono chiamati concorrentemente, ma potrebbe essere utile in una situazione più complessa. Per il momento manteniamo il codice così, in attesa di valutare meglio i pro e i contro della decisione.

Per il resto, il codice dovrebbe essere abbastanza intuitivo. Con waitTillDone() mettiamo il thread che lo chiama in attesa. E questa attesa viene risolta con una chiamata a allDone() che cambia lo stato della variabile di controllo e notifica all'oggetto stesso che ha cambiato stato.

Test

Per testare la fuzionalità usiamo questo metodo:

private static void pubSub() {
System.out.println("Start multi publisher example");
Thread subscriber = new AsynchSubscriber();
Thread publisher = new MultiplePublisher();

try {
subscriber.start();
publisher.start();

subscriber.join();
publisher.join();
}
catch (InterruptedException e) {
e.printStackTrace(System.out);
}

System.out.println("Multi publisher example completed");
}

Nessun commento:

Posta un commento