并发编程(4)ConcurrentHashMap源码分析

心理学哲学批判性思维 2017-12-14

概述

ConcurrentHashMap,通过这个名字,可以知道Concurrent是并发的,HashMap是我们常用的一种用来存放键值对的数据结构,所以ConcurrentHashMap就是一种用来解决高并发的HashMap。这个是JDK在1.5之后提供的一种数据结构。我们知道Java中其实已经有了HashTable这个线程安全的Map,但是他是通过对整个table使用synchronized来保证线程安全的,当有多个线程对HashTable进行读写的时候,每次只能有一个线程会获取到HashTable的对象锁,别的只能处于等待状态,所以在高并发的情况下,它的效率是比较低的。当然JDK还提供了另外一种方式,那就是Collections.synchronizedMap(hashMap),这个底层也是通过synchronized对要高并发的数据结构来实现线程安全的。通过上面的分析可以知道,ConcurrentHashMap为了解决synchronized在高并发时的低效问题,ConcurrentHashMap之所以比HashTable高效,是因为他并不是对整个HashMap上锁,而是采用了分段锁,那么现在就从源码的角度来看看,分段锁是如何实现的。

继承关系

并发编程(4)ConcurrentHashMap源码分析

通过继承关系,可以发现,ConcurrentHashMap并没有继承HashMap,而是单独实现了ConcurrentMap这个接口,其余的跟HashMap是一样的

成员变量

//table的最大容量
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    //table的默认容量16
    private static final int DEFAULT_CAPACITY = 16;
	//默认的并发度为16
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //负载因子
    private static final float LOAD_FACTOR = 0.75f;
	//链表转化为红黑树的阈值
    static final int TREEIFY_THRESHOLD = 8;
	//红黑树转链表的阈值
    static final int UNTREEIFY_THRESHOLD = 6;
	//红黑树的最小容量
    static final int MIN_TREEIFY_CAPACITY = 64;

	//存放Node的数组
    transient volatile Node<K,V>[] table;
    private transient volatile Node<K,V>[] nextTable;
    private transient volatile long baseCount;
    //sizeCtl作为table是否在初始化或者在resize的标志,如果为负数说明正在初始化或者扩容,需要table进行CRUD操作的线程只能等待,稍后进行重试,否则就可以直接进行CRUD操作
    //正数则表明table没有初始化
    private transient volatile int sizeCtl;
    private transient volatile int transferIndex;
    private transient volatile int cellsBusy;
    private transient volatile CounterCell[] counterCells;
    //该节点是否已经移动到新数组中去
    static final int MOVED     = -1; 
    //是否是TREEBIN节点
    static final int TREEBIN   = -2;

除此之外,还有几个类需要解释一下

基本原理

TreeNode

static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  //根节点
        TreeNode<K,V> left;//左节点
        TreeNode<K,V> right;//右节点
        TreeNode<K,V> prev; //上个节点的指针
        boolean red;//默认节点为黑
        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }

        Node<K,V> find(int h, Object k) {
            return findTreeNode(h, k, null);
        }

      //查找相应的树节点
   final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
      //此处省略一万行代码......
    }

跟一般的树节点都差不多,在前面分析二叉树的时候也差不多是这种数据结构,所以还是比较好理解的,注释说Nodes for use in TreeBins,也就是说这个TreeNode是用来服务TreeBin的,那么继续看Treebins

TreeBins

这个通过注释可以知道,是用来包装TreeNode的,然后作为红黑树的树节点

static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
  volatile TreeNode<K,V> first;//TreeNode的根节点
  volatile Thread waiter;//waiter线程
  volatile int lockState;//当前节点的锁状态
    // values for lockState
        static final int WRITER = 1; // 获得写数据的锁状态
        static final int WAITER = 2; // 等待写数据的锁状态
        static final int READER = 4; // 添加数据时的锁状态

        TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);
            this.first = b;//头结点赋值
            TreeNode<K,V> r = null;
            //递归遍历传过来的Node,从根节点开始
          for (TreeNode<K,V> x = b, next; x != null; x = next)
           {
			  next = (TreeNode<K,V>)x.next;//拿到下一个节点
              x.left = x.right = null;//将左右节点置空,因为需要重新赋值
              //判断r节点是否为空
              if (r == null) {
                    x.parent = null;//根节点置空
                    x.red = false;//黑子树
                    r = x;//x赋值为r节点
                }
                else {
                    K k = x.key;//拿到key
                    int h = x.hash;//计算hash值
                    Class<?> kc = null;//K对应的泛型实体类
                    //从根节点r开始遍历
                    for (TreeNode<K,V> p = r;;) {
                        int dir, ph;
                        K pk = p.key;
                     if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc == null &&
                          (kc = comparableClassFor(k)) == null) ||
                           (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);
                            TreeNode<K,V> xp = p;
                    if ((p = (dir <= 0) ? p.left : p.right) == null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            r = balanceInsertion(r, x);
                            break;
                        }
                    }
                }
            }
            this.root = r;//将r赋值给根节点
            assert checkInvariants(root);
        }
    }

ForwardingNode

static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        Node<K,V> find(int h, Object k) {
      // loop to avoid arbitrarily deep recursion on forwarding nodes
            outer: for (Node<K,V>[] tab = nextTable;;) {
                Node<K,V> e; int n;
                if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                for (;;) {
                    int eh; K ek;
                    if ((eh = e.hash) == h &&
               ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    if (eh < 0) {
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else
                            return e.find(h, k);
                    }
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }
    }

一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找。

CAS操作

tabAt

//获得在i位置上的Node节点
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

casTabAt

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
  • 利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少
  • 在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改
  • 因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果 有点类似于SVN

setTabAt

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

利用volatile方法设置节点位置的值

核心方法

构造方法

并发编程(4)ConcurrentHashMap源码分析

之前分析过HashMap的源码,所以看这些还是比较轻松的,ConcurrentHashMap采用的是懒加载,也就是存入第一对KeyValue的时候才会去初始化table,所以前面四个构造方法只是做了一些参数配置,只有最后一个构造方法,才会去真正的初始化,下面重点看一下最后一个方法

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }
  
 public void putAll(Map<? extends K, ? extends V> m) {
       tryPresize(m.size());
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            putVal(e.getKey(), e.getValue(), false);
    }

最后调用了putAll方法,这个会在后面的put方法里面再重点讲解。

transfer方法

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                //创建一个容量是原来2倍的table数组
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //创建一个ForwardingNode,指向新table,并且用于填充已经移动的Node位置
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
              //如果所有的节点都已经完成复制工作  
              //就把nextTable赋值给table 清空临时对象nextTable

                    nextTable = null;
                    table = nextTab;
                    // 扩容至现在容量的0.75倍
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
             //用ForwardingNode来填充Null节点
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
           //如果当前节点移动过,将次标志设置为true
           //这样后续线程访问便可以直接跳过此节点
                advance = true; // already processed
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                        //普通Node节点
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                       //如果扩容后已经不再需要tree的结构 反向转换为链表结构
                     ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                             //在nextTable的i位置上插入一个链表   
                            setTabAt(nextTab, i, ln);
                            //在nextTable的i+n的位置上插入另一个链表
                            setTabAt(nextTab, i + n, hn);
                //在table的i位置上插入forwardNode节点  表示已经处理过该节点
                            setTabAt(tab, i, fwd);
               //设置advance为true 返回到上面的while循环中 就可以执行i--操作
                            advance = true;
                        }
                    }
                }
            }
        }
    }

helpTransfer

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用上面讲到的transfer方法来进行扩容

put方法

首先调用了put方法

public V put(K key, V value) {
    return putVal(key, value, false);
}

紧接着调用了putVal方法,下面来分析下一年putVal方法

final V putVal(K key, V value, boolean onlyIfAbsent) {
  //key和value都不能为空
   if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());//计算hash值
        int binCount = 0;//链表长度
        //开启死循环
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
            //table为null或者table的长度为0,初始化table
                tab = initTable();
            //根据hash值计算table的索引
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //如果索引处为null,则直接在此处进行插入
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
            //通过CAS进行插入,插入成功之后跳出循环
                    break;                   
            }
            else if ((fh = f.hash) == MOVED)
            //当遇到表ForwardNode时,需要对表进行整合
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //采用同步代码块进行上锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                    //普通节点
                        if (fh >= 0) {
                            binCount = 1;
                            //遍历所有节点
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                           //如果hash值跟key值都相同,则进行值替换
            if (e.hash == hash && ((ek = e.key) == key 
            ||(ek != null && key.equals(ek)))) {
                               oldVal = e.val;
                                if (!onlyIfAbsent)
                                   e.val = value;
                                    break;
                                }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) 
                            {
                    pred.next = new Node<K,V>(hash, key,  value, null);
                                    break;
                                }
                            }
                        }
                        //树节点
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                //判断链表长度,如果链表长度大于阈值,进行树节点转换
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);//改变table中的元素的个数
        return null;
    }

首先,计算记录的key的hashCode,然后计算table的index位置,然后获取该index的值,如果该位置还为null,说明该位置上还没有记录,则通过调用casTabAt方法来讲该新的记录插入到table的index位置上去,否则,通过synchronized关键字对table的index位置加锁,需要注意的是,当前线程只会锁住table的index位置,其他位置上没有锁住,所以此时其他线程可以安全的获得其他的table位置来进行操作。这也就提高了ConcurrentHashMap的并发度。然后判断table的index位置上的第一个节点的hashCode值,这个节点要么是链表的头节点,要么是红黑树的根节点,如果hashCode值小于0,那么就是一颗红黑树,至于为什么是这样,上文中已经提到,如果不小于0,那么就还是一条链表,如果是一条链表,那么就寻找是否已经有记录的key和当前想要插入的记录是一致的,如果一致,那么这次put的效果就是replace,否则,将该记录添加到链表中去。如果是一颗红黑树,那么就通过调用putTreeVal方法来进行插入操作。在插入操作完成之后,需要判断本次操作是否是更新操作,如果是更新操作,则不会造成size的变化,否则,如果本次put操作时一次添加操作,那么就需要进行更新size的操作,而size的更新涉及到并发环境,所以较为复杂,并且table的扩容操作也会在更新size的时候发生,如果在更新size之后发现table中的记录数量达到了阈值,就需要进行扩容操作,这也是较为复杂的一步。

get方法

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());//计算hash值
        //根据hash值定位元素
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
             //如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点  
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
            //树节点
                return (p = e.find(h, key)) != null ? p.val : null;
            //通过寻址法进行寻找
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

首先,计算出记录的key的hashCode,然后通过使用(hashCode & (length - 1))的计算方法来获得该记录在table中的index,然后判断该位置上是否为null,如果为null,则返回null,否则,如果该位置上的第一个元素(链表头节点或者红黑树的根节点)与我们先要查找的记录匹配,则直接返回这个节点的值,否则,如果该节点的hashCode小于0,则说明该位置上是一颗红黑树,至于为什么hashCode值小于0就代表是一颗红黑树而不是链表了,这就要看下面的代码了:

static final int TREEBIN   = -2; // hash for roots of trees
    
   
        TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);
            
             ......   
        }

而TREEBIN的值为-2,也就是小于0成立,根据他的说明,TREEBIN想要代表的是一颗红黑树的根节点,所以在判断到table的某个位置上的第一个节点的hashCode值小于0的时候,就可以判断为该位置上是一棵红黑树,继续回到get方法,如果是红黑树,则通过调用Node的find方法来查找到节点,而这个Node的find方法在子类中被重写,所以会直接调用子类的find方法来进行查找。还有一种情况是table的index位置上为一条链表,那么就通过链表的查找方法来进行记录查找。最后需要注意的是,ConcurrentHashMap是一种线程安全的HashMap,但是我们没有发现在get方法的过程中使用任何与锁等效的组件来做线程同步,为什么呢?对于读来说,允许多个线程一起读是很正常的,而且在Node的实现上,ConcurrentHashMap做了一些手脚:

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
   
        ....
   }


    /**
     * The array of bins. Lazily initialized upon first insertion.
     * Size is always a power of two. Accessed directly by iterators.
     */
    transient volatile Node<K,V>[] table;

我们发现table数组是被volatile关键字修饰的,这就代表我们不需要担心table数组的线程可见性问题,也就没有必要再加锁来实现并发了。

删除操作属于写类型的操作,所以在进行删除的时候需要对table中的index位置加锁,ConcurrentHashMap使用synchronized关键字将table中的index位置锁住,然后进行删除,remove方法调用了replaceNode方法来进行实际的操作,而删除操作的步骤首先依然是计算记录的hashCode,然后根据hashCode来计算table中的index值,然后根据table中的index位置上是一条链表还是一棵红黑树来使用不同的方法来删除这个记录,删除记录的操作需要进行记录数量的更新(调用addCount方法进行)。

参考资料

www.importnew.com/22007.htmlwww.jianshu.com/p/f6730d578…blog.csdn.net/mine_song/a…

相关推荐