Messaggi con ricevuta

Per il momento trattiamo sessioni senza transazioni, in questo caso la gestione delle ricevute é determinata da come la sessione viene creata, il che può avvenire in una di queste tre modalità:
  • AUTO_ACKNOWLEDGE, la ricevuta é gestita automaticamente dalla sessione;
  • CLIENT_ACKNOWLEDGE, il client chiama esplicitamente il metodo acknowledge() del messaggio ricevuto per notificare la ricezione. Nota che la notifica non vale per il solo messaggio su cui si chiama il metodo, ma per tutti i messaggi consumati dal client;
  • DUPS_OK_ACKNOWLEDGE, gestione automatica minimizzando i controlli della sessione per evitare duplicazioni di messaggi, permette una gestione più leggera a costo del rischio di duplicazione di messaggi (il campo JMSRedelivered viene messo a true per segnalare messaggi duplicati).
I messaggi senza acknowledgment prima del termine della sessione vengono tenuti in sospeso e rimandati alla nuova connessione alla coda, e anche al topic, nel caso sia associato a un durable TopicSubscriber.

Lavoriamo sul codice fornito dal tutorial Sun su Java EE nel folder jms/advanced/ackequivexample.

L'esempio che sviluppiamo qui mostra come nel caso di un receiver sincrono la ricevuta di ritorno per un messaggio venga generata al termine della gestione.

Specificando CLIENT_ACKNOWLEDGE come modalità di sessione, daremo la ricevuta di ritorno esplicitamente al termine della gestione.

Applicazione Java

Creamo una nuova applicazione Java SE, che chiamiamo jmsRobust e, come negli esempi precedenti, aggiungiamo i jar che ci permettono di interagire con l'application server:
  • appserv-admin.jar
  • appserv-launch.jar
  • appserv-rt.jar
  • imq.jar
  • imqbroker.jar
  • imqjmsra.jar
  • javaee.jar
  • jms.jar
JmsConnectionManager

Creiamo un unico punto di accesso a JMS, la classe JmsConnectionManager, un singleton che va progettato con un minimo di attenzione, visto l'ambiente multithreading in cui ci troviamo.

Al solito, l'URL utilizzata é quella del localhost, nel caso l'application server sia su un'altra macchina occorre fornire l'indirizzo corretto.

Per il resto il codice dovrebbe essere abbastanza leggibile:

package jmsrobust;

import java.util.Hashtable;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JmsConnectionManager {
private static final String AS_URL = "iiop://127.0.0.1:3700";
private static final String AS_CLASS = "com.sun.appserv.naming.S1ASCtxFactory";
private static final String CONNECTION = "jms/tConnectionFactory";
private static final String QUEUE = "jms/tQueue";
private static final String TOPIC = "jms/tTopic";

private static volatile JmsConnectionManager instance;

protected ConnectionFactory connFact;
protected Queue queue;
protected Queue ctrlQueue;
protected Topic topic;

private Context ctx;

private JmsConnectionManager() {
System.out.println(Thread.currentThread().getName() + ": initializing the context");

Hashtable properties = new Hashtable(2);
properties.put(Context.PROVIDER_URL, AS_URL);
properties.put(Context.INITIAL_CONTEXT_FACTORY, AS_CLASS);
try {
ctx = new InitialContext(properties);
System.out.println(Thread.currentThread().getName() + ": context initialized");

System.out.println(Thread.currentThread().getName() + ": lookup step");
connFact = (ConnectionFactory)ctx.lookup(CONNECTION);

queue = (Queue)ctx.lookup(QUEUE);
topic = (Topic)ctx.lookup(TOPIC);
System.out.println(Thread.currentThread().getName() + ": lookup done");
}
catch(NamingException ne) {
ne.printStackTrace();
}
}

public static JmsConnectionManager getInstance() {
if(instance == null) {
System.out.println(Thread.currentThread().getName() + ": init required");
synchronized(JmsConnectionManager.class) {
if(instance == null) {
System.out.println(Thread.currentThread().getName() + ": init really required");
instance = new JmsConnectionManager();
System.out.println(Thread.currentThread().getName() + ": init done");
}
}
}

return instance;
}
}

SynchClient

Dobbiamo ora creare due classi, un sender e un receiver che usino la coda per scambiarsi messaggi. Notiamo però che le due classi sarebbero molto simili, c'é una prima fase di startup, uguale per entrambi, in cui si crea una connessione e con questa una sessione; segue un passo specifico in cui il sender manda i messaggi e il receiver li riceve, e infine c'é il cleanup in cui si chiude la connessione.

Tutto ciò ci spinge ad usare il pattern Template Method. La SynchClient sarà la classe astratta alla base della nostra gerarchia:

package jmsrobust;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;

public abstract class SynchClient extends Thread {
protected Connection connection;
protected Session session;

private boolean startup() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();

try {
connection = cm.connFact.createConnection();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
return true;
}
catch (Exception e) {
System.err.println("Connection problem: " + e.toString());
}

return false;
}

private void closeConnection() {
if (connection != null) {
try { connection.close();}
catch (JMSException ee) {}
}
}

protected abstract void operation();

@Override
public void run() {
if(startup())
operation();
closeConnection();
}
}

Il template method del pattern é qui run(), che definisce come il codice viene eseguito dalle classi derivate. Due dei tre passi sono definiti in questa classe, startup() e closeConnection(), c'é poi il metodo dichiarato qui come abstract, che andrà definito nelle classi derivate.

La cosa importante da notare dal punto di vista di JMS é che la sessione viene creata in modalità CLIENT_ACKNOWLEDGE, dunque toccherà a chi riceve dare conferma esplicita del messaggio ricevuto.

SynchSender

Il sender implementa operation() con lo scopo di mettere un messaggio sulla coda. Usa la session creata da startup() per creare un MessageProducer che faccia riferimento alla coda che abbiamo istanziato in JmsConnectionManager:

package jmsrobust;

import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;

public class SynchSender extends SynchClient {
final String MSG_TEXT = "Please explicitly acknowledge this";

protected void operation() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();

try {
MessageProducer producer = session.createProducer(cm.queue);
TextMessage message = session.createTextMessage();

long secs = System.currentTimeMillis() % 10000;
message.setText(MSG_TEXT + " (" + secs + ")");

System.out.println("Sending message: " + message.getText());
producer.send(message);
} catch (JMSException e) {
System.err.println("Exception occurred: " + e.toString());
}
}
}

SynchReceiver

Il receiver implementa operation() per leggere un messaggio dalla coda, crea quindi un MessageConsumer sulla coda usata anche dal sender, fa partire la connessione e si mette in attesa di ricevere il messaggio.

Ricevuto il messaggio e compiuta la sua elaborazione viene mandata ricevuta alla coda:

package jmsrobust;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;

public class SynchReceiver extends SynchClient {

protected void operation() {
JmsConnectionManager cm = JmsConnectionManager.getInstance();

try {
MessageConsumer receiver = session.createConsumer(cm.queue);
connection.start();
TextMessage message = (TextMessage)receiver.receive();

System.out.println("The receiver processes: " + message.getText());
System.out.println("The receiver acknowledge the message");
message.acknowledge();
} catch (JMSException e) {
System.err.println("Exception occurred: " + e.toString());
}
}
}

Main

Per testare il codice scriviamo questo codice:

private static void clientAck() {
System.out.println("Start explicit client acknowledgment example");
Thread sender = new SynchSender();
Thread receiver = new SynchReceiver();

sender.start();
receiver.start();

try {
sender.join();
receiver.join();
}
catch (InterruptedException e) {
e.printStackTrace(System.out);
}

System.out.println("Explicit client acknowledgment example completed");
}

public static void main(String[] args) {
clientAck();

System.exit(0);
}

Nessun commento:

Posta un commento