ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现

迷思 2017-12-21

很长一段时间以来,我都在思考如何在ASP.NET Core的框架下,实现一套完整的事件驱动型架构。这个问题看上去有点大,其实主要目标是为了实现一个基于ASP.NET Core的微服务,它能够非常简单地订阅来自于某个渠道的事件消息,并对接收到的消息进行处理,于此同时,它还能够向该渠道发送事件消息,以便订阅该事件消息的消费者能够对消息数据做进一步处理。让我们回顾一下微服务之间通信的几种方式,分为同步和异步两种。同步通信最常见的就是RESTful API,而且非常简单轻量,一个Request/Response回环就结束了;异步通信最常见的就是通过消息渠道,将载有特殊意义的数据的事件消息发送到消息渠道,而对某种类型消息感兴趣的消费者,就可以获取消息中所带信息并执行相应操作,这也是我们比较熟知的事件驱动架构的一种表现形式。虽然事件驱动型架构看起来非常复杂,从微服务的实现来看显得有些繁重,但它的应用范围确实很广,也为服务间通信提供了新的思路。了解DDD的朋友相信一定知道CQRS体系结构模式,它就是一种事件驱动型架构。事实上,实现一套完整的、安全的、稳定的、正确的事件驱动架构并不简单,由于异步特性带来的一致性问题会非常棘手,甚至需要借助一些基础结构层工具(比如关系型数据库,不错!只能是关系型数据库)来解决一些特殊问题。本文就打算带领大家一起探探路,基于ASP.NET Core Web API实现一个相对比较简单的事件驱动架构,然后引出一些有待深入思考的问题,留在今后的文章中继续讨论。或许,本文所引入的源代码无法直接用于生产环境,但我希望本文介绍的内容能够给到读者一些启发,并能够帮助解决实际中遇到的问题。

术语约定

本文会涉及一些相关的专业术语,在此先作约定:

  • 事件:在某一特定时刻发生在某件事物上的一件事情,例如:在我撰写本文的时候,电话铃响了
  • 消息:承载事件数据的实体。事件的序列化/反序列化和传输都以消息的形式进行
  • 消息通信渠道:一种带有消息路由功能的数据传输机制,用以在消息的派发器和订阅器之间进行数据传输

注意:为了迎合描述的需要,在下文中可能会混用事件和消息两个概念。

一个简单的设计

先从简单的设计开始,基本上事件驱动型架构会有事件消息(Events)、事件订阅器(Event Subscriber)、事件派发器(Event Publisher)、事件处理器(Event Handler)以及事件总线(Event Bus)等主要组件,它们之间的关系大致如下:

首先,IEvent接口定义了事件消息(更确切地说,数据)的基本结构,几乎所有的事件都会有一个唯一标识符(Id)和一个事件发生的时间(Timestamp),这个时间通常使用UTC时间作为标准。IEventHandler定义了事件处理器接口,显而易见,它包含两个方法:CanHandle方法,用以确定传入的事件对象是否可被当前处理器所处理,以及Handle方法,它定义了事件的处理过程。IEvent和IEventHandler构成了事件处理的基本元素。

然后就是IEventSubscriber与IEventPublisher接口。前者表示实现该接口的类型为事件订阅器,它负责事件处理器的注册,并侦听来自事件通信渠道上的消息,一旦所获得的消息能够被某个处理器处理,它就会指派该处理器对接收到的消息进行处理。因此,IEventSubscriber会保持着对事件处理器的引用;而对于实现了IEventPublisher接口的事件派发器而言,它的主要任务就是将事件消息发送到消息通信渠道,以便订阅端能够获得消息并进行处理。

IEventBus接口表示消息通信渠道,也就是大家所熟知的消息总线的概念。它不仅具有消息订阅的功能,而且还具有消息派发的能力,因此,它会同时继承于IEventSubscriber和IEventPublisher接口。在上面的设计中,通过接口分离消息总线的订阅器和派发器的角色是很有必要的,因为两种角色的各自职责不一样,这样的设计同时满足SOLID中的SRP和ISP两个准则。

基于以上基础模型,我们可以很快地将这个对象关系模型转换为C#代码:

public interface IEvent
{
    Guid Id { get; }
    DateTime Timestamp { get; }
}

public interface IEventHandler
{
    Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default);
    bool CanHandle(IEvent @event);
}

public interface IEventHandler<in T> : IEventHandler
    where T : IEvent
{
    Task<bool> HandleAsync(T @event, CancellationToken cancellationToken = default);
}

public interface IEventPublisher : IDisposable
{
    Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IEvent;
}

public interface IEventSubscriber : IDisposable
{
    void Subscribe();
}

public interface IEventBus : IEventPublisher, IEventSubscriber { }

短短30行代码,就把我们的基本对象关系描述清楚了。对于上面的代码我们需要注意以下几点:

  1. 这段代码使用了C# 7.1的新特性(default关键字)
  2. Publish以及Handle方法被替换为支持异步调用的PublishAsync和HandleAsync方法,它们会返回Task对象,这样可以方便使用C#中async/await的编程模型
  3. 由于我们的这个模型可以作为实现消息系统的通用模型,并且会需要用到ASP.NET Core的项目中,因此,建议将这些接口的定义放在一个独立的NetStandard的Class Library中,方便今后重用和扩展

OK,接口定义好了。实现呢?下面,我们实现一个非常简单的消息总线:PassThroughEventBus。在今后的文章中,我还会介绍如何基于RabbitMQ和Azure Service Bus实现不一样的消息总线。

PassThroughEventBus

顾名思义,PassThroughEventBus表示当有消息被派发到消息总线时,消息总线将不做任何处理与路由,而是直接将消息推送到订阅方。在订阅方的事件监听函数中,会通过已经注册的事件处理器对接收到的消息进行处理。整个过程并不会依赖于任何外部组件,不需要引用额外的开发库,只是利用现有的.NET数据结构来模拟消息的派发和订阅过程。因此,PassThroughEventBus不具备容错和消息重发功能,不具备消息存储和路由功能,我们先实现这样一个简单的消息总线,来体验事件驱动型架构的设计过程。

我们可以使用.NET中的Queue或者ConcurrentQueue等基本数据结构来作为消息队列的实现,与这些基本的数据结构相比,消息队列本身有它自己的职责,它需要在消息被推送进队列的同时通知调用方。当然,PassThroughEventBus不需要依赖于Queue或者ConcurrentQueue,它所要做的事情就是模拟一个消息队列,当消息推送进来的时候,立刻通知订阅方进行处理。同样,为了分离职责,我们可以引入一个EventQueue的实现(如下),从而将消息推送和路由的职责(基础结构层的职责)从消息总线中分离出来。

internal sealed class EventQueue
{
    public event System.EventHandler<EventProcessedEventArgs> EventPushed;

    public EventQueue() { }

    public void Push(IEvent @event)
    {
        OnMessagePushed(new EventProcessedEventArgs(@event));
    }

    private void OnMessagePushed(EventProcessedEventArgs e) => this.EventPushed?.Invoke(this, e);
}

EventQueue中最主要的方法就是Push方法,从上面的代码可以看到,当EventQueue的Push方法被调用时,它将立刻触发EventPushed事件,它是一个.NET事件,用以通知EventQueue对象的订阅者,消息已经被派发。整个EventQueue的实现非常简单,我们仅专注于事件的路由,完全没有考虑任何额外的事情。

接下来,就是利用EventQueue来实现PassThroughEventBus。毫无悬念,PassThroughEventBus需要实现IEventBus接口,它的两个基本操作分别是Publish和Subscribe。在Publish方法中,会将传入的事件消息转发到EventQueue上,而Subscribe方法则会订阅EventQueue.EventPushed事件(.NET事件),而在EventPushed事件处理过程中,会从所有已注册的事件处理器(Event Handlers)中找到能够处理所接收到的事件,并对其进行处理。整个流程还是非常清晰的。以下便是PassThroughEventBus的实现代码:

public sealed class PassThroughEventBus : IEventBus
{
    private readonly EventQueue eventQueue = new EventQueue();
    private readonly IEnumerable<IEventHandler> eventHandlers;

    public PassThroughEventBus(IEnumerable<IEventHandler> eventHandlers)
    {
        this.eventHandlers = eventHandlers;
    }

    private void EventQueue_EventPushed(object sender, EventProcessedEventArgs e)
        => (from eh in this.eventHandlers
            where eh.CanHandle(e.Event)
            select eh).ToList().ForEach(async eh => await eh.HandleAsync(e.Event));

    public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IEvent
            => Task.Factory.StartNew(() => eventQueue.Push(@event));

    public void Subscribe()
        => eventQueue.EventPushed += EventQueue_EventPushed;


    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls
    void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                this.eventQueue.EventPushed -= EventQueue_EventPushed;
            }

            disposedValue = true;
        }
    }
    public void Dispose() => Dispose(true);
    #endregion
}

实现过程非常简单,当然,从这些代码也可以更清楚地了解到,PassThroughEventBus不做任何路由处理,更不会依赖于一个基础结构设施(比如实现了AMQP的消息队列),因此,不要指望能够在生产环境中使用它。不过,目前来看,它对于我们接下来要讨论的事情还是会很有帮助的,至少在我们引入基于RabbitMQ等实现的消息总线之前。

同样地,请将PassThroughEventBus实现在另一个NetStandard的Class Library中,虽然它不需要额外的依赖,但它毕竟是众多消息总线中的一种,将它从接口定义的程序集中剥离开来,好处有两点:第一,保证了定义接口的程序集的纯净度,使得该程序集不需要依赖任何外部组件,并确保了该程序集的职责单一性,即为消息系统的实现提供基础类库;第二,将PassThroughEventBus置于独立的程序集中,有利于调用方针对IEventBus进行技术选择,比如,如果开发者选择使用基于RabbitMQ的实现,那么,只需要引用基于RabbitMQ实现IEventBus接口的程序集就可以了,而无需引用包含了PassThroughEventBus的程序集。这一点我觉得可以归纳为框架设计中“隔离依赖关系(Dependency Segregation)”的准则。

好了,基本组件都定义好了,接下来,让我们一起基于ASP.NET Core Web API来做一个RESTful服务,并接入上面的消息总线机制,实现消息的派发和订阅。

Customer RESTful API

我们仍然以客户管理的RESTful API为例子,不过,我们不会过多地讨论如何去实现管理客户信息的RESTful服务,那并不是本文的重点。作为一个案例,我使用ASP.NET Core 2.0 Web API建立了这个服务,使用Visual Studio 2017 15.5做开发,并在CustomersController中使用Dapper来对客户信息CRUD。后台基于SQL Server 2017 Express Edition,使用SQL Server Management Studio能够让我方便地查看数据库操作的结果。

RESTful API的实现

假设我们的客户信息只包含客户ID和名称,下面的CustomersController代码展示了我们的RESTful服务是如何保存并读取客户信息的。当然,我已经将本文的代码通过Github开源,开源协议为MIT,虽然商业友好,但毕竟是案例代码没有经过测试,所以请谨慎使用。本文源代码的使用我会在文末介绍。

[Route("api/[controller]")]
public class CustomersController : Controller
{
    private readonly IConfiguration configuration;
    private readonly string connectionString;

    public CustomersController(IConfiguration configuration)
    {
        this.configuration = configuration;
        this.connectionString = configuration["mssql:connectionString"];
    }


    // 获取指定ID的客户信息
    [HttpGet("{id}")]
    public async Task<IActionResult> Get(Guid id)
    {
        const string sql = "SELECT [CustomerId] AS Id, [CustomerName] AS Name FROM [dbo].[Customers] WHERE [CustomerId]=@id";
        using (var connection = new SqlConnection(connectionString))
        {
            var customer = await connection.QueryFirstOrDefaultAsync<Model.Customer>(sql, new { id });
            if (customer == null)
            {
                return NotFound();
            }

            return Ok(customer);
        }
    }

    // 创建新的客户信息
    [HttpPost]
    public async Task<IActionResult> Create([FromBody] dynamic model)
    {
        var name = (string)model.Name;
        if (string.IsNullOrEmpty(name))
        {
            return BadRequest();
        }

        const string sql = "INSERT INTO [dbo].[Customers] ([CustomerId], [CustomerName]) VALUES (@Id, @Name)";
        using (var connection = new SqlConnection(connectionString))
        {
            var customer = new Model.Customer(name);
            await connection.ExecuteAsync(sql, customer);

            return Created(Url.Action("Get", new { id = customer.Id }), customer.Id);
        }
    }
}

代码一如既往的简单,Web API控制器通过Dapper简单地实现了客户信息的创建和返回。我们不妨测试一下,使用下面的Invoke-RestMethod PowerShell指令,发送Post请求,通过上面的Create方法创建一个用户:

ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现

可以看到,response中已经返回了新建客户的ID号。接下来,继续使用Invoke-RestMethod来获取新建客户的详细信息:

ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现

OK,API调试完全没有问题。下面,我们将这个案例再扩充一下,我们希望这个API在完成客户信息创建的同时,向外界发送一条“客户信息已创建”的事件,并设置一个事件处理器,负责将该事件的详细内容保存到数据库中。

加入事件总线和消息处理机制

首先,我们在ASP.NET Core Web API项目上,添加对以上两个程序集的引用,然后,按常规做法,在ConfigureServices方法中,将PassThroughEventBus添加到IoC容器中:

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();
    services.AddSingleton<IEventBus, PassThroughEventBus>();
}

在此,将事件总线注册为单例(Singleton)服务,是因为它不保存状态。理论上讲,使用单例服务时,需要特别注意服务实例对象的生命周期管理,因为它的生命周期是整个应用程序级别,在程序运行的过程中,由其引用的对象资源将无法释放,因此,当程序结束运行时,需要合理地将这些资源dispose掉。好在ASP.NET Core的依赖注入框架中已经帮我们处理过了,因此,对于上面的PassThroughEventBus单例注册,我们不需要过多担心,程序执行结束并正常退出时,依赖注入框架会自动帮我们dispose掉PassThroughEventBus的单例实例。那么对于单例实例来说,我们是否只需要通过AddSingleton方法进行注册就可以了,而无需关注它是否真的被dispose了呢?答案是否定的,有兴趣的读者可以参考微软的官方文档,在下一篇文章中我会对这部分内容做些介绍。

接下来,我们需要定义一个CustomerCreatedEvent对象,表示“客户信息已经创建”这一事件信息,同时,再定义一个CustomerCreatedEventHandler事件处理器,用来处理从PassThroughEventBus接收到的事件消息。代码如下,当然也很简单:

public class CustomerCreatedEvent : IEvent
{
    public CustomerCreatedEvent(string customerName)
    {
        this.Id = Guid.NewGuid();
        this.Timestamp = DateTime.UtcNow;
        this.CustomerName = customerName;
    }

    public Guid Id { get; }

    public DateTime Timestamp { get; }

    public string CustomerName { get; }
}

public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent>
{
    public bool CanHandle(IEvent @event)
        => @event.GetType().Equals(typeof(CustomerCreatedEvent));

    public Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        return Task.FromResult(true);
    

    public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default)
        => CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false);
}

两者分别实现了我们最开始定义好的IEvent和IEventHandler接口。在CustomerCreatedEventHandler类的第一个HandleAsync重载方法中,我们暂且让它简单地返回一个true值,表示事件处理成功。下面要做的事情就是,在客户信息创建成功后,向事件总线发送CustomerCreatedEvent事件,以及在ASP.NET Core Web API程序启动的时候,注册CustomerCreatedEventHandler实例,并调用事件总线的Subscribe方法,使其开始侦听事件的派发行为。

于是,CustomerController需要依赖IEventBus,并且在CustomerController.Create方法中,需要通过调用IEventBus的Publish方法将事件发送出去。现对CustomerController的实现做一些调整,调整后代码如下:

[Route("api/[controller]")]
public class CustomersController : Controller
{
    private readonly IConfiguration configuration;
    private readonly string connectionString;
    private readonly IEventBus eventBus;

    public CustomersController(IConfiguration configuration,
        IEventBus eventBus)
    {
        this.configuration = configuration;
        this.connectionString = configuration["mssql:connectionString"];
        this.eventBus = eventBus;
    }

    // 创建新的客户信息
    [HttpPost]
    public async Task<IActionResult> Create([FromBody] dynamic model)
    {
        var name = (string)model.Name;
        if (string.IsNullOrEmpty(name))
        {
            return BadRequest();
        }

        const string sql = "INSERT INTO [dbo].[Customers] ([CustomerId], [CustomerName]) VALUES (@Id, @Name)";
        using (var connection = new SqlConnection(connectionString))
        {
            var customer = new Model.Customer(name);
            await connection.ExecuteAsync(sql, customer);

            await this.eventBus.PublishAsync(new CustomerCreatedEvent(name));

            return Created(Url.Action("Get", new { id = customer.Id }), customer.Id);
        }
    }
    
    // Get方法暂且省略
}

然后,修改Startup.cs中的ConfigureServices方法,将CustomerCreatedEventHandler注册进来:

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();

    services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
    services.AddSingleton<IEventBus, PassThroughEventBus>();
}

并且调用Subscribe方法,开始侦听消息总线:

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
    eventBus.Subscribe();

    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    app.UseMvc();
}

OK,现在让我们在CustomerCreatedEventHandler的HandleAsync方法上设置个断点,按下F5启用Visual Studio 2017调试,然后重新使用Invoke-RestMethod命令发送一个Post请求,可以看到,HandleAsync方法上的断点被命中,同时事件已被正确派发:

ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现

数据库中的数据也被正确更新:

ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现

目前还差最后一小步,就是在HandleAsync中,将CustomerCreatedEvent对象的数据序列化并保存到数据库中。当然这也不难,同样可以考虑使用Dapper,或者直接使用ADO.NET,甚至使用比较重量级的Entity Framework Core,都可以实现。那就在此将这个问题留给感兴趣的读者朋友自己搞定啦。

小结

到这里基本上本文的内容也就告一段落了,回顾一下,本文一开始就提出了一种相对简单的消息系统和事件驱动型架构的设计模型,并实现了一个最简单的事件总线:PassThroughEventBus。随后,结合一个实际的ASP.NET Core Web API案例,了解了在RESTful API中实现事件消息派发和订阅的过程,并实现了在事件处理器中,对获得的事件消息进行处理。

然而,我们还有很多问题需要更深入地思考,比如:

  • 如果事件处理器需要依赖基础结构层组件,依赖关系如何管理?组件生命周期如何管理?
  • 如何实现基于RabbitMQ或者Azure Service Bus的事件总线?
  • 如果在数据库更新成功后,事件发送失败怎么办?
  • 如何保证事件处理的顺序?

等等。。。在接下来的文章中,我会尽力做更详细的介绍。

源代码的使用

本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,通过不同的release tag来区分针对不同章节的源代码。本文的源代码请参考chapter_1这个tag,如下:

ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现

接下来还将会有chapter_2、chapter_3等这些tag,对应本系列文章的第二部分、第三部分等等。敬请期待。

相关推荐