jiangxuege 2019-06-27
本文我们将通过RabbitMQ和AMQP协议在Go微服务之间进行消息传递。
微服务是将应用程序的业务领域分离成具有清晰分离域的边界上下文,运行进程分离,其中任何跨域边界的持久关系必须依赖最终一致性,而不是类似于ACID事务或外键约束。其中很多概念都来自域驱动设计,或受其启发。领域驱动设计是另外一个很大的话题,足以用一个文章系列来介绍。
在我们Go语言微服务博客系列的上下文和微服务大体架构中,实现服务间的松耦合的一种模式是使用消息传递来进行服务间通信,不需要严格的请求/响应消息交换或类似的消息交换。也就是说,使用消息传递只是便于服务间松耦合的众多策略中的一种。
在Spring Cloud中,RabbitMQ似乎是选择的消息中间人(代理), 特别是因为在第八部分中我们看到的,Spring Cloud Config服务器具有RabbitMQ运行时依赖。
本文中,将会让accountservice服务每当读取特殊账号对象时,就在RabbitMQ exchange上放一条消息。这个消息会被一个我们本文所实现的全新微服务消费。我们也将处理Go代码在多微服务间的复用问题,将多服务复用代码放在common类库中,这样每个微服务都可以import它。
还记得我们在第一部分中的系统景观的图片吗? 下面是在本部分完成之后看起来的样子:
依然还有很多元素尚未实现。 不要担心,我们慢慢都会做到的。
这一部分有很多源代码,本文不会包含所有代码。 要查看完整代码,可克隆并切换到P9分支,或者直接查看https://github.com/callistaen...。
我们将实现一个简单的虚构(make-believe)用例: 当特定VIP账号在读取accountservice服务时,我们希望通知一个vip offer服务,在某些情况下,它将为账户持有人产生"offer"。在适当设计的领域模型中,账户对象和VIP offer对象时两个独立领域,它们应该尽可能少的互相了解。
换言之,accountservice不能直接访问VIP服务的存储。这个例子中,我们通过RabbitMQ传递一个消息给vipservice, 完全将业务逻辑和持久化都委托给vipservice。
我们将使用AMQP协议做所有通信,这个协议是面向互操作性消息传递的ISO标准应用程序层协议。我们的选择使用的Go类库是streadway/amqp, 类似在第八部分中我们消费配置更新时候使用的。
让我们重复在AMQP中exchange和publisher, consumer和queue之间的关系:
也就是说消息被发布到exchange, 然后将消息副本基于路由规则和可能已经注册消费者的绑定分布到queue。在quora.com网站上的这个帖子对这个话题进行了很好的解释。
Thread vs Post: 在论坛中,常用Thread和Post代指某些东西。但是这两者有什么区别呢?
通俗的讲Thread就是论坛中最初发起的某个主题的话题, 包含很多Post(A thread is a group of posts on a single topic.)。中文社区通常所谓的楼主发的第一个东西。 而Post则是对楼主最初发的内容做的回复或跟帖。
参考链接: https://www.drupal.org/projec...。
现实中的(Quora中的答案)例子:
假设你在Apple商店里边,先要买耳机。 店里就会有人过来问你:"需要什么?" 你告诉他你需要买耳机,然后他就把你带到他的同事的柜台前的排队队列之后等待。因为很多其他人也在买东西,销售员正在处理队列前面的那个消费者。 如果这个时候,另外一个人进店了,刚才招呼你的人会同样询问对方需要什么帮助。刚进来的人需要修下手机,被找呼的人带到了另外一个修理手机的柜台等待了。
这个例子中问你需要什么的人就是exchange, 他会根据需要把你路由到恰当的队列中排队等待。在队列的后面有很多员工,也就是对应队列的worker, 或者消费者。一次处理一个请求,基于先进先出的原则。也可能会根据最先到的人做一个简单轮询。
如果店里没有导流的服务员,那么你就需要来回在每个柜台前来回问是否能帮到你,直到找到你需要办理业务的柜台后开始排队。
当然,导航苹果商店的工作不复杂,但在应用程序中,你可能有很多队列,服务不同类型的请求,基于路由和绑定具有交换路由消息的键来说非常有帮助。 发布者只需要关心添加正确的路由密匙,而消费者只需要关心用正确的绑定密匙创建正确的队列,就可以做到"我对这些消息感兴趣。"
既然我们需要在accountservice和vipservice中使用消息传递代码和从Spring Cloud Config服务器上加载配置的代码,我们可以创建可共享的库。
我们在goblog目录下面创建一个common目录来保存我们可复用的东西:
mkdir -p common/messaging mkdir -p common/config
我们将所有AMQP相关的代码放在messaging目录,配置相关的放在config目录。这样你可以把之前的goblog/accountservice/config中的代码移到common/config目录中,并相应的修改import语句中的代码位置。可以看看已完成代码看它是如何支持的。
消息传递代码在单独文件中封装起来,里边定义了我们应用将用于连接、发布和订阅的接口以及具体实现。老实说,对于使用streadway/amqp的AMQP消息传递来说有很多样板代码,因此无需在意代码的实现细节。
在common/messaging/下面创建一个messagingclient.go文件:
package messaging import ( "github.com/streadway/amqp" "fmt" "log" ) // Defines our interface for connecting and consuming messages. type IMessagingClient interface { ConnectToBroker(connectionString string) Publish(msg []byte, exchangeName string, exchangeType string) error PublishOnQueue(msg []byte, queueName string) error Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error Close() } // Real implementation, encapsulates a pointer to an amqp.Connection type MessagingClient struct { conn *amqp.Connection }
上面代码片段,定义了messaging的接口。 这就是accountservice和vipservice需要消息传递的时候需要使用它们进行处理的,希望能从很多复杂的东西里边抽象出来。注意我已经选择两种变体"Product"和"Consume"来使用topics和direct/queue消息模式。
接下来,我们定义了一个保存amqp.Connection指针的结构体,我们会将必要的方法绑定到它上面(隐式的,因为Go语言中都是这样干的), 这样就实现了我们声明的接口。
func (m *MessagingClient) ConnectToBroker(connectionString string) { if connectionString == "" { panic("Cannot initialize connection to broker, connectionString not set. Have you initialized?") } var err error m.conn, err = amqp.Dial(fmt.Sprintf("%s/", connectionString)) if err != nil { panic("Failed to connect to AMQP compatible broker at: " + connectionString) } } func (m *MessagingClient) PublishOnQueue(body []byte, queueName string) error { if m.conn == nil { panic("Tried to send message before connection was initialized. Don't do that.") } ch, err := m.conn.Channel() // Get a channel from the connection defer ch.Close() queue, err := ch.QueueDeclare(// Declare a queue that will be created if not exists with some args queueName, // our queue name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) // Publishes a message onto the queue. err = ch.Publish( "", // exchange queue.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "application/json", Body: body, // Our JSON body as []byte }) fmt.Printf("A message was sent to queue %v: %v", queueName, body) return err }
ConnectToBroker中展示了我们如何获取连接指针的,例如amqp.Dial方法。如果我们没有配置或者无法连接我们的broker, 会panic我们的微服务,容器编排会尝试使用新实例重新尝试。 传入的连接字符串就像这样:
amqp://guest:guest@rabbitmq:5672/
注意我们现在使用的是Docker Swarm模式下的RabbitMQ broker的服务名。
PublishOnQueue()函数相当长,它或多或少是从官方例子派生过来的,这里我对其进行了简化,带比较少的参数。要发布消息到命名队列,我们需要传入的参数有:
要了解更多exchange的详情,可以参考RabbitMQ的官方文档。
PublishOnQueue()方法样本代码使用的很重,但是很容易理解。声明队列(如果不存在就创建它), 然后发布我们的[]byte消息到它里边。发布消息到命名exchange更加复杂,它需要样板代码首先声明一个exchange,一个队列,然后实现将它们绑定一起的代码。 详细请查看完整代码。
继续,实际使用我们MessagingClient的是在goblog/accountservice/service/handlers.go中,因此我们添加一个字段,并硬编码检查是否为VIP, 然后如果请求账号id是10000的话,我们就发送一个消息传递。
var DBClient dbclient.IBoltClient var MessagingClient messaging.IMessagingClient // 添加新行 var isHealthy = true func GetAccount(w http.ResponseWriter, r *http.Request) { // Read the 'accountId' path parameter from the mux map var accountId = mux.Vars(r)["accountId"] // Read the account struct BoltDB account, err := DBClient.QueryAccount(accountId) account.ServedBy = util.GetIP() // If err, return a 404 if err != nil { fmt.Println("Some error occured serving " + accountId + ": " + err.Error()) w.WriteHeader(http.StatusNotFound) return } notifyVIP(account) // 添加新行 同时发送VIP通知。 // NEW call the quotes-service quote, err := getQuote() if err == nil { account.Quote = quote } // If found, marshal into JSON, write headers and content data, _ := json.Marshal(account) writeJsonResponse(w, http.StatusOK, data) } // If our hard-coded "VIP" account, spawn a goroutine to send a message. func notifyVIP(account model.Account) { if account.Id == "10000" { go func(account model.Account) { vipNotification := model.VipNotification{AccountId: account.Id, ReadAt: time.Now().UTC().String()} data, _ := json.Marshal(vipNotification) fmt.Printf("Notifying VIP account %v\n", account.Id) err := MessagingClient.PublishOnQueue(data, "vip_queue") if err != nil { fmt.Println(err.Error()) } }(account) } }
借此机会,我们展示调用新goroutine的内联匿名函数, 也就是说使用了go关键词的。既然我们没有什么理由在发送消息传递的时候需要阻塞执行HTTP处理的主goroutine, 那么这种情况就是使用goroutine实现并行的最佳时机。
main.go文件也需要更新一点代码以便可以在启动的时候使用加载的并注入到Viper中的配置来初始化AMQ连接。
... func main() { fmt.Printf("Starting %v\n", appName) config.LoadConfigurationFromBranch( viper.GetString("configServerUrl"), appName, viper.GetString("profile"), viper.GetString("configBranch")) initializeBoltClient() initializeMessaging() // 新增行,初始化消息传递 handleSigterm(func() { service.MessagingClient.Close() }) service.StartWebServer(viper.GetString("server_port")) } func initializeMessaging() { if !viper.IsSet("amqp_server_url") { panic("No 'amqp_server_url' set in configuration, cannot start") } service.MessagingClient = &messaging.MessagingClient{} service.MessagingClient.ConnectToBroker(viper.GetString("amqp_server_url")) service.MessagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent) } ...
没有什么大不了的东西 - 我们创建一个空的MessagingClient实例并将其地址赋值给service.MessagingClient, 然后使用配置amqp_server_url来调用ConnectToBroker方法。如果配置中没有broker_url,我们就panic()退出,因为我们不希望在甚至都没有可能连接到broker的情况下运行服务。
如果成功的连接到broker, 那么我们就调用Subscribe方法来订阅由配置指定的topic。
我们在我们的.yml配置文件中添加amqp_broker_url
属性到第八部分中的配置文件中,这些东西已经没有人管了。
broker_url: amqp://guest:[email protected]:5672 _(dev)_ broker_url: amqp://guest:guest@rabbitmq:5672 _(test)_
注意test profile, 我们使用的是Swarm服务名"rabbitmq", 而不是我笔记本上看到的Swarm的网络IP地址。(你实际的IP地址可能会变化,192.168.99.100似乎是运行Docker Toolbox的标准IP)。
配置文件中使用明文的用户名和密码是不推荐的,在现实生活中,我们一般会使用第八部分中看到的Spring Cloud Config服务器内置的加密特性。
当然,我们应该至少编写一个单元测试,确保我们handlers.go中的GetAccount函数当某人请求神奇的并非常特殊的账号标识为10000的账号时尝试发送一个消息。为此,我们需要模拟IMessagingClient和handlers_test.go中添加新的测试用例实现。让我们开始模拟吧。 这次我们将使用第三方工具mockery来产生IMessagingClient接口的实现:(记住在命令行运行这些命令的时候使用恰当的GOPATH设置)。
> go get github.com/vektra/mockery/.../ > cd $GOPATH/src/github.com/callistaenterprise/goblog/common/messaging > ./$GOPATH/bin/mockery -all -output . Generating mock for: IMessagingClient
我们现在在当前目录有一个IMessagingClient.go模拟文件。 我不太喜欢这样的文件名字,不喜欢驼峰,所以我将它重命名为一个明显的东西,它模拟并遵循本博客系列中文件名的约定。
mv IMessagingClient.go mockmessagingclient.go
可能需要调整一般文件中的import语句,删除import别名。 除了那些,我们使用一个黑盒方式来达到这个特殊模拟 - 仅假设它在我们开始写测试的时候会工作。
请随意检查生成的模拟实现的源代码,它非常类似我们之前第四部分中手工写的东西。
切到handlers_test.go,我们添加一个新的测试用例:
// declare mock types to make test code a bit more readable var anyString = mock.AnythingOfType("string") var anyByteArray = mock.AnythingOfType("[]uint8") // == []byte func TestNotificationIsSentForVIPAccount(t *testing.T) { // Set up the DB client mock mockRepo.On("QueryAccount", "10000").Return(model.Account{Id:"10000", Name:"Person_10000"}, nil) DBClient = mockRepo mockMessagingClient.On("PublishOnQueue", anyByteArray, anyString).Return(nil) MessagingClient = mockMessagingClient Convey("Given a HTTP req for a VIP account", t, func() { req := httptest.NewRequest("GET", "/accounts/10000", nil) resp := httptest.NewRecorder() Convey("When the request is handled by the Router", func() { NewRouter().ServeHTTP(resp, req) Convey("Then the response should be a 200 and the MessageClient should have been invoked", func() { So(resp.Code, ShouldEqual, 200) time.Sleep(time.Millisecond * 10) // Sleep since the Assert below occurs in goroutine So(mockMessagingClient.AssertNumberOfCalls(t, "PublishOnQueue", 1), ShouldBeTrue) }) })}) }
可以查看注释了解详情。我不喜欢在断言调用数之前人为添加10毫秒睡眠,但由于模拟是在goroutine中调用,和主线程是独立的,我们需要允许它有一些时间来完成。 希望在涉及到有goroutine或者channel的时候,有更好的单元测试方式。
我承认,模拟这种方式比使用类似Mockito的东西更冗余, 当写Java应用的单元测试的时候。不过,我认为可读性和易读性还是不错的。
确保测试通过:
go test ./...
如果你还没有做的话,先运行springcloud.sh脚本更新配置服务器。 然后,运行copyall.sh并等几秒钟更新accountservice。我们将使用curl来获取我们特殊的账号:
> curl http://$ManagerIP:6767/accounts/10000 {"id":"10000","name":"Person_0","servedBy":"10.255.0.11"}
如果所有进行顺利的话,我们可以打开RabbitMQ管理控制台,并看我们是否在名为vipQueue的队列上获得了一个消息。
在上面截图最底下,我们看到vipQueue有一个消息。如果我们使用RabbitMQ管理控制台的Get Message功能, 我们会看到下面的消息:
最后,是时候从头开始写一个全新的微服务了, 我们需要用它来展示如何从RabbitMQ消费消息。我们将确保应用在前面内容中学到的模式包括:
如果你已经切出P9分支的代码了,那么在你goblog目录下面就已经有了vipservice了。
我不会一行行过每个代码文件的内容,因为有些和accountservice里边的重复了。相反我将聚焦在刚才发送消息的消费方面。需要注意一些事情:
我们会使用common/messaging的SubscribeToQueue函数,例如:
SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
这里我们应该提供的最重要的是:
vip_queue
)。实际上将我们的回调函数绑定到队列的SubscribeToQueue实现的实现并不奇怪,如果我们需要了解细节,可以查阅源代码。
继续快速看看vipservice的入口文件main.go, 看看我们如何设置的:
package main import ( "flag" "fmt" "github.com/callistaenterprise/goblog/common/config" "github.com/callistaenterprise/goblog/common/messaging" "github.com/callistaenterprise/goblog/vipservice/service" "github.com/spf13/viper" "github.com/streadway/amqp" "os" "os/signal" "syscall" ) var appName = "vipservice" var messagingClient messaging.IMessagingClient func init() { configServerUrl := flag.String("configServerUrl", "http://configserver:8888", "Address to config server") profile := flag.String("profile", "test", "Environment profile, something similar to spring profiles") configBranch := flag.String("configBranch", "master", "git branch to fetch configuration from") flag.Parse() viper.Set("profile", *profile) viper.Set("configServerUrl", *configServerUrl) viper.Set("configBranch", *configBranch) } func main() { fmt.Println("Starting " + appName + "...") config.LoadConfigurationFromBranch(viper.GetString("configServerUrl"), appName, viper.GetString("profile"), viper.GetString("configBranch")) initializeMessaging() // Makes sure connection is closed when service exits. handleSigterm(func() { if messagingClient != nil { messagingClient.Close() } }) service.StartWebServer(viper.GetString("server_port")) } func onMessage(delivery amqp.Delivery) { fmt.Printf("Got a message: %v\n", string(delivery.Body)) } func initializeMessaging() { if !viper.IsSet("amqp_server_url") { panic("No 'broker_url' set in configuration, cannot start") } messagingClient = &messaging.MessagingClient{} messagingClient.ConnectToBroker(viper.GetString("amqp_server_url")) // Call the subscribe method with queue name and callback function err := messagingClient.SubscribeToQueue("vip_queue", appName, onMessage) failOnError(err, "Could not start subscribe to vip_queue") err = messagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent) failOnError(err, "Could not start subscribe to "+viper.GetString("config_event_bus")+" topic") } // Handles Ctrl+C or most other means of "controlled" shutdown gracefully. Invokes the supplied func before exiting. func handleSigterm(handleExit func()) { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) signal.Notify(c, syscall.SIGTERM) go func() { <-c handleExit() os.Exit(1) }() } func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } }
看起来和accountservice非常相似,对不对? 我们可能会重复如何安装和启动我们添加的每个微服务的基本知识。
onMessage函数在这里仅仅打印我们接到的vip消息的body。如果我们需要实现更多虚构的用例,它会调用一些花哨的逻辑来确定账号持有人是否有资格获得"超级可怕的购买我们所有东西(TM)"的offer, 并且可能写一个offer给"VIP offer数据库"。你可以随意实现并提交一个PR。
没有什么可补充的。除了这个片段,当我们按下Ctrl + C或者当Swarm认为是时候杀死服务实例:
func handleSigterm(handleExit func()) { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) signal.Notify(c, syscall.SIGTERM) go func() { <-c handleExit() os.Exit(1) }() }
不是最容易读的代码片段,它所做的就是注册通道c作为os.Interrupt和syscall的监听器。SIGTERM和goroutine会阻塞在c上的消息监听,知道接收到这两种信号。 这样就使得我们非常肯定我们提供的handleExit()函数在微服务被杀掉的时候都会被调用。怎么确定? Ctrl + C或docker swarm扩展也工作良好。kill也一样。 kill -9不会。 因此请求不要使用kill -9停止,除非你必须要这样做。
它将调用我们在IMessageConsumer接口中声明的Close()函数, 它实现的时候确保AMQP连接被正确关闭。
我们对copyall.sh内容进行了修改:
#!/bin/bash export GOOS=linux export CGO_ENABLED=0 cd accountservice;go get;go build -o accountservice-linux-amd64;echo built `pwd`;cd .. cd healthchecker;go get;go build -o healthchecker-linux-amd64;echo built `pwd`;cd .. cd vipservice;go get;go build -o vipservice-linux-amd64;echo built `pwd`;cd .. export GOOS=darwin cp healthchecker/healthchecker-linux-amd64 accountservice/ cp healthchecker/healthchecker-linux-amd64 vipservice/ docker build -t someprefix/accountservice accountservice/ docker service rm accountservice docker service create --name=accountservice --replicas=1 --network=my_network -p=6767:6767 someprefix/accountservice docker build -t someprefix/vipservice vipservice/ docker service rm vipservice docker service create --name=vipservice --replicas=1 --network=my_network someprefix/vipservice
运行这个脚本,等待几秒钟,让服务重新构建部署完成。然后查看:
> docker service ls ID NAME REPLICAS IMAGE kpb1j3mus3tn accountservice 1/1 someprefix/accountservice n9xr7wm86do1 configserver 1/1 someprefix/configserver r6bhneq2u89c rabbitmq 1/1 someprefix/rabbitmq sy4t9cbf4upl vipservice 1/1 someprefix/vipservice u1qcvxm2iqlr viz 1/1 manomarks/visualizer:latest
或者可以使用dvizz Docker Swarm服务呈现来查看:
既然docker service logs特性已经在1.13.0中被标记为试验阶段,我们依然可以使用前面的方式来查看vipservice的日志。首先,运行docker ps找出容器id:
> docker ps CONTAINER ID IMAGE a39e6eca83b3 someprefix/vipservice:latest b66584ae73ba someprefix/accountservice:latest d0074e1553c7 someprefix/configserver:latest
然后使用vipservice的容器id来查看日志:
> docker logs -f a39e6eca83b3 Starting vipservice... 2017/06/06 19:27:22 Declaring Queue () 2017/06/06 19:27:22 declared Exchange, declaring Queue () 2017/06/06 19:27:22 declared Queue (0 messages, 0 consumers), binding to Exchange (key 'springCloudBus') Starting HTTP service at 6868
然后另外打开一个窗口,执行下面的请求:
> curl http://$ManagerIP:6767/accounts/10000
然后你就会在刚才日志里边看到多了下面一行信息:
Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}
也就是说我们的vipservice成功的消费了从accountservice发布的消息。
跨越服务的多个实例的分布式work模式是利用了work队列的概念。每个vip消息应该只能被单个vipservice实例处理。
因此让我们看看当我们将vipservice规模扩大到2个的时候会发生什么:
> docker service scale vipservice=2
数秒之后新的实例就可以使用了。既然我们使用的是AMQP中的direct/queue方式,我们希望有轮询的行为。使用curl触发四个VIP账户查询。
> curl http://$ManagerIP:6767/accounts/10000 > curl http://$ManagerIP:6767/accounts/10000 > curl http://$ManagerIP:6767/accounts/10000 > curl http://$ManagerIP:6767/accounts/10000
然后在看看日志:
> docker logs -f a39e6eca83b3 Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"} Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:29.073682324 +0000 UTC"}
正如我们预料的,我们看到第一个实例处理了四条消息中的两条。如果我们对其他的vipservice进行docker logs查询,我们会看到其他的消息在它们里边消费了。非常满意。
这次不会做性能测试,在发送和接受一些消息后,快速查看内存使用就足够了:
CONTAINER CPU % MEM USAGE / LIMIT vipservice.1.tt47bgnmhef82ajyd9s5hvzs1 0.00% 1.859MiB / 1.955GiB accountservice.1.w3l6okdqbqnqz62tg618szsoj 0.00% 3.434MiB / 1.955GiB rabbitmq.1.i2ixydimyleow0yivaw39xbom 0.51% 129.9MiB / 1.955GiB
上买呢在服务了一些请求后得到的信息。新的vipservice和accountservice一样不是很复杂,因此和预料的一样启动的时候占用的内存非常小。
本文可能是这个系列目前最长的一篇文章了!我们完成了:
在第十部分,我们将做一些轻量的但在现实世界非常重要的模型 - 使用Logrus, Docker GELF日志驱动记录结构化日志以及将日志发不到Laas提供者商。