大步流星 2020-06-16
public class RegistryProtocol implements Protocol { public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker // 启动NettyServer服务 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //to judge to delay publish whether or not boolean register = registedProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); // 《服务提供方》向注册中心注册服务 if (register) { register(registryUrl, registedProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. // Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 《服务提供方》订阅服务 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; } }
private transient volatile Invoker<?> invoker; private T createProxy(Map<String, String> map) { // 省略若干代码... if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); // 核心方法,拿到invoker对象 invoker = refprotocol.refer(interfaceClass, url); } else { // 省略若干代码... if (urls.size() == 1) { // 核心方法,拿到invoker对象 // refprotocol为Protocol$Adpative对象 invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { // 省略若干代码... } } // 核心方法create service proxy // 将下面这段代码替换成 ProxyFactory$Adaptive 中 getProxy中的部分 return (T) proxyFactory.getProxy(invoker); }
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } return doRefer(cluster, registry, type, url); } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { // 《服务消费方》注册服务 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // 《服务消费方》订阅服务 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory); return invoker; }