Sincronizzarsi su di una coda: Publisher

Il secondo esempio presentato nel tutorial Java EE di Sun nel paragrafo "Creating Robust JMS Applications", presenta uno scambio di messaggi tra publisher e subcriber che usano una coda accessoria per sincronizzarsi.

Ridisegno il codice presentato, perché mi é sembrato un po' caotico, sperando di ottenere qualcosa di più leggibile.

MultiplePublisher

La classe MultiplePublisher si prende il compito di postare i messaggi su di un Topic per l'uso del subscriber che vedremo più avanti.

Avendo deciso di lavorare con un topic, invece con una queue, dobbiamo aspettare che il ricevente ci dica quando é pronto a ricevere, per evitare che i nostri messaggi vadano persi.

Il metodo run() della classe, che estende Thread, sarà questo:

@Override
public void run() {
if(startup()) {
System.out.println("Publisher ready to start");

if(synchronize())
sendMessages();
}

closeConnections();
System.out.println("Publisher run completed");
}

Dopo la fase di inizializzazione, definita in startup(), ci si sincronizza con il destinatario, e solo se la sincronizzazione ha successo si mandano i messaggi.

La sincronizzazione avviene appoggiandosi su una coda e mettendosi in attesa che arrivi su di essa un messaggio prestabilito:

public boolean synchronize() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();

try {
System.out.println("Publisher is synchronizing on the control queue");
MessageConsumer receiver = sessions[1].createConsumer(cm.queue);
conns[1].start();

Message message = receiver.receive(2000);
if(message instanceof TextMessage) {
TextMessage text = (TextMessage) message;
if(text.getText().equals("synchronize")) {
System.out.println("Publisher has received synchronize message");
return true;
}
}
System.out.println("Publisher has not received synchronize message");
} catch (JMSException e) {
System.err.println("Exception occurred: " + e.toString());
}

return false;
}

In questo caso abbiamo deciso di aspettare per due secondi, se nulla arriva nel termine stabilito, diamo per fallita la connessione. Un esempio più realistico sarebbe più opportuno prevedere un meccanismo più sofisticato.

Vediamo ora il codice completo per questa classe. Notiamo che le due sessioni utilizzate (una per il topic su cui avviene il reale scambio di messaggi, una per la coda di controllo) sono create in modalità AUTO_ACKNOWLEDGE.

package jmsrobust;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class MultiplePublisher extends Thread {
private final int NUMMSGS = 3;
private final String MSG_TEXT = "Here is an auto-acknowledge message";

private Connection conns[] = new Connection[2];
private Session sessions[] = new Session[2];

private boolean startup() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();

try {
for(int i = 0; i < 2; ++i) {
conns[i] = cm.connFact.createConnection();
sessions[i] = conns[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
}
return true;
}
catch (Exception e) {
System.err.println("Connection problem: " + e.toString());
}

return false;
}

private void closeConnections() {
for(int i = 0; i < 2; ++i) {
try {
if(conns[i] != null) {
conns[i].close();
conns[i] = null;
}
}
catch(JMSException ee) {}
}
}

private void sendMessages() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();

MessageProducer publisher = null;
TextMessage message = null;
try {
publisher = sessions[0].createProducer(cm.topic);
message = sessions[0].createTextMessage();

for (int i = 0; i < NUMMSGS; i++) {
message.setText(MSG_TEXT + " " + i);
System.out.println("Publishing message: " + message.getText());
publisher.send(message);
}

System.out.println("Sending end of messages");
publisher.send(sessions[0].createMessage());
} catch (JMSException e) {
System.err.println("Exception occurred: " + e.toString());
}
}

@Override
public void run() {
if(startup()) {
System.out.println("Publisher ready to start");

if(synchronize())
sendMessages();
}

closeConnections();
System.out.println("Publisher run completed");
}

public boolean synchronize() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();

try {
System.out.println("Publisher is synchronizing on the control queue");
MessageConsumer receiver = sessions[1].createConsumer(cm.queue);
conns[1].start();

Message message = receiver.receive(2000);
if(message instanceof TextMessage) {
TextMessage text = (TextMessage) message;
if(text.getText().equals("synchronize")) {
System.out.println("Publisher has received synchronize message");
return true;
}
}
System.out.println("Publisher has not received synchronize message");
} catch (JMSException e) {
System.err.println("Exception occurred: " + e.toString());
}

return false;
}
}

I dettagli implementativi della connessione a JMS sono nella classe JmsConnectionManager, che abbiamo visto nel precedente post dedicato all'argomento.

Nessun commento:

Posta un commento