activemq端口怎么调 activemq详解

一、下载与安装
直接去官网(
http://activemq.apache.org/)下载最新版本即可,由于这是免安装的,只需要解压就行了 。安装完之后进入bin目录,双击 activemq.bat文件(linux下在bin目录下执行 activemq start)
二、访问控制台
在浏览器输入:http://ip:8161/admin/,出现如下界面表示启动成功,默认的用户名密码都是admin

activemq端口怎么调 activemq详解


三、修改端口号
61616为对外服务端口号
8161为控制器端口号
当端口号冲突时,可以修改这两个端口号 。cd conf ,修改activemq.xml 修改里面的61616端口 。修改jetty.xml,修改里面的8161端口 。
queue队列模式:
【activemq端口怎么调 activemq详解】和rabbitmq简单队列模式一样,若是有多个消费者消费同一个队列中的消息的话,默认也是轮询机制的消费
示例代码:
public class Productor {public static final String BORKER_URL = "tcp://127.0.0.1:61616";public static final String QUEUE_NAME = "queue1";public static void main(String[] args) throws JMSException {//创建工厂ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);//创建tcp连接Connection connection = factory.createConnection();//建立连接connection.start();/*** 创建会话,1.是否开启事务,2.签收模式*/Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建队列(消息的目的地)Queue queue = session.createQueue(QUEUE_NAME);//创建生产者MessageProducer producer = session.createProducer(queue);//消息非持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//消息持久化 默认是持久化的//producer.setDeliveryMode(DeliveryMode.PERSISTENT);//创建消息TextMessage message = session.createTextMessage("你好吗");//发送消息producer.send(message);producer.close();session.close();connection.close();System.out.println("发送成功!");}}public class Consumer {public static final String BORKER_URL = "tcp://127.0.0.1:61616";public static final String QUEUE_NAME = "queue1";public static void main(String[] args) throws JMSException {//创建工厂ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);//创建tcp连接Connection connection = factory.createConnection();//建立连接connection.start();/*** 创建会话,1.是否开启事务,2.签收模式*/Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建/声明队列(消息的目的地)Queue queue = session.createQueue(QUEUE_NAME);//创建消费者MessageConsumer consumer = session.createConsumer(queue);/*while (true) {//receive会阻塞线程TextMessage message = (TextMessage)consumer.receive();System.out.println("接收到消息:" + message.getText());}*///监听的方式消费consumer.setMessageListener(message -> {TextMessage textMessage = (TextMessage)message;try {System.out.println("1号接收到消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}});}}topic队列模式:
称为发布订阅模式,生产者把消息发送给订阅给某个topic主题的消费者,是分发的模式,这种模式默认需要先启动消费者,不然就算生产者发布了某个topic主题的消息,消费者也消费不了;除非消费者提前订阅,并且做了消息持久化的处理,这样后启动消费者才能消费提前推送的消息 。
代码:
public class Productor {public static final String BORKER_URL = "tcp://127.0.0.1:61616";public static final String TOPIC_NAME = "topic1";public static void main(String[] args) throws JMSException {//创建工厂ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);//异步投递factory.setUseAsyncSend(true);//创建tcp连接Connection connection = factory.createConnection();/*** 创建会话,1.是否开启事务,2.签收模式*/Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建/声明topic(消息的目的地)Topic topic = session.createTopic(TOPIC_NAME);//创建生产者ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic);//持久化producer.setDeliveryMode(DeliveryMode.PERSISTENT);//建立连接connection.start();//创建消息TextMessage message = session.createTextMessage("你好吗");//发送消息,异步发送回调函数producer.send(message, new AsyncCallback() {@Overridepublic void onSuccess() {System.out.println("success");}@Overridepublic void onException(JMSException e) {System.out.println("fail");}});producer.close();session.close();connection.close();System.out.println("发送成功!");}}public class Consumer1 {public static final String BORKER_URL = "tcp://127.0.0.1:61616";public static final String TOPIC_NAME = "topic1";public static void main(String[] args) throws JMSException {//创建工厂ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);//创建tcp连接Connection connection = factory.createConnection();//制定clientIdconnection.setClientID("my");/*** 创建会话,1.是否开启事务,2.签收模式*/Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建/声明topic(消息的目的地)Topic topic = session.createTopic(TOPIC_NAME);//订阅主题TopicSubscriber subscriber = session.createDurableSubscriber(topic, "remark");//建立连接connection.start();while (true) {//receive会阻塞线程//接收订阅的消息TextMessage message = (TextMessage) subscriber.receive();System.out.println("接收到消息:" + message.getText());}/*//创建消费者MessageConsumer consumer = session.createConsumer(topic);//建立连接connection.start();*//*while (true) {//receive会阻塞线程TextMessage message = (TextMessage)consumer.receive();System.out.println("接收到消息:" + message.getText());}*//*//监听的方式消费consumer.setMessageListener(message -> {TextMessage textMessage = (TextMessage)message;try {System.out.println("1号接收到消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}});*/}}

推荐阅读