Il punto di questo esempio é che vogliamo che il subscriber possa disconnettersi da JMS senza perdere messaggi a lui indirizzati. Usiamo perciò una connessione durable, in modo da poter creare un TopicSubscriber durable.
La gestione di basso livello per JMS, con la creazione del contesto iniziale, delle connection factory e del topic é la medesima che abbiamo già visto in precedenza, inglobata nella classe JmsConnectionManager, perciò non ne ripetiamo qui i dettagli.
MultiplePublisher
Il publisher é piuttosto semplice, usa la connessione non durable, su cui crea una sessione AUTO_ACKNOWLEDGE, il producer é un MessageProducer costruito sulla session specificando il topic che abbiamo definito precedentemente via Admin Console di GlassFish.
In pratica ci si limita a mandare un certo numero di messaggi sul topic ogni volta che viene invocato il metodo publishMessages(); ecco qui a seguire il codice della classe:
package durable;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
public class MultiplePublisher {
private Connection connection = null;
private Session session = null;
private MessageProducer producer;
private JmsConnectionManager cm = JmsConnectionManager.getInstance();
private int msgBaseIndex = 0;
public MultiplePublisher() throws JMSException {
connection = cm.connFact.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(cm.topic);
}
public void publishMessages() {
final int NUMMSGS = 3;
final String MSG_TEXT = "This is message ";
try {
TextMessage message = session.createTextMessage();
for(int i = 1; i <= NUMMSGS; ++i) {
message.setText(MSG_TEXT + (msgBaseIndex + i));
System.out.println("Publishing message: " + message.getText());
producer.send(message);
}
msgBaseIndex += NUMMSGS;
// non-text message as end of messages
producer.send(session.createMessage());
}
catch(JMSException e) {
System.err.println("Exception in publishMessages: " + e.toString());
}
}
public void finish() {
if(connection != null) {
try {
connection.close();
}
catch(JMSException e) {}
}
}
}
DurableSubscriber
Il subscriber di questo esempio é simile al subscriber dell'esempio precedente, e ricicliamo per il suo uso le classi TextListener e Monitor, che mettono a disposizione rispettivamente il metodo onMessage() che gestisce i messaggi come indirizzati dal topic e il meccanismo per sincronizzare DurableSubscriber con il listener.
Il suo costruttore crea una connessione durable, e da questa una sessione di tipo AUTO_ACKNOWLEDGE.
La ricezione di messaggi é gestita da due metodi, startSubscriber() e closeSubscriber().
La startSubscriber() crea un durable subscriber per il topic usato dal publisher, crea un listener, di tipo TextListener - che abbiamo già trattato nel post precedente, e lo associa al subscriber. Quindi facciamo partire la connessione.
La closeSubscriber() aspetta che il listener abbia completato la ricezione dei messaggi, quindi chiude il subscriber e ferma la connessione.
Vediamo il codice della classe:
package durable;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
public class DurableSubscriber {
private Connection connection = null;
private Session session = null;
private TextListener listener = null;
private TopicSubscriber subscriber = null;
private JmsConnectionManager cm = JmsConnectionManager.getInstance();
public DurableSubscriber() throws JMSException {
connection = cm.drbConnFact.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void startSubscriber() throws JMSException {
System.out.println("Starting subscriber");
subscriber = session.createDurableSubscriber(cm.topic, "MakeItLast");
listener = new TextListener();
subscriber.setMessageListener(listener);
connection.start();
}
public void closeSubscriber() throws JMSException {
listener.waitTillDone();
System.out.println("Closing subscriber");
subscriber.close();
connection.stop();
}
public void finish() {
if (connection != null) {
System.out.println("Unsubscribing from durable subscription");
try {
session.unsubscribe("MakeItLast");
connection.close();
}
catch(JMSException je) {
System.out.println(je.getMessage());
}
}
}
}
Main
Il main fa partire e poi chiude due volte il subscriber. Se non fosse durable i messaggi mandati dal publisher quando il subscriber é giù andrebbero persi. Eseguendo il codice vediamo che, come ci aspettiamo, i messaggi restano sul topic in attesa che il subscriber venga fatto ripartire.
public static void main(String[] args) {
DurableSubscriber subscriber = null;
MultiplePublisher publisher = null;
try {
subscriber = new DurableSubscriber();
publisher = new MultiplePublisher();
subscriber.startSubscriber();
publisher.publishMessages();
subscriber.closeSubscriber();
publisher.publishMessages();
subscriber.startSubscriber();
subscriber.closeSubscriber();
}
catch(JMSException je) {
System.out.println("Exception: " + je.getMessage());
}
finally {
if(subscriber != null)
subscriber.finish();
if(publisher != null)
publisher.finish();
}
System.exit(0);
}