Sincronizzarsi su di una coda: Subscriber

Continuiamo l'esercizio tratto dal tutorial Sun su Java EE, sezione "Creating Robust JMS Applications", in cui un publisher su topic si sincronizza con il subscriber per mezzo di una coda dedicata.

Nel post precedente abbiamo visto il publisher, ora vediamo il subscriber.

AsynchSubscriber

Anche questa classe é una estensione di Thread, il cui metodo run() é questo:

@Override
public void run() {
if(startup()) {
System.out.println("Subscriber ready to start");

receive();
}

closeConnections();
System.out.println("Subscriber run completed");
}

Poco da dire su startup, se non che qui usiamo anche una Durable Connection - ci serve per poter instanziare un durable subscriber - mentre le sessioni sono entrambe di tipo AUTO_ACKNOWLEDGE.

Molto da dire invece sulla receive(). Semplificando, l'idea é quella di instanziare un TopicSubscriber sul topic definito nel nostro Application Server, metterci sopra un listener che si occuperà di trattare i messaggi ricevuti e far partire la connessione.

Comunichiamo quindi al publisher che siamo pronti, aspettiamo che il listener ci confermi che la ricezione é completata e quindi procediamo al cleanup delle risorse utilizzate.

Vediamo il codice:

private void receive() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();

try {
TopicSubscriber subscriber =
sessions[0].createDurableSubscriber(cm.topic, "AckSub");

TextListener listener = new TextListener();
subscriber.setMessageListener(listener);
conns[0].start();

if(synchronize() == false) {
System.out.println("Can't synchronize");
return;
}

listener.waitTillDone();
subscriber.close();
sessions[0].unsubscribe("AckSub");
} catch (JMSException e) {
System.err.println("Exception occurred: " + e.toString());
}
}

La synchronize() é una funzione piuttosto semplice, si limita a mandare un messaggio alla coda che utilizziamo per sincronizzare subscriber e producer.

Vediamo a seguire il codice di tutta la classe:

package jmsrobust;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;

public class AsynchSubscriber extends Thread {
enum Type {DURABLE, STANDARD};

private Connection conns[] = new Connection[2];
private Session sessions[] = new Session[2];

private boolean startup() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();

try {
int i = Type.DURABLE.ordinal();
conns[i] = cm.drbConnFact.createConnection();
sessions[i] = conns[i].createSession(false, Session.AUTO_ACKNOWLEDGE);

i = Type.STANDARD.ordinal();
conns[i] = cm.connFact.createConnection();
sessions[i] = conns[i].createSession(false, Session.AUTO_ACKNOWLEDGE);

return true;
}
catch (Exception e) {
System.err.println("Connection problem: " + e.toString());
}

return false;
}

private void closeConnections() {
for(Type type : Type.values()) {
int i = type.ordinal();
try {
if(conns[i] != null) {
conns[i].close();
conns[i] = null;
}
}
catch(JMSException ee) {}
}
}

private void receive() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();

try {
// start a listener on a subscriber
TopicSubscriber subscriber =
sessions[0].createDurableSubscriber(cm.topic, "AckSub");

TextListener listener = new TextListener();
subscriber.setMessageListener(listener);
conns[0].start();

// Let the publisher know that subscriber is ready.
if(synchronize() == false) {
System.out.println("Can't synchronize");
return;
}

// wait for notification from publisher
listener.waitTillDone();
subscriber.close();
sessions[0].unsubscribe("AckSub");
} catch (JMSException e) {
System.err.println("Exception occurred: " + e.toString());
}
}

/**
* Sends a message to the queue to notify a publisher that
* it is ready to receive messages.
*/
private boolean synchronize() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();

try {
MessageProducer producer = sessions[1].createProducer(cm.queue);
TextMessage message = sessions[1].createTextMessage();
message.setText("synchronize");
System.out.println("Subscriber is sending synchronize message");
producer.send(message);
} catch (JMSException e) {
System.err.println("Exception occurred for the subscriber: " + e.toString());
return false;
}

return true;
}

/**
* Runs the thread.
*/
@Override
public void run() {
if(startup()) {
System.out.println("Subscriber ready to start");

receive();
}

closeConnections();
System.out.println("Subscriber run completed");
}
}

Ci resta da vedere il codice della TextListener (con la associata classe Monitor) e il main che ci permette di testare le funzionalità, vediamole nel post successivo.

Nessun commento:

Posta un commento