Nel post precedente abbiamo visto il publisher, ora vediamo il subscriber.
AsynchSubscriber
Anche questa classe é una estensione di Thread, il cui metodo run() é questo:
@Override
public void run() {
if(startup()) {
System.out.println("Subscriber ready to start");
receive();
}
closeConnections();
System.out.println("Subscriber run completed");
}
Poco da dire su startup, se non che qui usiamo anche una Durable Connection - ci serve per poter instanziare un durable subscriber - mentre le sessioni sono entrambe di tipo AUTO_ACKNOWLEDGE.
Molto da dire invece sulla receive(). Semplificando, l'idea é quella di instanziare un TopicSubscriber sul topic definito nel nostro Application Server, metterci sopra un listener che si occuperà di trattare i messaggi ricevuti e far partire la connessione.
Comunichiamo quindi al publisher che siamo pronti, aspettiamo che il listener ci confermi che la ricezione é completata e quindi procediamo al cleanup delle risorse utilizzate.
Vediamo il codice:
private void receive() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();
try {
TopicSubscriber subscriber =
sessions[0].createDurableSubscriber(cm.topic, "AckSub");
TextListener listener = new TextListener();
subscriber.setMessageListener(listener);
conns[0].start();
if(synchronize() == false) {
System.out.println("Can't synchronize");
return;
}
listener.waitTillDone();
subscriber.close();
sessions[0].unsubscribe("AckSub");
} catch (JMSException e) {
System.err.println("Exception occurred: " + e.toString());
}
}
La synchronize() é una funzione piuttosto semplice, si limita a mandare un messaggio alla coda che utilizziamo per sincronizzare subscriber e producer.
Vediamo a seguire il codice di tutta la classe:
package jmsrobust;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
public class AsynchSubscriber extends Thread {
enum Type {DURABLE, STANDARD};
private Connection conns[] = new Connection[2];
private Session sessions[] = new Session[2];
private boolean startup() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();
try {
int i = Type.DURABLE.ordinal();
conns[i] = cm.drbConnFact.createConnection();
sessions[i] = conns[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
i = Type.STANDARD.ordinal();
conns[i] = cm.connFact.createConnection();
sessions[i] = conns[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
return true;
}
catch (Exception e) {
System.err.println("Connection problem: " + e.toString());
}
return false;
}
private void closeConnections() {
for(Type type : Type.values()) {
int i = type.ordinal();
try {
if(conns[i] != null) {
conns[i].close();
conns[i] = null;
}
}
catch(JMSException ee) {}
}
}
private void receive() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();
try {
// start a listener on a subscriber
TopicSubscriber subscriber =
sessions[0].createDurableSubscriber(cm.topic, "AckSub");
TextListener listener = new TextListener();
subscriber.setMessageListener(listener);
conns[0].start();
// Let the publisher know that subscriber is ready.
if(synchronize() == false) {
System.out.println("Can't synchronize");
return;
}
// wait for notification from publisher
listener.waitTillDone();
subscriber.close();
sessions[0].unsubscribe("AckSub");
} catch (JMSException e) {
System.err.println("Exception occurred: " + e.toString());
}
}
/**
* Sends a message to the queue to notify a publisher that
* it is ready to receive messages.
*/
private boolean synchronize() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();
try {
MessageProducer producer = sessions[1].createProducer(cm.queue);
TextMessage message = sessions[1].createTextMessage();
message.setText("synchronize");
System.out.println("Subscriber is sending synchronize message");
producer.send(message);
} catch (JMSException e) {
System.err.println("Exception occurred for the subscriber: " + e.toString());
return false;
}
return true;
}
/**
* Runs the thread.
*/
@Override
public void run() {
if(startup()) {
System.out.println("Subscriber ready to start");
receive();
}
closeConnections();
System.out.println("Subscriber run completed");
}
}
Ci resta da vedere il codice della TextListener (con la associata classe Monitor) e il main che ci permette di testare le funzionalità, vediamole nel post successivo.
Nessun commento:
Posta un commento