消费者的竞争模式
允许多个并发用户处理在同一个通讯通道接收的消息。这种模式使系统能够同时处理多个邮件,以优化吞吐量,提高可扩展性和可用性,以及平衡工作负载。
背景和问题
在云中运行的应用程序,可以预计,以处理大量的请求。而不是过程的每个请求同步地,一个常用的方法是通过一个消息传送系统到该异步地处理它们的另一服务(消费者服务),以通过他们的应用程序。这种策略有助于确保在应用程序的业务逻辑没有被阻塞,而正在处理的请求。
请求的数量可以随着时间的原因有很多显著变化。突然一阵在用户活动或聚集的请求,来自多个租户未来可能会导致不可预测的工作负载。在高峰时间的系统可能需要处理许多每秒数百个请求,而在其他时间的数量可能是非常小的。此外,该工作的性质进行的处理这些请求可能是高度可变的。使用消费者服务的单个实例,可能会导致该实例成为充斥请求或消息传送系统可通过消息从应用程序来的流入被重载。为了处理这种波动的负载,该系统可以运行消费者服务的多个实例。然而这些消费者必须协调,以确保每个消息只传送给一个单个消费者。工作量也需要跨消费者被负载平衡,以防止一个实例成为瓶颈。
解决方案
使用消息队列来实现应用和消费者服务的实例之间的通信信道。在消息队列中的形式应用帖请求,以及消费者的服务实例从队列中接收消息并对其进行处理。这种方法使消费者的服务实例的同一池中从应用程序的任何实例处理消息。图 1 示出了该架构。
图1 - 使用消息队列分发工作提高到一个服务的实例
该解决方案具有以下优点:
- 它使固有的负载调平系统,可以处理由应用程序实例发送请求量很大的变化。队列充当应用程序实例和消费者服务实例,这有助于最大限度地减少对应用程序和服务实例(所描述的基于队列的负载调平模式)的可用性和响应性的影响之间的缓冲区。处理的消息,需要一些将被执行时,不会妨碍同时由消费者服务的其他实例所处理的其它消息长期运行的处理。
- 它提高了可靠性。如果一个生产者直接与消费者,而不是使用这种模式进行通信,但不监视消费者,有一个高概率的消息可能丢失或失败,如果消费者无法进行处理。在这种模式的消息不被发送给一个特定的服务实例,一个失败服务实例不会阻塞一个生产者和消息可以通过任何加工服务实例进行处理。
- 它不需要复杂的协调的消费者之间,或在生产者和消费者的实例。消息队列确保每个消息传递至少一次。
- 它是可扩展的。该系统能够动态地增加或减少消费者服务的实例的数目的消息量是波动的。
- 它可以提高弹性,如果消息队列提供事务读取操作。如果消费者服务实例能够读取和处理该消息作为一个事务操作的一部分,并且如果这种消费服务实例随后发生故障时,这种模式可以确保该消息将被返回到队列中被拾起并处理通过的另一个实例消费者服务。
问题和注意事项
在决定如何实现这个模式时,请考虑以下几点:
- 留言订购。其中消费者服务实例接收消息的顺序是无法保证的,并且不一定反映了所创建的消息中的顺序。设计系统,以确保信息的处理是幂等的,因为这将有助于消除该消息的处理顺序上的任何依赖。有关幂等的详细信息,请参阅乔纳森·奥利弗的博客幂等模式。
注意
微软 Azure 服务总线队列可以通过使用消息会先入先出消息的顺序工具保证。欲了解更多信息,请参阅消息传递模式 MSDN 上使用会话。
- 设计服务的永续性。如果系统被设计为检测和重新启动失败的服务实例中,可能有必要执行由服务实例执行作为幂等操作,以最小化被检索和处理一次以上的单个消息的影响的处理。
- 检测有害消息。格式不正确的消息,或者需要访问不可用的资源的任务,可能会造成服务实例失败。该系统应避免这样的消息被返回到队列,而是捕获和别处存储这些信息的详细信息,以便可以在需要进行分析。
- 处理结果。服务实例处理一个消息从生成该消息的应用程序逻辑完全分离,并且它们未必能够直接进行通信。如果服务实例生成必须传回给应用程序逻辑结果,该信息必须被存储在一个位置,都可以访问两个和系统必须提供某种指示时的处理已经完成,以防止应用逻辑从检索数据不全。
注意
如果您正在使用 Azure 的工作进程可能能够通过使用专用的邮件回复队列回传结果的应用程序逻辑。应用逻辑必须能够将这些结果与原来的消息关联起来。这种情况下进行了更详细的异步消息的引物进行说明。
- 扩展的信息系统。在一个大型的解决方案,一个消息队列可以是不堪重负的消息的数量,并成为系统中的瓶颈。在这种情况下,考虑分割该消息系统直接从特定制造商的信息到一个特定的队列,或使用负载平衡,以跨多个消息队列分发消息。
- 邮件系统的可靠性保障。一个可靠的消息传送系统,需要保证的是,一旦应用程序放入队列的消息,它也不会丢失。这是确保所有邮件传递至少一次重要的。
当使用这个模式
使用这种模式时:
- 工作量为一个应用程序被分成可异步运行任务。
- 任务是独立的,可以并行地运行。
- 工作容积变化很大,需要一个可扩展的解决方案。
- 该解决方案必须提供高可用性,并且如果处理一个任务失败必须是有弹性的。
这种模式可能不适合时:
- 它是不容易的应用程序的工作负荷分离成离散的任务,或有任务之间的依赖程度高。
- 任务必须同步进行,而应用逻辑必须等待任务完成后再继续。
- 任务必须以特定的顺序来执行。
Note
有些邮件系统支持会话,使生产者对消息进行分组在一起,并确保它们都被同一个接收者处理。这个机制可以与优先消息使用(如果它们支持)来实现消息排序的一种形式,在顺序从生产者传送消息到单个消费者。
例子
Azure 提供存储队列和服务总线队列,可作为一个合适的机制来实现这种模式。应用逻辑可以发布消息到一个队列,而消费者实现为在一个或多个角色的任务可以检索从这个队列中的消息并进行处理。对于弹性,一个服务总线队列使得消费者使用 PeekLock 模式,当它从队列检索消息。这种模式实际上不是删除消息,而只是从其他消费者隐藏它。当处理完它原来的用户可以删除该邮件。如果消费者要失败,偷看锁将超时,消息将再次变得可见,让消费者又找回它。
Note
有关使用 Azure 的服务总线队列的详细信息,请参阅服务总线队列,主题和 MSDN 上的订阅。有关使用 Azure 存储队列的信息,请参阅如何 MSDN 上使用队列存储服务。
从可供下载的例子 CompetingConsumers 解决方案的 QueueManager 类下面的代码显示了本指南说明了如何通过在网络或辅助角色开始的事件处理程序使用 QueueClient 实例中创建一个队列。
private string queueName = ...;
private string connectionString = ...;
...
public async Task Start()
{
// Check if the queue already exists.
var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);
if (!manager.QueueExists(this.queueName))
{
var queueDescription = new QueueDescription(this.queueName);
// Set the maximum delivery count for messages in the queue. A message
// is automatically dead-lettered after this number of deliveries. The
// default value for dead letter count is 10.
queueDescription.MaxDeliveryCount = 3;
await manager.CreateQueueAsync(queueDescription);
}
...
// Create the queue client. By default the PeekLock method is used.
this.client = QueueClient.CreateFromConnectionString(
this.connectionString, this.queueName);
}
下面的代码片段显示了一个应用程序如何创建和发送一批消息队列。
public async Task SendMessagesAsync()
{
// Simulate sending a batch of messages to the queue.
var messages = new List<BrokeredMessage>();
for (int i = 0; i < 10; i++)
{
var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
messages.Add(message);
}
await this.client.SendBatchAsync(messages);
}
下面的代码显示了如何消费服务实例可以从队列中下一个事件驱动的方式接收消息。该 processMessageTask 参数的 ReceiveMessages 法为代表,它引用在收到消息时运行的代码。此代码是异步运行。
private ManualResetEvent pauseProcessingEvent;
...
public void ReceiveMessages(Func<BrokeredMessage, Task> processMessageTask)
{
// Set up the options for the message pump.
var options = new OnMessageOptions();
// When AutoComplete is disabled it is necessary to manually
// complete or abandon the messages and handle any errors.
options.AutoComplete = false;
options.MaxConcurrentCalls = 10;
options.ExceptionReceived += this.OptionsOnExceptionReceived;
// Use of the Service Bus OnMessage message pump.
// The OnMessage method must be called once, otherwise an exception will occur.
this.client.OnMessageAsync(
async (msg) =>
{
// Will block the current thread if Stop is called.
this.pauseProcessingEvent.WaitOne();
// Execute processing task here.
await processMessageTask(msg);
},
options);
}
...
private void OptionsOnExceptionReceived(object sender,
ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
...
}
需要注意的是自动缩放的功能,例如可在天青,可用于启动和停止的角色实例的队列长度的波动。欲了解更多信息,请参阅自动缩放指导。另外,没有必要维持角色实例和工人之间的一对一的对应过程,单个角色实例可以实现多个工作进程。欲了解更多信息,请参阅计算资源整合模式。