XiaoqiangNan 2018-12-24
文章讲解如何使用Thrift和Curator实现自己的服务注册与发现功能,服务注册中心使用Zookeeper框架。
技术栈
zookeeper-3.4.13
thrift-0.11.0
curator-2.13.0
特性
1、服务集群部署:同一个服务可以部署到多台服务器上,注册中心维护一个服务的多份payload信息
2、客户端软负载均衡:暂支持随机和轮询两种方式
3、服务提供者和服务消费者可选择是否要注册到注册中心
4、业务服务实现类自动加载和注册
5、一个端口监听多个业务服务
最新源码: https://github.com/chenjuwen/thrift-microservice
功能主要包含以下关键实现点:
扫描和加载业务服务实现类
发布业务服务到thrift处理器
注册业务服务到注册中心
创建thrift服务端
从注册中心查找业务服务
调用业务服务接口
1、扫描和加载业务服务实现类
业务服务接口通过Thrift的IDL进行描述,并使用Thrift提供的工具编译生成接口文件。
Common.thrift
namespace java com.seasy.microservice.api
// Request payload accepted by Hello.sendMessage: an integer type tag plus raw bytes.
// NOTE(review): the meaning of individual `type` values is not defined in this IDL.
struct Message {
1: i32 type;
2: binary data;
}
// Result returned by Hello.sendMessage: an integer code and a text message.
// The sample implementation answers with code 0 and message "success".
struct Response {
1: i32 code;
2: string message;
}
Hello.thrift
namespace java com.seasy.microservice.api
// Pulls in the Message and Response structs, referenced below with the Common. prefix.
include "Common.thrift"
// Hello service: helloString maps a string to a string; sendMessage takes a
// Common.Message and returns a Common.Response.
service Hello{
string helloString(1:string param)
Common.Response sendMessage(1:Common.Message message)
}开发业务服务接口的实现类,所有实现类统一用自定义注解类ServiceAnnotation加以标注。
/**
 * Sample implementation of the Thrift-generated {@code Hello.Iface} interface.
 *
 * <p>The {@code ServiceAnnotation} ties this class to the {@code Hello} service
 * at version "1.0.0"; the loader finds implementation classes through that annotation.
 */
@ServiceAnnotation(serviceClass=Hello.class, version="1.0.0")
public class HelloServiceImpl implements Hello.Iface{
    /**
     * Echoes the argument.
     *
     * @param param any string
     * @return {@code param}, unchanged
     */
    @Override
    public String helloString(String param) throws TException {
        return param;
    }

    /**
     * Prints the message type and its payload as text, then reports success.
     *
     * @param message the incoming message; its data is interpreted as UTF-8 text
     * @return a response with code 0 and message "success"
     */
    @Override
    public Response sendMessage(Message message) throws TException {
        // Decode explicitly as UTF-8: the sample client encodes the payload as UTF-8,
        // whereas new String(byte[]) would use the platform default charset.
        String text = new String(message.getData(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(message.getType() + ", " + text);
        return new Response(0, "success");
    }
}
根据自定义注解类扫描和加载业务服务实现类。
/**
 * Scans a package for classes carrying {@code ServiceAnnotation}, instantiates each
 * one and collects the result.
 *
 * @param packagePath package prefix handed to Reflections for scanning
 * @return map from the fully-qualified name of the annotation's {@code serviceClass}
 *         to the {@code ServiceInformation} describing the implementation
 * @throws Exception if an implementation class cannot be instantiated
 */
private ConcurrentHashMap<String, ServiceInformation> loadService(String packagePath) throws Exception {
    ConcurrentHashMap<String, ServiceInformation> serviceInformationMap = new ConcurrentHashMap<>();
    Reflections reflections = new Reflections(packagePath);

    // Find every class that carries the service annotation.
    Set<Class<?>> serviceImplementClassSet = reflections.getTypesAnnotatedWith(ServiceAnnotation.class);
    for (Class<?> serviceImplementClass : serviceImplementClassSet) {
        ServiceAnnotation serviceAnnotation = serviceImplementClass.getAnnotation(ServiceAnnotation.class);

        // Everything known about one service is bundled into a ServiceInformation.
        ServiceInformation serviceInformation = new ServiceInformation();
        // NOTE(review): this assigns the id to itself, so the id stays at whatever a fresh
        // ServiceInformation starts with. Presumably it was meant to come from the
        // annotation - confirm and fix at the source of the id.
        serviceInformation.setId(serviceInformation.getId());
        serviceInformation.setServiceClass(serviceAnnotation.serviceClass());
        serviceInformation.setVersion(serviceAnnotation.version());
        serviceInformation.setTimeout(serviceAnnotation.timeout());
        // Class.newInstance() is deprecated; go through the no-arg constructor instead.
        serviceInformation.setServiceImplementClassInstance(
                serviceImplementClass.getDeclaredConstructor().newInstance());

        String key = serviceAnnotation.serviceClass().getName();
        ServiceInformation previous = serviceInformationMap.put(key, serviceInformation);
        if (previous != null) {
            // Two implementations declared the same service class; the later one wins.
            logger.warn("Class [" + key + "] has more than one implementation, using "
                    + serviceImplementClass.getName());
        }
        logger.debug("Class [" + key + "] loaded!");
    }
    return serviceInformationMap;
}
2、发布业务服务到thrift处理器
使用多路复用处理器TMultiplexedProcessor注册多个业务服务,这样通过监听一个端口即可提供多种服务。
/**
 * Creates the multiplexed processor and registers one processor per loaded service,
 * so that a single listening port can serve all of them.
 *
 * <p>Services whose processor could not be created are skipped and stay unmarked.
 */
private void buildMultiplexedProcessor(){
    multiplexedProcessor = new TMultiplexedProcessor();

    for (String serviceClassFullname : serviceInformationMap.keySet()) {
        logger.debug("serviceClassFullname=" + serviceClassFullname);

        // The service name is the simple name of the interface's outer class.
        int lastDot = serviceClassFullname.lastIndexOf(".");
        String serviceName = serviceClassFullname.substring(lastDot + 1);
        logger.debug("serviceName=" + serviceName);

        TProcessor serviceProcessor = createServiceProcessor(
                serviceClassFullname,
                serviceInformationMap.get(serviceClassFullname).getServiceImplementClassInstance());
        if (serviceProcessor == null) {
            continue;
        }

        multiplexedProcessor.registerProcessor(serviceName, serviceProcessor);
        serviceInformationMap.get(serviceClassFullname).setProcessorRegistered(true);
        logger.info("Processor [" + serviceName + "] published!");
    }
}
/**
 * Reflectively instantiates the generated {@code $Processor} nested class of a service,
 * passing the implementation to its single-argument {@code $Iface} constructor.
 *
 * @param serviceClassFullname fully-qualified name of the generated service class
 * @param serviceImplementClassInstance the implementation object to wrap
 * @return the processor, or {@code null} if any reflective step fails (the error is logged)
 */
private TProcessor createServiceProcessor(String serviceClassFullname, Object serviceImplementClassInstance){
    TProcessor created = null;
    try {
        Class<?> processorClass = Class.forName(serviceClassFullname + "$Processor");
        Class<?> ifaceClass = Class.forName(serviceClassFullname + "$Iface");

        // One processor per service implementation.
        Constructor<?> ifaceConstructor = processorClass.getDeclaredConstructor(ifaceClass);
        created = (TProcessor) ifaceConstructor.newInstance(serviceImplementClassInstance);
    } catch (Exception ex) {
        logger.error("create service[" + serviceClassFullname + "] processor error", ex);
    }
    return created;
}
3、注册业务服务到注册中心
本功能采用Zookeeper作为服务注册中心,并使用Curator与Zookeeper进行通信。服务注册需要用到Curator的扩展包curator-x-discovery.jar。
构造CuratorFramework实例
// Build and start the Curator client; every path it uses lives under the
// "thrift-microservice" namespace.
curator = CuratorFrameworkFactory.builder()
        .connectString(connectString)
        // A retry policy is required: without one the client cannot be built.
        // Retries up to 3 times with exponential back-off starting at 1 second.
        .retryPolicy(new org.apache.curator.retry.ExponentialBackoffRetry(1000, 3))
        .namespace("thrift-microservice")
        .build();
curator.start();
构造ServiceDiscovery实例
serviceDiscovery = ServiceDiscoveryBuilder.builder(ThriftServicePayload.class) .client(curator) .serializer(new JsonInstanceSerializer<>(ThriftServicePayload.class)) .basePath(basePath) .build(); serviceDiscovery.start();
将业务服务注册到Zookeeper注册中心
/**
 * Registers every service whose processor was published with the service registry.
 *
 * <p>A failure while registering one service is logged and does not stop the others.
 */
private void registerBusinessService(){
    for (String serviceClassFullname : serviceInformationMap.keySet()) {
        String serviceName = serviceClassFullname.substring(serviceClassFullname.lastIndexOf(".") + 1);
        ServiceInformation serviceInformation = serviceInformationMap.get(serviceClassFullname);
        if (!serviceInformation.isProcessorRegistered()) {
            continue;
        }

        try {
            // Fall back to the service name when no explicit id is set.
            String instanceId = StringUtil.isEmpty(serviceInformation.getId())
                    ? serviceName
                    : serviceInformation.getId();
            ThriftServicePayload payload = new ThriftServicePayload(
                    serviceInformation.getVersion(),
                    serviceInformation.getServiceClass().getName());

            // The ServiceInstance carries the data stored for this service in the registry.
            ServiceInstance<ThriftServicePayload> serviceInstance = ServiceInstance.<ThriftServicePayload>builder()
                    .name(serviceName)
                    .id(instanceId)
                    .address(EnvUtil.getLocalIp())
                    .port(getPort())
                    .payload(payload)
                    .registrationTimeUTC(System.currentTimeMillis())
                    .serviceType(ServiceType.DYNAMIC)
                    .build();

            serviceRegistry.registerService(serviceInstance);
            serviceInformation.setServiceRegistered(true);
            logger.info("Service [" + serviceName + "] registered!");
        } catch (Exception ex) {
            logger.error("register service[" + serviceName + "] error", ex);
        }
    }
}
4、创建thrift服务端
/**
 * Opens the non-blocking server socket on {@code getPort()}, configures a
 * {@code TNonblockingServer} with the multiplexed processor, framed transport and
 * compact protocol, and starts serving.
 *
 * @throws Exception if the socket or server cannot be created
 */
private void startServer()throws Exception{
    serverSocket = new TNonblockingServerSocket(getPort());

    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverSocket)
            .processor(multiplexedProcessor)
            .transportFactory(new TFramedTransport.Factory())
            .protocolFactory(new TCompactProtocol.Factory());

    tserver = new TNonblockingServer(serverArgs);
    tserver.setServerEventHandler(new DefaultServerEventHandler());
    tserver.serve();
}
5、从注册中心查找业务服务
根据业务服务的Client类从注册中心查找对应的服务配置信息,并根据服务配置信息实例化服务Client类的对象。
/**
 * Resolves a service client from its generated {@code $Client} class.
 *
 * <p>The service name is the simple name of the class that encloses {@code $Client};
 * the lookup itself is delegated to {@code getServiceClient(String)}.
 *
 * @param serviceClientClass the generated {@code Xxx$Client} class
 * @return the client cast to {@code T}, or {@code null} if the class is not a
 *         {@code $Client} class, the lookup returns nothing, or an exception is caught
 *         (exceptions are logged, not propagated)
 */
public <T> T getServiceClient(Class<T> serviceClientClass){
    try {
        String className = serviceClientClass.getName();
        if (!className.endsWith("$Client")) {
            throw new IllegalArgumentException("serviceClientClass must be $Client class");
        }

        String outerClassName = className.replace("$Client", "");
        String serviceName = outerClassName.substring(outerClassName.lastIndexOf(".") + 1);

        Object client = getServiceClient(serviceName);
        return client == null ? null : (T) client;
    } catch (Exception ex) {
        logger.error("Failed to get ServiceClient", ex);
    }
    return null;
}
/**
 * Looks up a service by name and returns the client object for it.
 *
 * <p>Factories are cached per "host:port" key; within a factory, wrappers are looked
 * up by service name. A missing factory or wrapper is created on demand.
 *
 * @param serviceName name under which the service is queried
 * @return the client instance object, or {@code null} when the service is not found
 *         or an exception is caught (both cases are logged)
 */
public Object getServiceClient(String serviceName) {
    try {
        ServiceInstance<ThriftServicePayload> serviceInstance = queryForInstance(serviceName);
        if (serviceInstance == null) {
            // The service does not exist.
            logger.error("service not found: " + serviceName);
            return null;
        }

        // Address and listening port of the machine hosting the service.
        String host = serviceInstance.getAddress();
        int port = serviceInstance.getPort();
        String key = host + ":" + port;

        ServiceClientFactory factory;
        ServiceClientWrapper wrapper = null;
        if (!serviceClientFactoryMap.containsKey(key)) {
            logger.info("create ServiceClientFactory...");
            factory = ServiceClientFactory.getInstance();
            factory.setHost(host);
            factory.setPort(port);
            factory.open();
            serviceClientFactoryMap.put(key, factory);
        } else {
            factory = serviceClientFactoryMap.get(key);
            wrapper = factory.getServiceClientWrapper(serviceName);
        }

        if (wrapper != null) {
            return wrapper.getServiceClientInstanceObject();
        }

        // No wrapper yet: load the generated client class and let the factory set it up.
        Class<?> serviceClientClass = Class.forName(serviceInstance.getPayload().getInterfaceName() + "$Client");
        factory.addServiceClientWrapper(new ServiceClientWrapper(serviceInstance, serviceClientClass, serviceName));
        return factory.getServiceClientWrapper(serviceName).getServiceClientInstanceObject();
    } catch (Exception ex) {
        logger.error("Failed to get ServiceClient", ex);
        return null;
    }
}
一个ServiceClientFactory对象代表一个服务提供者,一个服务提供者可以提供多个业务服务,每个业务服务对应的客户端对象用ServiceClientWrapper实例表示。
/**
 * Opens the transport to the configured host and port.
 *
 * <p>Creates and opens a framed socket transport on first use; an existing transport
 * is opened again only if it is not currently open.
 *
 * @throws RuntimeException wrapping any failure, including an empty host or a zero
 *         port; {@code close()} is called before the exception is thrown
 */
public void open(){
    try {
        boolean endpointConfigured = StringUtil.isNotEmpty(this.host) && this.port != 0;
        if (!endpointConfigured) {
            throw new IllegalArgumentException("parameter host or port is invalid!");
        }

        if (transport != null) {
            if (!transport.isOpen()) {
                transport.open();
            }
            return;
        }

        transport = new TFramedTransport(new TSocket(this.host, this.port));
        transport.open();
        logger.debug("Transport opened --> " + this.host + ":" + this.port);
    } catch (Exception ex) {
        close();
        throw new RuntimeException("Failed to open service Socket", ex);
    }
}
/**
 * Returns the wrapper stored for a service name.
 *
 * @param serviceName key used when the wrapper was added
 * @return whatever the wrapper map holds for that key
 */
public ServiceClientWrapper getServiceClientWrapper(String serviceName){
    ServiceClientWrapper stored = serviceClientWrapperMap.get(serviceName);
    return stored;
}
/**
 * Instantiates the client for a wrapper and stores it under the wrapper's service name.
 * Does nothing if a wrapper is already stored for that name.
 *
 * @param wrapper the wrapper to initialise and store
 */
public void addServiceClientWrapper(ServiceClientWrapper wrapper){
    if (serviceClientWrapperMap.containsKey(wrapper.getServiceName())) {
        return;
    }
    ServiceClientWrapper initialised = instanceServiceClient(wrapper);
    serviceClientWrapperMap.put(initialised.getServiceName(), initialised);
}
/**
 * Creates the client object for a wrapper over this factory's transport.
 *
 * <p>The client is built through its {@code TProtocol} constructor, using a compact
 * protocol wrapped in a multiplexed protocol named after the service.
 *
 * @param wrapper the wrapper to fill in
 * @return the same wrapper; its client instance is left unset if creation fails
 *         (the error is logged)
 */
private ServiceClientWrapper instanceServiceClient(ServiceClientWrapper wrapper){
    try {
        Class<?> clientClass = wrapper.getServiceClientClass();
        logger.debug("instance ServiceClient: " + clientClass.getName());

        TMultiplexedProtocol multiplexedProtocol =
                new TMultiplexedProtocol(new TCompactProtocol(transport), wrapper.getServiceName());
        Object clientInstance = clientClass.getConstructor(TProtocol.class).newInstance(multiplexedProtocol);
        wrapper.setServiceClientInstanceObject(clientInstance);
    } catch (Exception ex) {
        logger.error("instance ServiceClient error", ex);
    }
    return wrapper;
}

/**
 * Holds the registry entry, client class and name of one service, together with
 * the client instance object once it has been set.
 */
public class ServiceClientWrapper {
    private final ServiceInstance<ThriftServicePayload> serviceInstance;
    private final Class<?> serviceClientClass;
    private final String serviceName;

    /** Instance of the service client class; unset until the setter is called. */
    private Object serviceClientInstanceObject;

    /**
     * @param serviceInstance registry entry of the service
     * @param serviceClientClass client class of the service
     * @param serviceName name of the service
     */
    public ServiceClientWrapper(ServiceInstance<ThriftServicePayload> serviceInstance, Class<?> serviceClientClass, String serviceName){
        this.serviceInstance = serviceInstance;
        this.serviceClientClass = serviceClientClass;
        this.serviceName = serviceName;
    }

    /** @return the registry entry given at construction */
    public ServiceInstance<ThriftServicePayload> getServiceInstance() {
        return this.serviceInstance;
    }

    /** @return the client class given at construction */
    public Class<?> getServiceClientClass() {
        return this.serviceClientClass;
    }

    /** @return the service name given at construction */
    public String getServiceName() {
        return this.serviceName;
    }

    /** @return the client instance object, or {@code null} if none has been set */
    public Object getServiceClientInstanceObject() {
        return this.serviceClientInstanceObject;
    }

    /** @param serviceClientInstanceObject the client instance object to store */
    public void setServiceClientInstanceObject(Object serviceClientInstanceObject) {
        this.serviceClientInstanceObject = serviceClientInstanceObject;
    }
}
6、调用业务服务接口
客户端获取服务Client实例对象的方式:
Hello.Client client = getServiceClient(Hello.Client.class)
或者
// Lookup by service name; the result is untyped and is null when the lookup fails,
// so check it before casting to the generated client class.
Object object = clientBootstrap.getServiceClient("Hello");
if(object != null){
Hello.Client client = (Hello.Client)object;
}访问接口方法:
// Simple string call.
client.helloString("hello string");
// Binary payload: encode with the UTF-8 charset constant, which avoids the checked
// UnsupportedEncodingException thrown by the charset-name overload of getBytes.
ByteBuffer data = ByteBuffer.wrap("hello world".getBytes(java.nio.charset.StandardCharsets.UTF_8));
Response response = client.sendMessage(new Message(1, data));
System.out.println(response.getCode() + ", " + response.getMessage());
7、注册中心管理(截图)


