XiaoqiangNan 2018-12-24
文章讲解如何使用Thrift和Curator实现自己的服务注册与发现功能,服务注册中心使用Zookeeper框架。
技术栈
zookeeper-3.4.13
thrift-0.11.0
curator-2.13.0
特性
1、服务集群部署:同一个服务可以部署到多台服务器上,注册中心维护一个服务的多份payload信息
2、客户端软负载均衡:暂支持随机和轮询两种方式
3、服务提供者和服务消费者可选择是否要注册到注册中心
4、业务服务实现类自动加载和注册
5、一个端口监听多个业务服务
最新源码: https://github.com/chenjuwen/thrift-microservice
功能主要包含以下关键实现点:
扫描和加载业务服务实现类
发布业务服务到thrift处理器
注册业务服务到注册中心
创建thrift服务端
从注册中心查找业务服务
调用业务服务接口
1、扫描和加载业务服务实现类
业务服务接口通过Thrift的IDL进行描述,并使用Thrift提供的工具编译生成接口文件。
// File label: the definitions that follow belong to Common.thrift.
Common.thrift
namespace java com.seasy.microservice.api

// Message envelope: an i32 type tag plus a binary body.
struct Message {
    1: i32 type;
    2: binary data;
}

// Call result: an i32 code plus a text message.
struct Response {
    1: i32 code;
    2: string message;
}

// File label: the definitions that follow belong to Hello.thrift.
Hello.thrift
namespace java com.seasy.microservice.api
// Pulls in the Message and Response structs declared in Common.thrift.
include "Common.thrift"

// Business service interface exposing one string method and one method
// that takes a Common.Message and returns a Common.Response.
service Hello{
    string helloString(1:string param)
    Common.Response sendMessage(1:Common.Message message)
}
开发业务服务接口的实现类,所有实现类统一用自定义注解类ServiceAnnotation加以标注。
/**
 * Sample implementation of the Thrift-generated {@code Hello.Iface} interface.
 * The {@code ServiceAnnotation} marks it as the implementation of the
 * {@code Hello} service, version 1.0.0.
 */
@ServiceAnnotation(serviceClass=Hello.class, version="1.0.0")
public class HelloServiceImpl implements Hello.Iface{
    /**
     * Echoes the argument back to the caller.
     *
     * @param param value to echo
     * @return {@code param}, unchanged
     */
    @Override
    public String helloString(String param) throws TException {
        return param;
    }

    /**
     * Prints the message type and its body to standard output and reports success.
     *
     * @param message incoming message
     * @return a response with code {@code 0} and message {@code "success"}
     */
    @Override
    public Response sendMessage(Message message) throws TException {
        byte[] data = message.getData();
        // Decode explicitly as UTF-8 instead of the platform default charset,
        // and tolerate a message that carries no body.
        String text = (data == null)
                ? "null"
                : new String(data, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(message.getType() + ", " + text);
        return new Response(0, "success");
    }
}
根据自定义注解类扫描和加载业务服务实现类。
private ConcurrentHashMap<String, ServiceInformation> loadService(String packagePath) throws Exception { ConcurrentHashMap<String, ServiceInformation> serviceInformationMap = new ConcurrentHashMap<String, ServiceInformation>(); Reflections reflections = new Reflections(packagePath); //查找有指定注解类的服务类 Set<Class<?>> serviceImplementClassSet = reflections.getTypesAnnotatedWith(ServiceAnnotation.class); for(Class<?> serviceImplementClass : serviceImplementClassSet){ ServiceAnnotation serviceAnnotation = serviceImplementClass.getAnnotation(ServiceAnnotation.class); //服务相关信息封装在ServiceInformation类中 ServiceInformation serviceInformation = new ServiceInformation(); serviceInformation.setId(serviceInformation.getId()); serviceInformation.setServiceClass(serviceAnnotation.serviceClass()); serviceInformation.setVersion(serviceAnnotation.version()); serviceInformation.setTimeout(serviceAnnotation.timeout()); serviceInformation.setServiceImplementClassInstance(serviceImplementClass.newInstance()); String key = serviceAnnotation.serviceClass().getName(); serviceInformationMap.put(key, serviceInformation); logger.debug("Class [" + key + "] loaded!"); } return serviceInformationMap; }
2、发布业务服务到thrift处理器
使用多路复用处理器TMultiplexedProcessor注册多个业务服务,这样通过监听一个端口即可提供多种服务。
private void buildMultiplexedProcessor(){ multiplexedProcessor = new TMultiplexedProcessor(); for(Iterator<String> it=serviceInformationMap.keySet().iterator(); it.hasNext(); ){ String serviceClassFullname = it.next(); logger.debug("serviceClassFullname=" + serviceClassFullname); //服务名 String serviceName = serviceClassFullname.substring(serviceClassFullname.lastIndexOf(".")+1); logger.debug("serviceName=" + serviceName); Object serviceImplementClassInstance = serviceInformationMap.get(serviceClassFullname).getServiceImplementClassInstance(); TProcessor serviceProcessor = createServiceProcessor(serviceClassFullname, serviceImplementClassInstance); if(serviceProcessor != null){ //以接口主类的SimpleName作为服务名 multiplexedProcessor.registerProcessor(serviceName, serviceProcessor); serviceInformationMap.get(serviceClassFullname).setProcessorRegistered(true); logger.info("Processor [" + serviceName + "] published!"); } } } private TProcessor createServiceProcessor(String serviceClassFullname, Object serviceImplementClassInstance){ try{ String processorClassName = serviceClassFullname + "$Processor"; String ifaceClassName = serviceClassFullname + "$Iface"; Class<?> processorClass = Class.forName(processorClassName); Class<?> ifaceClass = Class.forName(ifaceClassName); //一个服务实现类对应一个Processor Constructor<?> constructor = processorClass.getDeclaredConstructor(new Class[]{ifaceClass}); TProcessor processor = (TProcessor) constructor.newInstance(new Object[]{serviceImplementClassInstance}); return processor; }catch(Exception ex){ logger.error("create service[" + serviceClassFullname + "] processor error", ex); } return null; }
3、注册业务服务到注册中心
本功能采用ZooKeeper作为服务注册中心,并使用Curator与ZooKeeper进行通信。服务注册需要用到Curator的扩展包curator-x-discovery.jar。
构造CuratorFramework实例
// Build and start the Curator client. All paths used through it are placed
// under the "thrift-microservice" namespace.
curator = CuratorFrameworkFactory.builder()
        .connectString(connectString)
        .namespace("thrift-microservice")
        // The builder has no default retry policy and the client refuses to be
        // built without one: retry up to 3 times, backing off from 1 second.
        .retryPolicy(new org.apache.curator.retry.ExponentialBackoffRetry(1000, 3))
        .build();
curator.start();
构造ServiceDiscovery实例
serviceDiscovery = ServiceDiscoveryBuilder.builder(ThriftServicePayload.class) .client(curator) .serializer(new JsonInstanceSerializer<>(ThriftServicePayload.class)) .basePath(basePath) .build(); serviceDiscovery.start();
将业务服务注册到Zookeeper注册中心
private void registerBusinessService(){ for(Iterator<String> it=serviceInformationMap.keySet().iterator(); it.hasNext(); ){ String serviceClassFullname = it.next(); String serviceName = serviceClassFullname.substring(serviceClassFullname.lastIndexOf(".")+1); ServiceInformation serviceInformation = serviceInformationMap.get(serviceClassFullname); if(serviceInformation.isProcessorRegistered()){ try{ //构造ServiceInstance对象,该对象表示一个业务服务,用于存储业务服务相关的参数数据 ServiceInstance<ThriftServicePayload> serviceInstance = ServiceInstance.<ThriftServicePayload>builder() .name(serviceName) .id(StringUtil.isEmpty(serviceInformation.getId()) ? serviceName : serviceInformation.getId()) .address(EnvUtil.getLocalIp()) .port(getPort()) .payload(new ThriftServicePayload(serviceInformation.getVersion(), serviceInformation.getServiceClass().getName())) .registrationTimeUTC(System.currentTimeMillis()) .serviceType(ServiceType.DYNAMIC) .build(); serviceRegistry.registerService(serviceInstance); serviceInformation.setServiceRegistered(true); logger.info("Service [" + serviceName + "] registered!"); }catch(Exception ex){ logger.error("register service[" + serviceName + "] error", ex); } } } }
4、创建thrift服务端
/**
 * Opens the non-blocking server socket on {@code getPort()}, wires the
 * multiplexed processor with framed transport and compact protocol, and
 * starts serving.
 *
 * @throws Exception if the socket or the server cannot be created
 */
private void startServer()throws Exception{
    serverSocket = new TNonblockingServerSocket(getPort());

    // Framed transport + compact protocol; clients have to use the same pair.
    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverSocket)
            .processor(multiplexedProcessor)
            .transportFactory(new TFramedTransport.Factory())
            .protocolFactory(new TCompactProtocol.Factory());

    tserver = new TNonblockingServer(serverArgs);
    tserver.setServerEventHandler(new DefaultServerEventHandler());
    // NOTE(review): serve() is expected to occupy the calling thread until the
    // server stops - confirm callers run this on a dedicated thread.
    tserver.serve();
}
5、从注册中心查找业务服务
根据业务服务的Client类从注册中心查找对应的服务配置信息,并根据服务配置信息实例化服务Client类的对象。
public <T> T getServiceClient(Class<T> serviceClientClass){ try{ String serviceClientClassName = serviceClientClass.getName(); if(!serviceClientClassName.endsWith("$Client")){ throw new IllegalArgumentException("serviceClientClass must be $Client class"); } String serviceName = serviceClientClassName.replace("$Client", ""); serviceName = serviceName.substring(serviceName.lastIndexOf(".")+1); Object object = getServiceClient(serviceName); if(object != null){ return (T)object; } }catch(Exception ex){ logger.error("Failed to get ServiceClient", ex); } return null; } public Object getServiceClient(String serviceName) { try{ ServiceInstance<ThriftServicePayload> serviceInstance = queryForInstance(serviceName); if(serviceInstance != null){ //服务所在机器的IP地址 String host = serviceInstance.getAddress(); //服务所在机器的监听端口 int port = serviceInstance.getPort(); ServiceClientFactory factory = null; ServiceClientWrapper wrapper = null; String key = host + ":" + port; if(serviceClientFactoryMap.containsKey(key)){ factory = serviceClientFactoryMap.get(key); wrapper = factory.getServiceClientWrapper(serviceName); }else{ logger.info("create ServiceClientFactory..."); factory = ServiceClientFactory.getInstance(); factory.setHost(host); factory.setPort(port); factory.open(); serviceClientFactoryMap.put(key, factory); } if(wrapper == null){ Class<?> serviceClientClass = Class.forName(serviceInstance.getPayload().getInterfaceName() + "$Client"); wrapper = new ServiceClientWrapper(serviceInstance, serviceClientClass, serviceName); factory.addServiceClientWrapper(wrapper); return factory.getServiceClientWrapper(serviceName).getServiceClientInstanceObject(); }else{ return wrapper.getServiceClientInstanceObject(); } }else{ //服务不存在 logger.error("service not found: " + serviceName); } }catch(Exception ex){ logger.error("Failed to get ServiceClient", ex); } return null; }
一个ServiceClientFactory对象代表一个服务提供者,一个服务提供者可以提供多个业务服务,每个业务服务对应的客户端对象用ServiceClientWrapper实例表示。
/**
 * Opens the framed socket transport to {@code host:port}, creating it on
 * first use and reopening it if it exists but is closed.
 *
 * @throws RuntimeException if host/port are not set or the transport cannot
 *         be opened; {@code close()} is called before the exception is thrown
 */
public void open(){
    try{
        if(StringUtil.isNotEmpty(this.host) && this.port != 0){
            if(transport == null){
                transport = new TFramedTransport(new TSocket(this.host, this.port));
                transport.open();
                logger.debug("Transport opened --> " + this.host + ":" + this.port);
            }else if(!transport.isOpen()){
                transport.open();
            }
        }else{
            throw new IllegalArgumentException("parameter host or port is invalid!");
        }
    }catch(Exception ex){
        close();
        throw new RuntimeException("Failed to open service Socket", ex);
    }
}

/**
 * Returns the cached wrapper for a service.
 *
 * @param serviceName service name used as cache key
 * @return the wrapper, or {@code null} if none is cached under that name
 */
public ServiceClientWrapper getServiceClientWrapper(String serviceName){
    return serviceClientWrapperMap.get(serviceName);
}

/**
 * Instantiates the client for {@code wrapper} and caches the wrapper under its
 * service name, unless a wrapper for that name is already cached.
 * A wrapper whose client could not be instantiated is not cached, so a later
 * call can try again instead of finding a wrapper without a client.
 *
 * @param wrapper wrapper describing the service client to create
 */
public void addServiceClientWrapper(ServiceClientWrapper wrapper){
    if(serviceClientWrapperMap.containsKey(wrapper.getServiceName())){
        return;
    }

    wrapper = instanceServiceClient(wrapper);
    if(wrapper.getServiceClientInstanceObject() == null){
        // Instantiation failed (already logged by instanceServiceClient).
        return;
    }
    serviceClientWrapperMap.put(wrapper.getServiceName(), wrapper);
}

/**
 * Creates the client object for {@code wrapper} over this factory's transport,
 * using compact protocol wrapped in a multiplexed protocol keyed by the
 * wrapper's service name, and stores it in the wrapper.
 *
 * @param wrapper wrapper to populate
 * @return the same wrapper; its client instance is left untouched if creation fails
 */
private ServiceClientWrapper instanceServiceClient(ServiceClientWrapper wrapper){
    try{
        logger.debug("instance ServiceClient: " + wrapper.getServiceClientClass().getName());

        TProtocol protocol = new TCompactProtocol(transport);
        TMultiplexedProtocol multiplexedProtocol = new TMultiplexedProtocol(protocol, wrapper.getServiceName());

        // The client class is instantiated through its (TProtocol) constructor.
        Object serviceClientInstanceObject = wrapper.getServiceClientClass()
                .getConstructor(TProtocol.class)
                .newInstance(multiplexedProtocol);
        wrapper.setServiceClientInstanceObject(serviceClientInstanceObject);
    }catch(Exception ex){
        logger.error("instance ServiceClient error", ex);
    }
    return wrapper;
}
public class ServiceClientWrapper { private ServiceInstance<ThriftServicePayload> serviceInstance; private Class<?> serviceClientClass; private String serviceName; private Object serviceClientInstanceObject; //服务客户端类的实例对象 public ServiceClientWrapper(ServiceInstance<ThriftServicePayload> serviceInstance, Class<?> serviceClientClass, String serviceName){ this.serviceInstance = serviceInstance; this.serviceClientClass = serviceClientClass; this.serviceName = serviceName; } public ServiceInstance<ThriftServicePayload> getServiceInstance() { return serviceInstance; } public Class<?> getServiceClientClass() { return serviceClientClass; } public String getServiceName() { return serviceName; } public Object getServiceClientInstanceObject() { return serviceClientInstanceObject; } public void setServiceClientInstanceObject(Object serviceClientInstanceObject) { this.serviceClientInstanceObject = serviceClientInstanceObject; } }
6、调用业务服务接口
客户端获取服务Client实例对象的方式:
// Option 1: typed lookup by the generated Client class.
// The result is null when the lookup fails, so check it before use.
Hello.Client client = getServiceClient(Hello.Client.class);

// Option 2 (use instead of option 1): lookup by service name, then cast.
Object object = clientBootstrap.getServiceClient("Hello");
if(object != null){
    Hello.Client client = (Hello.Client)object;
}
访问接口方法:
// Simple string call.
client.helloString("hello string");

// Binary payload call. The charset constant avoids the checked
// UnsupportedEncodingException thrown by getBytes(String).
ByteBuffer data = ByteBuffer.wrap("hello world".getBytes(java.nio.charset.StandardCharsets.UTF_8));
Response response = client.sendMessage(new Message(1, data));
System.out.println(response.getCode() + ", " + response.getMessage());
7、注册中心管理(截图)