java并发容器之ConcurrentHashMap

concurrentmap作为jdk.utils.concurrent包下的一个类,在并发编程中起到了举足轻重的作用。Spring的底层数据结构就是使用concurrentmap实现的。因此好好学习学习该类很重要。

在学习它之前,应该先了解了解它的两个老哥,HashMap和HashTable。看看为啥在现如今的并发编程中都不怎么用它俩。

1、HashMap

HashMap本身就不是线程安全的。如果想要执行并发操作,就必须加同步锁,比如synchorized或者ReentrantLock等,而这种互斥锁本身性能上并不是最优的。

那么现在看看HashMap的结构来看看为啥说HashMap是线程不安全的。

自JDK1.8开始,HashMap的存储结构是数组+链表+红黑树的结构,其基本思想就是根据key,利用Hash算出其在数组中的下标,如果两个key计算出的Hash值相同,意味着发生了碰撞,随后会使用链表存储这两个value,如果同一数组索引位置发生碰撞的个数大于8个,链表就会转换为红黑树存储,从而减小数据查找的时间复杂度。

jdk1.8的hashmap的hash值相对于1.7有了较大的改变。其让hash的高低位进行了&运算,目的是能够尽量让hash分布变得更加均匀,尽量减小发生冲突的概率。

查询流程get:

1、首先根据hash以及长度算出在数组中的下标。

2、根据下标取对应位置元素,如果目标位置的next是空,直接取当前数据返回;

3、如果不是,要判断当前是不是链表结构,如果是就从头到尾遍历,直到取到数据或者尾部即可;如果当前是红黑树,则根据红黑树获取目标数据。这时比较的是key是否equal.

插入元素流程put:

1、计算数组下标;

2、如果目标位置是空,则直接设置元素。table[i]=newNode;

3、如果不是空,则要看当前是不是红黑树,如果不是,就插入到链表的尾部。添加完后,如果达到了红黑树的阈值,就要将链表变成红黑树;

扩容流程resize:

扩容固定扩展到原size的两倍。

1、循环遍历原hashmap的数组,重新根据键的hash值和长度取模 h & (length-1),计算下标;

2、如果当前位置的next是空,即没有冲突数据,则直接在新数组的位置赋值即可;

3、如果不是,则要根据是红黑树还是链表做处理。如果是链表,则将数据分散到新数组的两个位置,一个是原位置,一个是原位置+原数组大小;如果是红黑树,同理也是要分散到两个位置,特别的是分散后的红黑树如果冲突小于达到了UNTREEIFY_THRESHOLD这个变量,会再转成链表结构。

那到底为啥HashMap是线程不安全的呢?主要有两个方面:

1、数据不一致。

假如现在有两个线程,都要添加一个元素,并且都要向同一个数组索引的位置插入元素。若此时线程1首先获得了链表头,但在添加前线程1挂起了(可能时间片用完了或者其他原因),线程2开始操作,并成功添加了元素。之后线程1又被唤醒,但此时它持有旧的链表头,那么此时它的添加操作就有可能覆盖线程2的操作。其实这个很好理解,这个和我们之前说的多线程并发修改一个共享变量是一个意思啊。

2、引起死循环

当HashMap容量不够的时侯,就会执行扩容即resize操作,如果多个线程都进行resize容易引起死循环。

当然死循环这个问题只会1.8之前的版本出现(之前采用的是头插法),jdk1.8开始采用的是尾插法,即扩容时也是保证链表的原有顺序,不会出现死循环的问题。

关于死循环的问题就不具体说了,有一篇文章介绍的更好,可以看一下: 疫苗:JAVA HASHMAP的死循环

2、HashTable

HashTable是线程安全的,但是性能却不怎么好,因为该类的几乎每个方法几乎都用到了synchorized内置锁。所以一般情况下,我们也不用它。

 public synchronized V get(Object key) {
        Entry<?,?> tab[] = table;
        int hash = key.hashCode();
        int index = (hash & 0x7FFFFFFF) % tab.length;
        for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
            if ((e.hash == hash) && e.key.equals(key)) {
                return (V)e.value;
            }
        }
        return null;
    }

  public synchronized V put(K key, V value) {
        // Make sure the value is not null
        if (value == null) {
            throw new NullPointerException();
        }

        // Makes sure the key is not already in the hashtable.
        Entry<?,?> tab[] = table;
        int hash = key.hashCode();
        int index = (hash & 0x7FFFFFFF) % tab.length;
        @SuppressWarnings("unchecked")
        Entry<K,V> entry = (Entry<K,V>)tab[index];
        for(; entry != null ; entry = entry.next) {
            if ((entry.hash == hash) && entry.key.equals(key)) {
                V old = entry.value;
                entry.value = value;
                return old;
            }
        }

        addEntry(hash, key, value, index);
        return null;
    }

3、ConcurrentHashMap

ConcurrentHashMap是并发操作比较优秀的容器,目前也是得到了广泛的应用。

在JDK1.8之前,其采用数组+链表的结构,并将整个数组分段(Segment),并定义一个内部类Segment,其继承自ReentrantLock,有了段,加锁时不必将整个Map加锁,只需要对某个段加锁,这样不会将整个Map锁死。它的思想就是将整个锁分离,从而提高并发性能。

自JDK1.8开始,其数据结构和HashMap一样都是采用了数组+链表+红黑数的数据结构。它不再使用之前的分段锁,而是使用CAS+synchorized锁实现,关于CAS的介绍,我之前写过文章: Java多线程共享变量同步机制 。但是1.8仍然保留了原有的Segment的概念,这是为了兼容之前版本的序列化。

  public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

它调用了一个原子操作:

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

}

看上面代码可看见其调用了Unsafe的getObjectVolatile这个native方法。通过使用volative定义,保持了数据可见性。

其他原子操作:

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
casTabAt即采用CAS的方式更新数据。

ConcurrentMap在执行put操作的时侯,主要有三个重要的并发控制。

1、但table是空时,会初始化数据。此时会通过volatile变量进行CAS操作来初始化数组;

2、如果对应数组元素(链表表头或者红黑树的根)为null,就采用CAS添加,即调用上面的casTabAt;

3、如果不为null,就使用synchorized加锁。代码:

 final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
             //null,cas添加
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                 不为null,加锁,向链表或者红黑树增加元素
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

综上所述,Concurrent实现线程安全的三大法宝:volatile变量+CAS+synchorized锁。

一篇不错的文章: java-并发-ConcurrentHashMap高并发机制-jdk1.8

练习:通过多线程,将每个月的统计数据添加到concurrentmap中,返回给http.

入口:


    public  Object getMerchantAllData(Integer merchant_id,String start,String end) throws Exception{
        List<String> months = DateLib.getAllMonths(start,end);
        ConcurrentHashMap<String,MerchantFinanceData> conMerHashMap = new ConcurrentHashMap<>(months.size());
        System.out.println(months);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (String month : months){
            executorService.execute(new MerchantMonthlyDataThread(this,conMerHashMap,month,merchant_id));
        }
        executorService.shutdown();
        executorService.awaitTermination(30,TimeUnit.DAYS);
        HashMap<String,Object> map = new HashMap<>();
        for (String key:conMerHashMap.keySet()){
            map.put(key,conMerHashMap.get(key));
        }
        return SortMap.sortMapByKey(map,false);
    }

线程实现类:

package com.haibo.demo.task;

import com.haibo.demo.service.SAPService;
import com.haibo.demo.service.impl.SAPServiceImpl;
import com.haibo.demo.s.MerchantFinanceData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//@Component
public class MerchantMonthlyDataThread implements Runnable {

    private Logger logger = LoggerFactory.getLogger(MerchantMonthlyDataThread.class.getName());


    private ConcurrentHashMap<String,MerchantFinanceData> conMerHashMap;

    private String month;

    private Integer merchant_id;

    private static MerchantMonthlyDataThread merchantMonthlyDataThread;

    private SAPService sapService;

    public ConcurrentHashMap<String, MerchantFinanceData> getConMerHashMap() {
        return conMerHashMap;
    }

    public String getMonth() {
        return month;
    }

    public Integer getMerchant_id() {
        return merchant_id;
    }

    public SAPService getSapService() {
        return sapService;
    }

    public void setSapService(SAPService sapService) {
        this.sapService = sapService;
    }

    public MerchantMonthlyDataThread(SAPService sapService, ConcurrentHashMap<String,MerchantFinanceData> conMerHashMap, String month, Integer merchant_id){
        this.conMerHashMap = conMerHashMap;
        this.month = month;
        this.merchant_id = merchant_id;
        this.sapService = sapService;
    }

//    @PostConstruct
//    public void  init() {
//        merchantMonthlyDataThread = this;
//        merchantMonthlyDataThread.sapService = this.sapService;
//    }

    @Override
    public void run() {
        logger.info("Start to run thread:" + Thread.currentThread().getName());
        try {
            logger.info(Thread.currentThread().getName() + "开始获取数据:" + this.month + "商户:" + this.merchant_id);
//            if (null == merchantMonthlyDataThread.sapService){
//                System.out.println("the service is null");
//            }
            MerchantFinanceData merchantFinanceData = this.sapService.getMonthlyMerchantData(this.month,this.merchant_id);
            System.out.println("当前月份获取成功:" + merchantFinanceData);
//            MerchantFinanceData merchantFinanceData = new MerchantFinanceData();
            merchantFinanceData.setMonth(this.month);
            merchantFinanceData.setMerchant_id(this.merchant_id);
            logger.info("开始添加数据:" + this.month);
            conMerHashMap.put(this.month,merchantFinanceData);
            System.out.println(conMerHashMap);
        }catch (Exception e){
            logger.error("获取商户数据出错.月份:" + this.month + "错误信息:" + e.getMessage());
            System.out.println(e);
        }finally {
            lock.unlock();
        }
        logger.info("End for thread:" + Thread.currentThread().getName());
    }
}

我在向全局变量添加数据时,依然添加了同步锁,因为我也不知道会发生什么错误,还是保险起见。

此外,还有一个问题:在非controller层注入service,会提示为null,然后我按照网上的一通操作,在类上加@Component注解,增加PostContruct注解的init方法等。但在初始化参数上,一直出错。后来我就把接口当作参数传进去了。

参考资料:

深入理解HashMap

--------EOF---------
微信分享/微信扫码阅读