fanout 发送端
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class MyFanoutSender {
- private static final String MESSAGE = "my name is";
- public static void main(String[] args){
- Connection conn = null;
- Channel channel = null;
- try {
- //初始化连接,主机,端口,用户名,密码可以自己定义
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(HOST_PARAMETER.LOCAL_HOST);
- factory.setPort(HOST_PARAMETER.LOCAL_PORT);
- factory.setUsername(HOST_PARAMETER.LOCAL_USER_NAME);
- factory.setPassword(HOST_PARAMETER.LOCAL_PASSWORD);
- //创建连接
- conn = factory.newConnection();
- //创建通道
- channel = conn.createChannel();
- //定义为fanout类型的交换机
- channel.exchangeDeclare(HOST_PARAMETER.EXCHANGE_NAME, "fanout");
- //发送,指定routingkey为""
- channel.basicPublish(HOST_PARAMETER.EXCHANGE_NAME, "", null, MESSAGE.getBytes());
- System.out.println("I send a fanout massage!");
- } catch (Exception e) {
- e.printStackTrace();
- } finally{
- try {
- if(channel != null){
- channel.close();
- }
- if(conn != null){
- conn.close();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
fanout订阅者(接收端),可以自己多定义几个,才能看出效果
- import java.io.IOException;
- import com.rabbitmq.client.AMQP.BasicProperties;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- public class MyFirstFanoutReceiver {
- public static void main(String[] args){
- Connection conn = null;
- Channel channel = null;
- try {
- //初始化连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(HOST_PARAMETER.LOCAL_HOST);
- factory.setPort(HOST_PARAMETER.LOCAL_PORT);
- factory.setUsername(HOST_PARAMETER.LOCAL_USER_NAME);
- factory.setPassword(HOST_PARAMETER.LOCAL_PASSWORD);
- //创建连接
- conn = factory.newConnection();
- //创建通道
- channel = conn.createChannel();
- //声明交换机类型
- channel.exchangeDeclare(HOST_PARAMETER.EXCHANGE_NAME, "fanout");
- //声明默认的队列
- String queue = channel.queueDeclare().getQueue();
- //将队列与交换机绑定,最后一个参数为routingKey,与发送者指定的一样""
- channel.queueBind(queue, HOST_PARAMETER.EXCHANGE_NAME, "");
- //消费者
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
- throws IOException {
- System.out.println(new String(body,"utf-8")+" Tom");
- }
- };
- channel.basicConsume(queue, true, consumer);
- System.out.println("i am the first fanout receiver!");
- }catch (Exception e) {
- e.printStackTrace();
- }
- }
- }