java并发容器之ConcurrentHashMap
concurrentmap作为jdk.utils.concurrent包下的一个类,在并发编程中起到了举足轻重的作用。Spring的底层数据结构就是使用concurrentmap实现的。因此好好学习学习该类很重要。
在学习它之前,应该先了解了解它的两个老哥,HashMap和HashTable。看看为啥在现如今的并发编程中都不怎么用它俩。
1、HashMap
HashMap本身就不是线程安全的。如果想要执行并发操作,就必须加同步锁,比如synchorized或者ReentrantLock等,而这种互斥锁本身性能上并不是最优的。
那么现在看看HashMap的结构来看看为啥说HashMap是线程不安全的。
自JDK1.8开始,HashMap的存储结构是数组+链表+红黑树的结构,其基本思想就是根据key,利用Hash算出其在数组中的下标,如果两个key计算出的Hash值相同,意味着发生了碰撞,随后会使用链表存储这两个value,如果同一数组索引位置发生碰撞的个数大于8个,链表就会转换为红黑树存储,从而减小数据查找的时间复杂度。
jdk1.8的hashmap的hash值相对于1.7有了较大的改变。其让hash的高低位进行了&运算,目的是能够尽量让hash分布变得更加均匀,尽量减小发生冲突的概率。
查询流程get:
1、首先根据hash以及长度算出在数组中的下标。
2、根据下标取对应位置元素,如果目标位置的next是空,直接取当前数据返回;
3、如果不是,要判断当前是不是链表结构,如果是就从头到尾遍历,直到取到数据或者尾部即可;如果当前是红黑树,则根据红黑树获取目标数据。这时比较的是key是否equal.
插入元素流程put:
1、计算数组下标;
2、如果目标位置是空,则直接设置元素。table[i]=newNode;
3、如果不是空,则要看当前是不是红黑树,如果不是,就插入到链表的尾部。添加完后,如果达到了红黑树的阈值,就要将链表变成红黑树;
扩容流程resize:
扩容固定扩展到原size的两倍。
1、循环遍历原hashmap的数组,重新根据键的hash值和长度取模 h & (length-1),计算下标;
2、如果当前位置的next是空,即没有冲突数据,则直接在新数组的位置赋值即可;
3、如果不是,则要根据是红黑树还是链表做处理。如果是链表,则将数据分散到新数组的两个位置,一个是原位置,一个是原位置+原数组大小;如果是红黑树,同理也是要分散到两个位置,特别的是分散后的红黑树如果冲突小于达到了UNTREEIFY_THRESHOLD这个变量,会再转成链表结构。
那到底为啥HashMap是线程不安全的呢?主要有两个方面:
1、数据不一致。
假如现在有两个线程,都要添加一个元素,并且都要向同一个数组索引的位置插入元素。若此时线程1首先获得了链表头,但在添加前线程1挂起了(可能时间片用完了或者其他原因),线程2开始操作,并成功添加了元素。之后线程1又被唤醒,但此时它持有旧的链表头,那么此时它的添加操作就有可能覆盖线程2的操作。其实这个很好理解,这个和我们之前说的多线程并发修改一个共享变量是一个意思啊。
2、引起死循环
当HashMap容量不够的时侯,就会执行扩容即resize操作,如果多个线程都进行resize容易引起死循环。
当然死循环这个问题只会1.8之前的版本出现(之前采用的是头插法),jdk1.8开始采用的是尾插法,即扩容时也是保证链表的原有顺序,不会出现死循环的问题。
关于死循环的问题就不具体说了,有一篇文章介绍的更好,可以看一下: 疫苗:JAVA HASHMAP的死循环
2、HashTable
HashTable是线程安全的,但是性能却不怎么好,因为该类的几乎每个方法几乎都用到了synchorized内置锁。所以一般情况下,我们也不用它。
public synchronized V get(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return (V)e.value;
}
}
return null;
}
public synchronized V put(K key, V value) {
// Make sure the value is not null
if (value == null) {
throw new NullPointerException();
}
// Makes sure the key is not already in the hashtable.
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
Entry<K,V> entry = (Entry<K,V>)tab[index];
for(; entry != null ; entry = entry.next) {
if ((entry.hash == hash) && entry.key.equals(key)) {
V old = entry.value;
entry.value = value;
return old;
}
}
addEntry(hash, key, value, index);
return null;
}
3、ConcurrentHashMap
ConcurrentHashMap是并发操作比较优秀的容器,目前也是得到了广泛的应用。
在JDK1.8之前,其采用数组+链表的结构,并将整个数组分段(Segment),并定义一个内部类Segment,其继承自ReentrantLock,有了段,加锁时不必将整个Map加锁,只需要对某个段加锁,这样不会将整个Map锁死。它的思想就是将整个锁分离,从而提高并发性能。
自JDK1.8开始,其数据结构和HashMap一样都是采用了数组+链表+红黑数的数据结构。它不再使用之前的分段锁,而是使用CAS+synchorized锁实现,关于CAS的介绍,我之前写过文章: Java多线程共享变量同步机制 。但是1.8仍然保留了原有的Segment的概念,这是为了兼容之前版本的序列化。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
它调用了一个原子操作:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
看上面代码可看见其调用了Unsafe的getObjectVolatile这个native方法。通过使用volative定义,保持了数据可见性。
其他原子操作:
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
casTabAt即采用CAS的方式更新数据。
ConcurrentMap在执行put操作的时侯,主要有三个重要的并发控制。
1、但table是空时,会初始化数据。此时会通过volatile变量进行CAS操作来初始化数组;
2、如果对应数组元素(链表表头或者红黑树的根)为null,就采用CAS添加,即调用上面的casTabAt;
3、如果不为null,就使用synchorized加锁。代码:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//null,cas添加
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
不为null,加锁,向链表或者红黑树增加元素
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
综上所述,Concurrent实现线程安全的三大法宝:volatile变量+CAS+synchorized锁。
一篇不错的文章: java-并发-ConcurrentHashMap高并发机制-jdk1.8
练习:通过多线程,将每个月的统计数据添加到concurrentmap中,返回给http.
入口:
public Object getMerchantAllData(Integer merchant_id,String start,String end) throws Exception{
List<String> months = DateLib.getAllMonths(start,end);
ConcurrentHashMap<String,MerchantFinanceData> conMerHashMap = new ConcurrentHashMap<>(months.size());
System.out.println(months);
ExecutorService executorService = Executors.newCachedThreadPool();
for (String month : months){
executorService.execute(new MerchantMonthlyDataThread(this,conMerHashMap,month,merchant_id));
}
executorService.shutdown();
executorService.awaitTermination(30,TimeUnit.DAYS);
HashMap<String,Object> map = new HashMap<>();
for (String key:conMerHashMap.keySet()){
map.put(key,conMerHashMap.get(key));
}
return SortMap.sortMapByKey(map,false);
}
线程实现类:
package com.haibo.demo.task;
import com.haibo.demo.service.SAPService;
import com.haibo.demo.service.impl.SAPServiceImpl;
import com.haibo.demo.s.MerchantFinanceData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//@Component
public class MerchantMonthlyDataThread implements Runnable {
private Logger logger = LoggerFactory.getLogger(MerchantMonthlyDataThread.class.getName());
private ConcurrentHashMap<String,MerchantFinanceData> conMerHashMap;
private String month;
private Integer merchant_id;
private static MerchantMonthlyDataThread merchantMonthlyDataThread;
private SAPService sapService;
public ConcurrentHashMap<String, MerchantFinanceData> getConMerHashMap() {
return conMerHashMap;
}
public String getMonth() {
return month;
}
public Integer getMerchant_id() {
return merchant_id;
}
public SAPService getSapService() {
return sapService;
}
public void setSapService(SAPService sapService) {
this.sapService = sapService;
}
public MerchantMonthlyDataThread(SAPService sapService, ConcurrentHashMap<String,MerchantFinanceData> conMerHashMap, String month, Integer merchant_id){
this.conMerHashMap = conMerHashMap;
this.month = month;
this.merchant_id = merchant_id;
this.sapService = sapService;
}
// @PostConstruct
// public void init() {
// merchantMonthlyDataThread = this;
// merchantMonthlyDataThread.sapService = this.sapService;
// }
@Override
public void run() {
logger.info("Start to run thread:" + Thread.currentThread().getName());
try {
logger.info(Thread.currentThread().getName() + "开始获取数据:" + this.month + "商户:" + this.merchant_id);
// if (null == merchantMonthlyDataThread.sapService){
// System.out.println("the service is null");
// }
MerchantFinanceData merchantFinanceData = this.sapService.getMonthlyMerchantData(this.month,this.merchant_id);
System.out.println("当前月份获取成功:" + merchantFinanceData);
// MerchantFinanceData merchantFinanceData = new MerchantFinanceData();
merchantFinanceData.setMonth(this.month);
merchantFinanceData.setMerchant_id(this.merchant_id);
logger.info("开始添加数据:" + this.month);
conMerHashMap.put(this.month,merchantFinanceData);
System.out.println(conMerHashMap);
}catch (Exception e){
logger.error("获取商户数据出错.月份:" + this.month + "错误信息:" + e.getMessage());
System.out.println(e);
}finally {
lock.unlock();
}
logger.info("End for thread:" + Thread.currentThread().getName());
}
}
我在向全局变量添加数据时,依然添加了同步锁,因为我也不知道会发生什么错误,还是保险起见。
此外,还有一个问题:在非controller层注入service,会提示为null,然后我按照网上的一通操作,在类上加@Component注解,增加PostContruct注解的init方法等。但在初始化参数上,一直出错。后来我就把接口当作参数传进去了。
参考资料:
微信分享/微信扫码阅读