wangol 2016-07-30
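Ribbon is an open-source project released by Netflix. Its main function is to provide client-side software load-balancing algorithms that connect Netflix's mid-tier services together. The Ribbon client component offers a comprehensive set of configuration options, such as connection timeouts and retries. Put simply, you list all the machines behind the load balancer in a configuration file, and Ribbon automatically connects to them according to some rule (for example, simple round robin or random selection). It is also easy to implement a custom load-balancing algorithm with Ribbon.
Round robin dispatches requests to the servers in turn: each scheduling step computes i = (i + 1) mod n and selects the i-th server.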
    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: " + lb);
    }
    return server;

    ///////////////////////////////////////////////////////////////////////

    /**
     * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
     *
     * @param modulo The modulo to bound the value of the counter.
     * @return The next value.
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }
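The fragment above depends on Ribbon's own types (`Server`, `lb`, `nextServerCyclicCounter`), so it cannot be run on its own. The following is a minimal, self-contained sketch of the same counter idea, i = (i + 1) mod n advanced with a compare-and-set loop. The class name `RoundRobinSketch` and the use of plain strings as servers are assumptions made for illustration; this is not Ribbon's class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative round-robin picker (hypothetical class, not part of Ribbon). */
final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Atomically moves the counter to (current + 1) % modulo and returns the new value. */
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            // Retry if another thread changed the counter in the meantime.
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    /** Returns the next server in the rotation, or null when the list is empty. */
    String choose(List<String> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        return servers.get(incrementAndGetModulo(servers.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        List<String> servers = Arrays.asList("a:80", "b:80", "c:80");
        for (int i = 0; i < 6; i++) {
            System.out.println(rr.choose(servers));
        }
    }
}
```

Because the counter starts at 0 and is incremented before use, the first pick in this sketch is the element at index 1, and the rotation wraps around from there.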
The basic idea for weighted round robin has been obtained from JCS. The implementation for choosing the endpoint from the list of endpoints is as follows:

Let's assume 4 endpoints: A(wt=10), B(wt=30), C(wt=40), D(wt=20).

Using the Random API, generate a random number between 1 and 10+30+40+20. Let's assume that the above list is randomized. Based on the weights, we have intervals as follows:

    1-----10  (A's weight)
    11----40  (A's weight + B's weight)
    41----80  (A's weight + B's weight + C's weight)
    81----100 (A's weight + B's weight + C's weight + D's weight)

Here's the pseudocode for deciding where to send the request:

    if (random_number between 1 & 10) {send request to A;}
    else if (random_number between 11 & 40) {send request to B;}
    else if (random_number between 41 & 80) {send request to C;}
    else if (random_number between 81 & 100) {send request to D;}
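The interval lookup described above can be sketched in a few lines of Java. This is only an illustration of the idea under stated assumptions: integer weights that are all positive, and a hypothetical class `WeightedChoiceSketch` with helper names of my own choosing. It is not taken from Ribbon or JCS.

```java
import java.util.Random;

/** Illustrative weighted selection over cumulative intervals (hypothetical class). */
final class WeightedChoiceSketch {

    /** Maps a number in [1, sum(weights)] to the index of the interval containing it. */
    static int indexFor(int randomNumber, int[] weights) {
        int upperBound = 0;
        for (int i = 0; i < weights.length; i++) {
            upperBound += weights[i];          // upper end of the i-th interval
            if (randomNumber <= upperBound) {
                return i;
            }
        }
        throw new IllegalArgumentException("number exceeds total weight: " + randomNumber);
    }

    /** Draws a number in [1, total] and returns the index of the chosen endpoint. */
    static int choose(int[] weights, Random random) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        int randomNumber = random.nextInt(total) + 1; // nextInt gives 0..total-1
        return indexFor(randomNumber, weights);
    }

    public static void main(String[] args) {
        String[] names = {"A", "B", "C", "D"};
        int[] weights = {10, 30, 40, 20};
        int[] hits = new int[names.length];
        Random random = new Random();
        for (int i = 0; i < 100_000; i++) {
            hits[choose(weights, random)]++;
        }
        for (int i = 0; i < names.length; i++) {
            System.out.println(names[i] + ": " + hits[i]);
        }
    }
}
```

With the weights from the example, the hit counts printed by `main` should be roughly in the ratio 10:30:40:20, varying from run to run.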
Randomly selects a server whose status is UP.
    int index = rand.nextInt(serverCount);
    server = upList.get(index);
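The two-line fragment above does not show where `serverCount` comes from. If it were the size of a list other than `upList`, the random index could fall outside `upList`. A minimal sketch that bounds the index by the list actually being indexed might look like this; the class name `RandomUpServerSketch` and the use of strings as servers are assumptions for illustration only.

```java
import java.util.List;
import java.util.Random;

/** Illustrative random pick among servers that are UP (hypothetical class). */
final class RandomUpServerSketch {

    /** Returns a uniformly random element of upServers, or null when none are UP. */
    static String choose(List<String> upServers, Random random) {
        if (upServers.isEmpty()) {
            return null;
        }
        // Bound the index by the size of the list we index into.
        int index = random.nextInt(upServers.size());
        return upServers.get(index);
    }
}
```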
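Zone-aware load balancing has built-in circuit-breaker logic and can be configured to choose target service instances based on zone affinity (that is, a preference for the zone that hosts the calling service, which reduces latency and saves cost). It monitors the behavior of the running instances in each zone and can quickly drop an entire zone in real time. This improves resilience when a whole zone fails.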
The key metric used to measure the zone condition is Average Active Requests, which is aggregated per rest client per zone. It is the total outstanding requests in a zone divided by the number of available targeted instances (excluding circuit breaker tripped instances). This metric is very effective when timeout occurs slowly on a bad zone.

The LoadBalancer will calculate and examine zone stats of all available zones. If the Average Active Requests for any zone has reached a configured threshold, this zone will be dropped from the active server list. In case more than one zone has reached the threshold, the zone with the most active requests per server will be dropped.

Once the worst zone is dropped, a zone will be chosen among the rest with the probability proportional to its number of instances. A server will be returned from the chosen zone with a given Rule (a Rule is a load balancing strategy, for example AvailabilityFilteringRule).

For each request, the steps above will be repeated. That is to say, each zone related load balancing decision is made in real time, with the up-to-date statistics aiding the choice.

Implementation:
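The description above boils down to two steps: drop the worst zone if its load has reached the threshold, then pick one of the remaining zones with probability proportional to its instance count. The sketch below illustrates only those two steps. The class `ZoneChoiceSketch`, the `ZoneStat` holder and all method names are hypothetical, and the sketch deliberately ignores other details a real implementation may consider.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

/** Illustrative zone selection following the prose description (hypothetical class). */
final class ZoneChoiceSketch {

    /** Hypothetical per-zone snapshot: instance count and average active requests per server. */
    static final class ZoneStat {
        final int instanceCount;
        final double loadPerServer;

        ZoneStat(int instanceCount, double loadPerServer) {
            this.instanceCount = instanceCount;
            this.loadPerServer = loadPerServer;
        }
    }

    /** Returns a copy of zones without the most loaded zone, if that zone reached the threshold. */
    static Map<String, ZoneStat> dropWorstZone(Map<String, ZoneStat> zones, double threshold) {
        String worst = null;
        double worstLoad = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, ZoneStat> e : zones.entrySet()) {
            double load = e.getValue().loadPerServer;
            if (load >= threshold && load > worstLoad) {
                worst = e.getKey();
                worstLoad = load;
            }
        }
        Map<String, ZoneStat> remaining = new LinkedHashMap<>(zones);
        if (worst != null) {
            remaining.remove(worst);
        }
        return remaining;
    }

    /** Maps a point in [0, totalInstances) to the zone whose instance range contains it. */
    static String zoneAt(Map<String, ZoneStat> zones, int point) {
        int upperBound = 0;
        for (Map.Entry<String, ZoneStat> e : zones.entrySet()) {
            upperBound += e.getValue().instanceCount;
            if (point < upperBound) {
                return e.getKey();
            }
        }
        return null;
    }

    /** Drops the worst zone, then picks a zone weighted by instance count; null if none remain. */
    static String chooseZone(Map<String, ZoneStat> zones, double threshold, Random random) {
        Map<String, ZoneStat> remaining = dropWorstZone(zones, threshold);
        int total = 0;
        for (ZoneStat s : remaining.values()) {
            total += s.instanceCount;
        }
        if (total == 0) {
            return null;
        }
        return zoneAt(remaining, random.nextInt(total));
    }
}
```

In this sketch a zone with twice as many instances is twice as likely to be chosen, which is what "probability proportional to its number of instances" means in the description.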
    @Override
    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);
        if (balancers == null) {
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
        }
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        // check if there is any zone that no longer has a server
        // and set the list to empty so that the zone related metrics does not
        // contain stale data
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }

    @Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName()
                                + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName()
                                + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(
                    zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Throwable e) {
            logger.error("Unexpected exception when choosing server using zone aware logic", e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

    @VisibleForTesting
    BaseLoadBalancer getLoadBalancer(String zone) {
        zone = zone.toLowerCase();
        BaseLoadBalancer loadBalancer = balancers.get(zone);
        if (loadBalancer == null) {
            // We need to create rule object for load balancer for each zone
            IRule rule = cloneRule(this.getRule());
            loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
            BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
            if (prev != null) {
                loadBalancer = prev;
            }
        }
        return loadBalancer;
    }

    private IRule cloneRule(IRule toClone) {
        IRule rule;
        if (toClone == null) {
            rule = new AvailabilityFilteringRule();
        } else {
            String ruleClass = toClone.getClass().getName();
            try {
                rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClass, this.getClientConfig());
            } catch (Exception e) {
                throw new RuntimeException("Unexpected exception creating rule for ZoneAwareLoadBalancer", e);
            }
        }
        return rule;
    }

    @Override
    public void setRule(IRule rule) {
        super.setRule(rule);
        if (balancers != null) {
            for (String zone: balancers.keySet()) {
                balancers.get(zone).setRule(cloneRule(rule));
            }
        }
    }
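Rule | Declaration | Description | Implementation notes |
BestAvailableRule | public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule | Chooses the server with the fewest concurrent requests | Examines the servers one by one, skips any server whose circuit breaker is tripped, then picks the one with the smallest ActiveRequestsCount |
AvailabilityFilteringRule | public class AvailabilityFilteringRule extends PredicateBasedRule | Filters out back-end servers that are marked as circuit tripped because of repeated connection failures, and also filters out heavily loaded back-end servers (active connections exceed the configured threshold) | Uses an AvailabilityPredicate to hold the filtering logic, which simply checks the running state of each server as recorded in the stats |
WeightedResponseTimeRule | public class WeightedResponseTimeRule extends RoundRobinRule | Assigns each server a weight based on its response time: the longer the response time, the smaller the weight and the less likely the server is to be chosen | A background thread periodically reads the average response times from the stats and computes a weight for each server. The calculation is simple: a server's weight is the sum of all servers' average response times minus that server's own average response time. At startup, before any stats have accumulated, servers are chosen with the round-robin strategy |
RetryRule | public class RetryRule extends AbstractLoadBalancerRule | Adds a retry mechanism on top of the chosen load-balancing rule | If choosing a server fails, keeps trying to choose an available server through the subRule for a configured period of time |
RoundRobinRule | public class RoundRobinRule extends AbstractLoadBalancerRule | Chooses servers in round-robin fashion | Cycles an index and picks the server at that position |
RandomRule | public class RandomRule extends AbstractLoadBalancerRule | Chooses a server at random | Generates a random index and picks the server at that position |
ZoneAvoidanceRule | public class ZoneAvoidanceRule extends PredicateBasedRule | Chooses a server by jointly evaluating the performance of the server's zone and the availability of the server | Uses a ZoneAvoidancePredicate and an AvailabilityPredicate to decide whether a server may be chosen. The former judges whether a zone is performing acceptably and excludes every server in an unusable zone; the AvailabilityPredicate filters out servers with too many connections |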
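The weight calculation described for WeightedResponseTimeRule in the table can be illustrated with a short sketch: each server's weight is the sum of all average response times minus its own, and a random value is mapped onto the cumulative weights. The class `ResponseTimeWeightSketch` and its method names are hypothetical, and the sketch is an illustration of the described formula, not a copy of the library code.

```java
/** Illustrative response-time weighting (hypothetical class, not Ribbon's). */
final class ResponseTimeWeightSketch {

    /**
     * For each server, weight = (sum of all average response times) - (its own average).
     * Returns the running totals of those weights, one entry per server.
     */
    static double[] cumulativeWeights(double[] avgResponseTimes) {
        double total = 0.0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        double[] cumulative = new double[avgResponseTimes.length];
        double soFar = 0.0;
        for (int i = 0; i < avgResponseTimes.length; i++) {
            soFar += total - avgResponseTimes[i]; // slower servers add less weight
            cumulative[i] = soFar;
        }
        return cumulative;
    }

    /** Returns the first index whose cumulative weight covers randomWeight, or -1 if none does. */
    static int indexFor(double[] cumulative, double randomWeight) {
        for (int i = 0; i < cumulative.length; i++) {
            if (cumulative[i] >= randomWeight) {
                return i;
            }
        }
        return -1;
    }
}
```

For average response times of 100, 200 and 700 ms, the individual weights in this sketch are 900, 800 and 300, so the slowest server receives the smallest share of the random range.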
    <dependency>
        <groupId>com.netflix.ribbon</groupId>
        <artifactId>ribbon-core</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.netflix.ribbon</groupId>
        <artifactId>ribbon-httpclient</artifactId>
        <version>2.2.0</version>
    </dependency>
    # Max number of retries
    sample-client.ribbon.MaxAutoRetries=1

    # Max number of next servers to retry (excluding the first server)
    sample-client.ribbon.MaxAutoRetriesNextServer=1

    # Whether all operations can be retried for this client
    sample-client.ribbon.OkToRetryOnAllOperations=true

    # Interval to refresh the server list from the source
    sample-client.ribbon.ServerListRefreshInterval=2000

    # Connect timeout used by Apache HttpClient
    sample-client.ribbon.ConnectTimeout=3000

    # Read timeout used by Apache HttpClient
    sample-client.ribbon.ReadTimeout=3000

    # Initial list of servers, can be changed via Archaius dynamic property at runtime
    sample-client.ribbon.listOfServers=www.sohu.com:80,www.163.com:80,www.sina.com.cn:80

    sample-client.ribbon.EnablePrimeConnections=true
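To make the shape of the `listOfServers` value concrete, the sketch below reads a properties snippet with the JDK's `java.util.Properties` and splits the comma-separated value into host and port pairs. This is only an illustration of the format: the class `ServerListSketch`, its methods, and the default port of 80 for entries without a port are all assumptions, and it does not reproduce how Ribbon or Archaius read the file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Illustrative parsing of a host:port list (hypothetical class, JDK only). */
final class ServerListSketch {

    /** Loads properties from text, wrapping the checked IOException. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Splits "host:port,host:port" into {host, port} pairs; port defaults to "80" (assumption). */
    static List<String[]> parseServers(String listOfServers) {
        List<String[]> result = new ArrayList<>();
        for (String entry : listOfServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // ignore empty entries such as a trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            String port = colon < 0 ? "80" : trimmed.substring(colon + 1);
            result.add(new String[] {host, port});
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = load(
                "sample-client.ribbon.listOfServers=www.sohu.com:80,www.163.com:80,www.sina.com.cn:80\n");
        for (String[] server : parseServers(props.getProperty("sample-client.ribbon.listOfServers"))) {
            System.out.println(server[0] + " on port " + server[1]);
        }
    }
}
```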
    import java.net.URI;

    import com.netflix.client.ClientFactory;
    import com.netflix.client.http.HttpRequest;
    import com.netflix.client.http.HttpResponse;
    import com.netflix.config.ConfigurationManager;
    import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
    import com.netflix.niws.client.http.RestClient;

    // The enclosing class name is illustrative; the original shows only the main method.
    public class SampleApp {
        public static void main(String[] args) throws Exception {
            // Load the client configuration shown above from the classpath.
            ConfigurationManager.loadPropertiesFromResources("sample-client.properties");
            System.out.println(ConfigurationManager.getConfigInstance().getProperty("sample-client.ribbon.listOfServers"));

            RestClient client = (RestClient) ClientFactory.getNamedClient("sample-client");
            HttpRequest request = HttpRequest.newBuilder().uri(new URI("/")).build();
            for (int i = 0; i < 20; i++) {
                HttpResponse response = client.executeWithLoadBalancer(request);
                System.out.println("Status for URI:" + response.getRequestedURI() + " is :" + response.getStatus());
            }

            ZoneAwareLoadBalancer lb = (ZoneAwareLoadBalancer) client.getLoadBalancer();
            System.out.println(lb.getLoadBalancerStats());

            // Change the server list at runtime, then wait for the refresh interval to pass.
            ConfigurationManager.getConfigInstance().setProperty(
                    "sample-client.ribbon.listOfServers", "www.baidu.com:80,www.linkedin.com:80");
            System.out.println("changing servers ...");
            Thread.sleep(3000);

            for (int i = 0; i < 20; i++) {
                HttpResponse response = client.executeWithLoadBalancer(request);
                System.out.println("Status for URI:" + response.getRequestedURI() + " is :" + response.getStatus());
            }
            System.out.println(lb.getLoadBalancerStats());
        }
    }