创建ActiveMQ连接:
import JAVAx.jms.*;
import org.Apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void mAIn(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 启动连接
connection.start();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
创建ActiveMQ会话:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
创建ActiveMQ队列:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列
Queue queue = session.createQueue("myQueue");
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
创建ActiveMQ主题:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic("myTopic");
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
发送消息到队列:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列
Queue queue = session.createQueue("myQueue");
// 创建消息生产者
MessageProducer producer = session.createProducer(queue);
// 创建消息
TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
// 发送消息
producer.send(message);
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
发送消息到主题:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic("myTopic");
// 创建消息生产者
MessageProducer producer = session.createProducer(topic);
// 创建消息
TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
// 发送消息
producer.send(message);
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接收队列中的消息:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列
Queue queue = session.createQueue("myQueue");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接收主题中的消息:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic("myTopic");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(topic);
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
发布-订阅模式中的发布者:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic("myTopic");
// 创建消息生产者
MessageProducer producer = session.createProducer(topic);
// 创建消息
TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
// 发送消息
producer.send(message);
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
发布-订阅模式中的订阅者:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic("myTopic");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(topic);
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
使用消息监听器接收队列中的消息:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列
Queue queue = session.createQueue("myQueue");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
// 注册消息监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("Received message: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 启动连接
connection.start();
// 等待接收消息
Thread.sleep(5000);
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
}
使用消息监听器接收主题中的消息:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic("myTopic");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(topic);
// 注册消息监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("Received message: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 启动连接
connection.start();
// 等待接收消息
Thread.sleep(5000);
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
}
使用消息选择器接收队列中的消息:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列
Queue queue = session.createQueue("myQueue");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(queue, "age > 18");
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
使用消息选择器接收主题中的消息:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic("myTopic");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(topic, "age > 18");
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
使用事务发送消息:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
public static void main(String[] args) {
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
// 创建队列
Queue queue = session.createQueue("myQueue");
// 创建消息生产者
MessageProducer producer = session.createProducer(queue);
// 创建消息
TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
// 发送消息
producer.send(message);
// 提交事务
session.commit();
// 关闭会话
session.close();
// 关闭连接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
这些示例覆盖了ActiveMQ的常用用法,包括创建连接、创建会话、创建队列和主题、发送和接收消息等。你可以根据自己的需求选择相应的示例进行参考和使用。