博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
轻松搞定RabbitMQ(五)——路由选择
阅读量:5999 次
发布时间:2019-06-20

本文共 5526 字,大约阅读时间需要 18 分钟。

hot3.png

       翻译地址:http://www.rabbitmq.com/tutorials/tutorial-four-java.html

       在中,我们建立了一个简单的日志系统。可以广播消息给多个消费者。本篇博文,我们将添加新的特性——我们可以只订阅部分消息。比如:我们可以接收Error级别的消息写入文件。同时仍然可以在控制台打印所有日志。

Bindings(绑定)

       在上一篇博客中我们已经使用过绑定。类似下面的代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");
       绑定表示转换器与队列之间的关系。可以简单的人为:队列对该转发器上的消息感兴趣。

       绑定可以设定额外的routingKey参数。为了与避免basicPublish方法(发布消息的方法)的参数混淆,我们准备把它称作绑定键(binding key)。下面展示如何使用绑定键(binding key)来创建一个绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");
       绑定键关键取决于转换器的类型。对于fanout类型,忽略此参数。

Direct exchange(直接转发)

       前面讲到我们的日志系统广播消息给所有的消费者。我们想对其扩展,根据消息的严重性来过滤消息。例如:我们希望将致命错误的日志消息记录到文件,而不是把磁盘空间浪费在warn和info类型的日志上。我们使用的fanout转发器,不能给我们太多的灵活性。它仅仅只是盲目的广播而已。我们使用direct转发器进行代替,其背后的算法很简单——消息会被推送至绑定键(binding key)和消息发布附带的选择键(routing key)完全匹配的队列。

       在上图中,我们可以看到direct类型的转发器与2个队列进行了绑定。第一个队列使用的绑定键是orange,第二个队列绑定键为black和green。这样当消息发布到转发器是,附带orange绑定键的消息将被路由到队列Q1中去。附带black和green绑定键的消息被路由到Q2中去。其他消息全部丢弃。

Multiple bindings(多重绑定)

       使用一个绑定键绑定多个队列是完全合法的。如上图,绑定键black绑定了2个队列——Q1和Q2。

Emitting logs(发送日志)

       我们将这种模式用于日志系统,发送消息给direct类型的转发器。我们将 提供日志严重性做为绑定键。那样,接收程序可以选择性的接收严重性的消息。首先关注发送日志的代码:

像往常一样首先创建一个转换器:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
       然后为发送消息做准备:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
       为了简化代码,我们假定日志的严重性是‘info’,‘warning’,‘error’中之一。

Subscribing(订阅)

       接收消息跟前面博文中的一样。我们仅需要修改一个地方:为每一个我们感兴趣的严重性的消息,创建一个新的绑定。

String queueName = channel.queueDeclare().getQueue();for(String severity : argv){      channel.queueBind(queueName, EXCHANGE_NAME, severity);}

完整的例子

发送端代码(EmitLogDirect.java)

public class EmitLogDirect {	private final static String EXCHANGE_NAME = "direct_logs";	public static void main(String[] args) throws IOException {		/**		 * 创建连接连接到MabbitMQ		 */		ConnectionFactory factory = new ConnectionFactory();		// 设置MabbitMQ所在主机ip或者主机名		factory.setHost("127.0.0.1");		// 创建一个连接		Connection connection = factory.newConnection();		// 创建一个频道		Channel channel = connection.createChannel();		// 指定转发——广播		channel.exchangeDeclare(EXCHANGE_NAME, "direct");		//所有日志严重性级别		String[] severities={"error","info","warning"};		for(int i=0;i<3;i++){			String severity = severities[i%3];//每一次发送一条不同严重性的日志						// 发送的消息			String message = "Hello World"+Strings.repeat(".", i+1);			//参数1:exchange name			//参数2:routing key			channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());			System.out.println(" [x] Sent '" + severity +"':'"+ message + "'");		}		// 关闭频道和连接		channel.close();		connection.close();	}}
消费者1(ReceiveLogs2Console.java)

public class ReceiveLogs2Console {	private static final String EXCHANGE_NAME = "direct_logs";	public static void main(String[] argv) throws IOException, InterruptedException {		ConnectionFactory factory = new ConnectionFactory();		factory.setHost("127.0.0.1");		// 打开连接和创建频道,与发送端一样		Connection connection = factory.newConnection();		final Channel channel = connection.createChannel();		channel.exchangeDeclare(EXCHANGE_NAME, "direct");		// 声明一个随机队列		String queueName = channel.queueDeclare().getQueue();		//所有日志严重性级别		String[] severities={"error","info","warning"};		for (String severity : severities) {			//关注所有级别的日志(多重绑定)			channel.queueBind(queueName, EXCHANGE_NAME, severity);		}		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");				// 创建队列消费者		final Consumer consumer = new DefaultConsumer(channel) {			  @Override			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {			    String message = new String(body, "UTF-8");			    System.out.println(" [x] Received '"  + envelope.getRoutingKey() + "':'" + message + "'");			  }			};			channel.basicConsume(queueName, true, consumer);	}}

消费者2(ReceiveLogs2File.java)

public class ReceiveLogs2File {	private static final String EXCHANGE_NAME = "direct_logs";	public static void main(String[] argv) throws IOException, InterruptedException {		ConnectionFactory factory = new ConnectionFactory();		factory.setHost("127.0.0.1");		// 打开连接和创建频道,与发送端一样		Connection connection = factory.newConnection();		final Channel channel = connection.createChannel();		channel.exchangeDeclare(EXCHANGE_NAME, "direct");		// 声明一个随机队列		String queueName = channel.queueDeclare().getQueue();	    	    String severity="error";//只关注error级别的日志,然后记录到文件中去。	    channel.queueBind(queueName, EXCHANGE_NAME, severity);	    		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");				// 创建队列消费者		final Consumer consumer = new DefaultConsumer(channel) {			  @Override			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {			    String message = new String(body, "UTF-8");			    //记录日志到文件:			    print2File( "["+ envelope.getRoutingKey() + "] "+message);			  }			};			channel.basicConsume(queueName, true, consumer);	}		private static void print2File(String msg) {		try {			String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();			String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());			File file = new File(dir, logFileName + ".log");			FileOutputStream fos = new FileOutputStream(file, true);			fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());			fos.flush();			fos.close();		} catch (FileNotFoundException e) {			e.printStackTrace();		} catch (IOException e) {			e.printStackTrace();		}	}  }
       最终结果:

       罗哩罗嗦的说这么多,其实就是说了这么一件事:我们可以使用Direct exchange+routingKey来过滤自己感兴趣的消息。一个队列可以绑定多个routingKey。这就是我们今天的主题——路由选择。

转载于:https://my.oschina.net/u/2260184/blog/518474

你可能感兴趣的文章
实录分享|kubernetes 在腾讯游戏的应用实践
查看>>
文章系列:响应式JavaScript
查看>>
Angular 2.x 从 0 到 1 (二)史上最简单的 Angular2 教程
查看>>
「又拍云 Open Talk」分享:SAY清风—创业公司如何做管理
查看>>
1100名达摩院“扫地僧”加持,阿里云的下一个十年
查看>>
力荐50个最实用的免费机器学习数据集
查看>>
中国技术力量:中国技术开放日亮相QCon旧金山
查看>>
百度云磁盘CDS、对象存储BOS技术深度解析
查看>>
Microsoft宣布通过Azure Event Grid服务提供对CloudEvents的支持
查看>>
WordPress 5.2 Beta 3 发布,要求 PHP 5.6.20 以上版本
查看>>
在商城系统中使用设计模式----策略模式之在spring中使用策略模式
查看>>
odoo:开源 ERP/CRM 入门与实践
查看>>
传输控制协议(transmission control protocol)——TCP 整理
查看>>
物联网的四种计算模式
查看>>
比NGINX更快:nginx-1.15.5 vs mongols-1.2.3
查看>>
linux cat
查看>>
小程序微信支付
查看>>
winhex处理磁盘格式化
查看>>
JavaScript正则表达式——预定义类匹配常见的字符类
查看>>
linux简单常用命令
查看>>