Topic vs Queue

Facciamo un esempio per sperimentare la differenza tra topic e queue.

Costruiremo un producer che metterà messaggi in una destinazione di tipo javax.jms.Topic e altri in una javax.jms.Queue; a questo affiancheremo un consumer capace di ricevere messaggi da Topic e da Queue.

Per rendere le cose più interessanti lavoreremo con java SE, lavorando quindi sostanzialmente in remoto.

Admin Console

Avevamo già creato per i precedenti esempi una Connection Factory e una Destination di tipo Queue, se guardiamo nella nostra console dell'amministratore per Glassfish sotto Resources, JMS Resources le dovremmo trovare sotto Connection Factories (con JNDI Name jms/tConnectionFactory) e Destination Resources (con JNDI Name jms/tQueue).

Creiamo una nuova destinazione, il Topic che ha nome JNDI "jms/tTopic", destinazione fisica a nome "tConnectionFactory" (che poi é la connection factory che abbiamo già definito), tipo di risorsa "javax.jms.Topic", e una descrizione a nostro piacere.

Applicazione Java SE

Creaiamo ora un progetto per una applicazione Java SE, che chiamemo JmsSimple (o come altro ci sembra meglio).

Dobbiamo aggiungere dieci jar che ci permettono di gestire JMS pur non essendo in un Java EE Container:
  • appserv-rt.jar,
  • javaee.jar,
  • appserv-admin.jar,
  • appserv-launch.jar
  • appserv-ext.jar
  • appserv-deployment-client.jar
  • imqjmsra.jar,
  • jms.jar,
  • imq.jar,
  • imqbroker.jar.
Nella nostra applicazione vogliamo due package: producer e synchConsumer. Il producer creerà messaggi per JMS, synchConsumer fornirà le funzionalità per consumarli.

Producer

La differenza fondamentale tra l'applicazione remota che generava messaggi per JMS che abbiamo sviluppato in precedenza e questa, é che qui gestiamo anche la destinazione JMS Topic ma, come si vede nel codice sorgente che segue, si tratta proprio di una differenza minima, in pratica dobbiamo solo avere il nome della nostra destinazione, e sapere che é di tipo Topic:

package producer;

import java.util.Hashtable;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class Producer {
private static final String AS_URL = "iiop://127.0.0.1:3700";
private static final String AS_CLASS = "com.sun.appserv.naming.S1ASCtxFactory";
private static final String CONNECTION = "jms/tConnectionFactory";
private static final String QUEUE = "jms/tQueue";
private static final String TOPIC = "jms/tTopic";

private static Queue queue;
private static Topic topic;
public enum Type {QUEUE, TOPIC};

private int messages;
private Type destType;
private Connection connection;
private Session session;

private static Context ctx;
static {
System.out.println("initializing the context");

Hashtable properties = new Hashtable(2);
properties.put(Context.PROVIDER_URL, AS_URL);
properties.put(Context.INITIAL_CONTEXT_FACTORY, AS_CLASS);
try {
ctx = new InitialContext(properties);
}
catch (NamingException ex) {
ex.printStackTrace();
}
}

public void setMessages(int messages) {
this.messages = messages;
}

public void setDestination(Type destType) {
this.destType = destType;
}

public Producer() throws JMSException, NamingException {
messages = 4;
destType = Type.QUEUE;

ConnectionFactory f = (ConnectionFactory)ctx.lookup(CONNECTION);
connection = f.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

private Destination getDestination() throws NamingException {
if(destType == Type.QUEUE) {
if(queue == null) {
System.out.println("Looking for JMS queue");
queue = (Queue)ctx.lookup(QUEUE);
}
return queue;
}
else { // Type.TOPIC
if(topic == null) {
System.out.println("Looking for JMS topic");
topic = (Topic)ctx.lookup(TOPIC);
}
return topic;
}
}

public void sendMessages() throws JMSException, NamingException {
Destination dest = getDestination();

MessageProducer producer = session.createProducer(dest);
TextMessage message = session.createTextMessage();

for (int i = 0; i < messages; i++) {
message.setText("This is message " + (i + 1));
System.out.println("Sending message: " + message.getText());
producer.send(message);
}

// Send a non-text control message indicating end of messages
producer.send(session.createMessage());
}

public void close() throws JMSException {
System.out.println("Closing JMS connection");
if(connection != null)
connection.close();
}
}

La particolarità implementativa é che, in coda ai messaggi di tipo testo, viene spedito un messaggio vuoto, di tipo MessageImpl (come dire, non un messaggio reale), a indicare che la trasmissione é stata completata.

La classe Main del package producer si limita a fornire il metodo statico main, che si occupa di instanziare il producer e usarlo per mandare messaggi:

public static void main(String[] args) {
try {
Producer producer = new Producer();

// default: 4 messages for queue
producer.sendMessages();

// 2 messages for topic
producer.setDestination(Producer.Type.TOPIC);
producer.setMessages(2);
producer.sendMessages();

producer.close();
}
catch(Exception ex) {
System.err.println("Production failed: " + ex.toString());
ex.printStackTrace();
System.exit(1);
}
System.exit(0);
}


Consumer

Le classi che consumano i messaggi sono organizzate in modo parallelo al produttore.

La classe Consumer si mette in attesa sulla risorsa destinazione JMS e continua a leggere messaggi fino a che non riceve il messaggio fittizio che indica il termine delle trasmissioni:

package synchConsumer;

import java.util.Hashtable;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class Consumer {
private static final String AS_URL = "iiop://127.0.0.1:3700";
private static final String AS_CLASS = "com.sun.appserv.naming.S1ASCtxFactory";
private static final String CONNECTION = "jms/tConnectionFactory";
private static final String QUEUE = "jms/tQueue";
private static final String TOPIC = "jms/tTopic";

private static Queue queue;
private static Topic topic;
public enum Type {QUEUE, TOPIC};

private Type destType;
private Connection connection;
private Session session;

private static Context ctx;
static {
System.out.println("initializing the context");

Hashtable properties = new Hashtable(2);
properties.put(Context.PROVIDER_URL, AS_URL);
properties.put(Context.INITIAL_CONTEXT_FACTORY, AS_CLASS);
try {
ctx = new InitialContext(properties);
}
catch (NamingException ex) {
ex.printStackTrace();
}
}

public void setDestination(Type destType) {
this.destType = destType;
}

public Consumer() throws JMSException, NamingException {
destType = Type.QUEUE;

ConnectionFactory f = (ConnectionFactory)ctx.lookup(CONNECTION);
connection = f.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

private Destination getDestination() throws NamingException {
if(destType == Type.QUEUE) {
if(queue == null) {
System.out.println("Looking for JMS queue");
queue = (Queue)ctx.lookup(QUEUE);
}
return queue;
}
else { // Type.TOPIC
if(topic == null) {
System.out.println("Looking for JMS topic");
topic = (Topic)ctx.lookup(TOPIC);
}
return topic;
}
}

public void getMessages() throws JMSException, NamingException {
Destination dest = getDestination();

MessageConsumer consumer = session.createConsumer(dest);
connection.start();

System.out.println("Waiting for messages");
while (true) {
Message m = consumer.receive(1);

if (m != null) {
if (m instanceof TextMessage) {
TextMessage message = (TextMessage) m;
System.out.println("Reading message: " + message.getText());
}
else {
System.out.print("Not a text message: " + m.getClass().getName());
System.out.println(", teminating...");
break;
}
}
}
System.out.println("Messages received.");
}

public void close() throws JMSException {
System.out.println("Closing JMS connection");
if(connection != null)
connection.close();
}
}

La classe Main del package synchConsumer si limita anche qui a fornire il metodo statico main che si occupa di istanziare il consumer. L'utente che lancia questa classa decide se stare in ascolto sulla Queue o sul Topic specificando un argomento in ingresso:

public static void main(String[] args) {
boolean isTopic = (args.length == 1 && args[0].startsWith("t")) ? true : false;

try {
Consumer consumer = new Consumer();

// default: wait for queue messages
if(isTopic)
consumer.setDestination(Consumer.Type.TOPIC);
consumer.getMessages();

consumer.close();
}
catch(Exception ex) {
System.err.println("Consuming failed: " + ex.toString());
ex.printStackTrace();
System.exit(1);
}
System.exit(0);
}

Differenze

La differenza tra una Queue e un Topic é che la Queue tiene i messaggi in attesa di essere consumati, mentre il Topic distribuisce i messaggi a chi é in attesa.

Per vedere la cosa nei fatti basta eseguire producer e consumer in ordine diverso. Se per Queue cambiando l'ordine dell'esecuzione il risultato finale non cambia, per Topic se eseguiamo il producer prima del consumer i messaggi generati vanno persi.

Nessun commento:

Posta un commento