博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JMS系列(三)-java操作JMS Topic实例
阅读量:6227 次
发布时间:2019-06-21

本文共 7651 字,大约阅读时间需要 25 分钟。

在介绍如何通过java往jms消息队列里面写消息和读取消息,本文介绍如何通过java往jms主题里写消息和读取消息。

消息发布

同样将消息发布到主题中,需要经过以下步骤

  • 连接jms服务器
  • 获取连接工厂(Connection Factory)
  • 通过连接工厂创建主题连接(TopicConnection)
  • 通过主题连接创建主题会话(TopicSession)
  • 通过主题会话创建主题发布者(Publisher)
  • 创建消息(Message)
  • 通过发布者将消息发送到主题中

代码实现:

package asan.demo.jms;import java.util.Hashtable;import javax.jms.JMSException;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;import javax.jms.TopicPublisher;import javax.jms.TopicSession;import javax.naming.Context;import javax.naming.InitialContext;import javax.naming.NamingException;public class JMSTopicSender {    private TopicPublisher sender = null;    private TopicSession session = null;    private static final String JMS_FACTORY_JNDI = "jms/jms_test_connection_factory1";    private static final String JMS_TOPIC_JNDI = "jms/jms_test_topic";    public JMSTopicSender() {        super();    }    public void sendMessage(String msg) {        TextMessage textMsg;        try {            if (this.sender == null) {                this.init();            }            textMsg = session.createTextMessage();            textMsg.setText(msg);            sender.send(textMsg);        } catch (JMSException e) {            e.printStackTrace();        } catch (Exception ex) {            ex.printStackTrace();        }    }    //    1. 连接jms服务器    //    2. 获取连接工厂(Connection Factory)    //    3. 通过连接工厂创建主题连接(TopicConnection)    //    4. 通过主题连接创建主题会话(TopicSession)    //    5. 通过主题会话创建主题发布者(Publisher)    //    6. 创建消息(Message)    //    7. 通过发布者将消息发送到主题中    private void init() throws NamingException, JMSException {        Hashtable properties = new Hashtable();        properties.put(Context.INITIAL_CONTEXT_FACTORY,                       "weblogic.jndi.WLInitialContextFactory");        properties.put(Context.PROVIDER_URL, "t3://127.0.0.1:7101");        properties.put(Context.SECURITY_PRINCIPAL, "weblogic");        properties.put(Context.SECURITY_CREDENTIALS, "weblogic1");        InitialContext ctx = new InitialContext(properties);        TopicConnectionFactory jmsFactory =            (TopicConnectionFactory)ctx.lookup(JMS_FACTORY_JNDI);        TopicConnection jmsConn = jmsFactory.createTopicConnection();        session = jmsConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);        Topic topic = (Topic)ctx.lookup(JMS_TOPIC_JNDI);        sender = session.createPublisher(topic);    }        public static void main(String[]cmd){        JMSTopicSender sender=new JMSTopicSender();        sender.sendMessage("hello jms topic");    }}

与队列不同的是,此时主题没有订阅者,那么该消息就不存储在主题中,即使后面有订阅者订阅了该主题,也无法接收订阅前的消息。

消息订阅

从主题中订阅消息,需要经过以下步骤:

  • 连接jms服务器
  • 获取连接工厂(Connection Factory)
  • 通过连接工厂创建主题连接(TopicConnection)
  • 通过主题连接创建主题会话(TopicSession)
  • 通过主题会话创建订阅者(Subscriber)
  • 接收消息(Message)

代码实现:

package asan.demo.jms;import java.util.Hashtable;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;import javax.jms.TopicSession;import javax.jms.TopicSubscriber;import javax.naming.Context;import javax.naming.InitialContext;import javax.naming.NamingException;public class JMSTopicReciver {    private TopicSubscriber reciver = null;    private static final String JMS_FACTORY_JNDI = "jms/jms_test_connection_factory1";    private static final String JMS_TOPIC_JNDI = "jms/jms_test_topic";    public JMSTopicReciver() {        super();    }    public void reciveMessage() {        try {            if (this.reciver == null) {                this.init();            }            System.out.println("waiting to recive message from jms topic "+JMS_TOPIC_JNDI);            while(true){                Message msg=reciver.receive();                if(msg instanceof TextMessage){                    TextMessage textMsg=(TextMessage)msg;                    System.out.println("recive jms message:"+textMsg.getText());                }            }        } catch (JMSException e) {            e.printStackTrace();        } catch (Exception ex) {            ex.printStackTrace();        }    }    //    1. 连接jms服务器    //    2. 获取连接工厂(Connection Factory)    //    3. 通过连接工厂创建主题连接(TopicConnection)    //    4. 通过主题连接创建主题会话(TopicSession)    //    5. 通过主题会话创建订阅者(Subscriber)    //    6. 接收消息(Message)    private void init() throws NamingException, JMSException {        Hashtable properties = new Hashtable();        properties.put(Context.INITIAL_CONTEXT_FACTORY,                       "weblogic.jndi.WLInitialContextFactory");        properties.put(Context.PROVIDER_URL, "t3://127.0.0.1:7101");        properties.put(Context.SECURITY_PRINCIPAL, "weblogic");        properties.put(Context.SECURITY_CREDENTIALS, "weblogic1");        InitialContext ctx = new InitialContext(properties);        TopicConnectionFactory jmsFactory =            (TopicConnectionFactory)ctx.lookup(JMS_FACTORY_JNDI);        TopicConnection jmsConn = jmsFactory.createTopicConnection();        TopicSession session = jmsConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);        Topic topic = (Topic)ctx.lookup(JMS_TOPIC_JNDI);        reciver = session.createSubscriber(topic);        jmsConn.start();    }        public static void main(String[]cmd){        JMSTopicReciver consumer=new JMSTopicReciver();        consumer.reciveMessage();    }}

运行代码,此时运行之前消息发布的代码,可以在控制台看到发送的消息

同样,稍微修改下上一篇文章的客户端程序,使整个过程看起来更清晰,修改后的客户端代码如下:

package asan.demo.jms;import java.util.Scanner;public class JMSClient {    public JMSClient() {        super();    }    public static void help() {        System.out.println("Usage:java -jar JMSClient.jar sender/reciver/topicSender/topicReciver");        System.out.println("sender:向jms队列发送消息");        System.out.println("reciver:从队列中取出消息");        System.out.println("topicSender:向jms主题发送消息");        System.out.println("topicReciver:从主题中取出消息");    }    public static void main(String[] cmd) {        if (cmd.length == 0) {            help();            return;        }        String mode = cmd[0];        if ("sender".equalsIgnoreCase(mode)) {            JMSSender sender = new JMSSender();            Scanner sc = new Scanner(System.in);            while (true) {                System.out.println("input you message(input end to exist):");                String msg = sc.nextLine();                if ("end".equalsIgnoreCase(msg)) {                    return;                }                sender.sendMessage(msg);                System.out.println("message send success");            }        } else if ("reciver".equalsIgnoreCase(mode)) {            JMSReciver consumer = new JMSReciver();            consumer.reciveMessage();        } else if ("topicSender".equalsIgnoreCase(mode)) {            JMSTopicSender sender = new JMSTopicSender();            Scanner sc = new Scanner(System.in);            while (true) {                System.out.println("input you message(input end to exist):");                String msg = sc.nextLine();                if ("end".equalsIgnoreCase(msg)) {                    return;                }                sender.sendMessage(msg);                System.out.println("message send success");            }        } else if ("topicReciver".equalsIgnoreCase(mode)) {            JMSTopicReciver consumer = new JMSTopicReciver();            consumer.reciveMessage();        }    }}

打包运行,执行以下命令将客户端作为主题发布者

java -jar JMSDemo.jar topicSender

新建窗口,执行以下命令将客户端作为主题订阅者

java -jar JMSDemo.jar topicReciver

在发布者上发送消息,在订阅者上看到收到的消息

可以再多开一个窗口,再运行一个订阅者

登录weblogic控制台,进入domain->Services->Messaging->JMS Modules->jms_test_module->jms_test_topic可以查看当前主题订阅统计信息

转载地址:http://rvnna.baihongyu.com/

你可能感兴趣的文章
ToRPC:一个双向RPC的Python实现
查看>>
我的友情链接
查看>>
nginx在reload时候报错invalid PID number
查看>>
神经网络和深度学习-第二周神经网络基础-第二节:Logistic回归
查看>>
Myeclipse代码提示及如何设置自动提示
查看>>
c/c++中保留两位有效数字
查看>>
ElasticSearch 2 (32) - 信息聚合系列之范围限定
查看>>
VS2010远程调试C#程序
查看>>
[MicroPython]TurniBit开发板DIY自动窗帘模拟系统
查看>>
由String类的Split方法所遇到的两个问题
查看>>
Python3.4 12306 2015年3月验证码识别
查看>>
从Handler.post(Runnable r)再一次梳理Android的消息机制(以及handler的内存泄露)
查看>>
windows查看端口占用
查看>>
Yii用ajax实现无刷新检索更新CListView数据
查看>>
JDBC的事务
查看>>
Io流的概述
查看>>
App 卸载记录
查看>>
JavaScript变量和作用域
查看>>
开源SIP服务器加密软件NethidPro升级
查看>>
百度页面分享插件源代码
查看>>