Bloddy 2019-10-28
我们知道,在Eureka中,可以使用如下方法使Eureka主动下线,那么本篇文章就来分析一下子这个下线的流程
public synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ..."); if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } cancelScheduledTasks(); // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); } if (eurekaTransport != null) { eurekaTransport.shutdown(); } heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient"); } }
主要做了这么几件事:
private void cancelScheduledTasks() { if (instanceInfoReplicator != null) { instanceInfoReplicator.stop(); } if (heartbeatExecutor != null) { heartbeatExecutor.shutdownNow(); } if (cacheRefreshExecutor != null) { cacheRefreshExecutor.shutdownNow(); } if (scheduler != null) { scheduler.shutdownNow(); } }
void unregister() { // It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) { try { logger.info("Unregistering ..."); EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()); logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode()); } catch (Exception e) { logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e); } } } @Override public EurekaHttpResponse<Void> cancel(String appName, String id) { String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder.delete(ClientResponse.class); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
下线消息的处理在InstanceResource
类中
@DELETE public Response cancelLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { try { boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication)); if (isSuccess) { logger.debug("Found (Cancel): {} - {}", app.getName(), id); return Response.ok().build(); } else { logger.info("Not Found (Cancel): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } } catch (Throwable e) { logger.error("Error (cancel): {} - {}", app.getName(), id, e); return Response.serverError().build(); } } public boolean cancel(final String appName, final String id, final boolean isReplication) { if (super.cancel(appName, id, isReplication)) { //往集群同步下线信息 replicateToPeers(Action.Cancel, appName, id, null, null, isReplication); synchronized (lock) { if (this.expectedNumberOfRenewsPerMin > 0) { // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute) this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } return true; } return false; }
先看具体的下线逻辑,与租约过期清除的处理逻辑是一致的
protected boolean internalCancel(String appName, String id, boolean isReplication) { try { read.lock(); CANCEL.increment(isReplication); Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToCancel = null; if (gMap != null) { //删除租约信息 leaseToCancel = gMap.remove(id); } synchronized (recentCanceledQueue) { recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")")); } //删除客户端状态信息 InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id); if (instanceStatus != null) { logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name()); } if (leaseToCancel == null) { CANCEL_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id); return false; } else { leaseToCancel.cancel(); InstanceInfo instanceInfo = leaseToCancel.getHolder(); String vip = null; String svip = null; if (instanceInfo != null) { instanceInfo.setActionType(ActionType.DELETED); recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel)); instanceInfo.setLastUpdatedTimestamp(); vip = instanceInfo.getVIPAddress(); svip = instanceInfo.getSecureVipAddress(); } invalidateCache(appName, vip, svip); logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication); return true; } } finally { read.unlock(); } }
其中invalidateCache
则是删除当前服务中与该实例相关的缓存
集群的同步下线信息则跟集群信息注册的逻辑差不多