依然怡然 2016-07-25
如果用户量增加后为了解决吞吐量问题,需要引入集群,在openfire中提供了集群的支持,另外也实现了两个集群插件:hazelcast和clustering。为了了解情况集群的工作原理,我就沿着openfire的源代码进行了分析,也是一次学习的过程。
public interface Cache<K,V> extends java.util.Map<K,V>
如果不开启集群时缓存的默认缓存容器类是:public class DefaultCache<K, V> ,实际上DefaultCache就是用一个Hashmap来存数据的。
public class CacheFactory
/** * Returns the named cache, creating it as necessary. * * @param name the name of the cache to create. * @return the named cache, creating it as necessary. */ @SuppressWarnings("unchecked") public static synchronized <T extends Cache> T createCache(String name) { T cache = (T) caches.get(name); if (cache != null) { return cache; } cache = (T) cacheFactoryStrategy.createCache(name); log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name); return wrapCache(cache, name); }
上面代码中会通过缓存工厂策略对象来创建一个缓存容器,最后warpCache方法会将此容器放入到caches中。
public static synchronized void startup() { if (isClusteringEnabled() && !isClusteringStarted()) { initEventDispatcher(); CacheFactory.startClustering(); } }
public static void doClusterTask(final ClusterTask<?> task) { cacheFactoryStrategy.doClusterTask(task); }
这里有个限定就是必须是ClusterTask派生的类才行,看看它的定义:
public interface ClusterTask<V> extends Runnable, Externalizable { V getResult(); }
主要是为了异步执行和序列化,异步是因为不能阻塞,而序列化当然就是为了能在集群中传送。
public class ClusteredCacheFactory implements CacheFactoryStrategy {
首先是startCluster方法用于启动集群,主要完成几件事情:
/** * Notification message indicating that this JVM has joined a cluster. */ @SuppressWarnings("unchecked") public static synchronized void joinedCluster() { cacheFactoryStrategy = clusteredCacheFactoryStrategy; // Loop through local caches and switch them to clustered cache (copy content) for (Cache cache : getAllCaches()) { // skip local-only caches if (localOnly.contains(cache.getName())) continue; CacheWrapper cacheWrapper = ((CacheWrapper) cache); Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName()); clusteredCache.putAll(cache); cacheWrapper.setWrappedCache(clusteredCache); } clusteringStarting = false; clusteringStarted = true; log.info("Clustering started; cache migration complete"); }
这里可以看到会读取所有的缓存容器并一个个的使用Wrapper包装一下,然后用同样的缓存名称去createCache一个新的Cache,这步使用的是切换后的集群缓存策略工厂,也就是说会使用ClusteredCacheFactory去创建新的缓存容器。最后再将cache写入到新的clusteredCache 里,这样就完成了缓存的切换。
public Cache createCache(String name) { // Check if cluster is being started up while (state == State.starting) { // Wait until cluster is fully started (or failed) try { Thread.sleep(250); } catch (InterruptedException e) { // Ignore } } if (state == State.stopped) { throw new IllegalStateException("Cannot create clustered cache when not in a cluster"); } return new ClusteredCache(name, hazelcast.getMap(name)); }
这里使用的是ClusteredCache,而且最重要的是传入的第二个map参数换成了hazelcast的了,这样之后再访问这个缓存容器时已经不再是原先的本地Cache了,已经是hazelcast的map对象。hazelcast会自动对map的数据进行同步管理,这也就完成了缓存同步的功能。
那就看hazelcast的实现吧,在ClusteredCacheFactory中doClusterTask举个例子吧:
public void doClusterTask(final ClusterTask task) { if (cluster == null) { return; } Set<Member> members = new HashSet<Member>(); Member current = cluster.getLocalMember(); for(Member member : cluster.getMembers()) { if (!member.getUuid().equals(current.getUuid())) { members.add(member); } } if (members.size() > 0) { // Asynchronously execute the task on the other cluster members logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName()); hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers( new CallableTask<Object>(task), members); } else { logger.warn("No cluster members selected for cluster task " + task.getClass().getName()); } }
过程就是,先获取到集群中的实例成员,当然要排除自己。然后hazelcast提供了ExecutorService来执行这个task,方法就是submiteToMembers。这样就提交了一个运算任务。只不过具体是如何分配计算并汇集结果倒真不太清楚。
总结
Openfire 的详细介绍:请点这里
Openfire 的下载地址:请点这里