Java多线程共享变量同步机制及各种锁
- 线程安全介绍;
- Java synchronized
- 死锁
- Java ReentantLock
- Java ReentrantReadWriteLock
- volatile
- CAS
并发编程中永远都避不开的一个话题就是线程安全。如何保证在多线程场景中共享,可变的变量是准确的,不会产生脏数据是一个很重要的课题。
首先描述一下可能导致线程不安全的原因。
每个线程之间的数据是不可见的,每个线程首先都会从主内存中获取原始数据,并加载到自己的工作内存中,假如说,现在主内存中有一个整数变量a,值为1,此时线程A从工作内存中取出,并想将其值加1;线程B同时也从主内存中获取变量a,且发生在A返回修改后的值之前取出,那么此时线程B拿到的也是1。也就是说,经过两个线程分别加1,最后变量a值实际上只实现了一次加1,最后变成2,而不是我们想要的结果3.
对于这种问题,各大编程语言都有着各种优秀的同步机制,从而来保证每个线程修改后的值可以被其他的线程看到。Python是通过GIL实现(比较弱鸡,只是保证同一个时间永远只有一个线程在工作),PHP是多进程的,其实本身并没有什么好的锁机制,主要还是依靠文件锁,数据库锁或者缓存队列来实现。golang和Java的锁机制相对来说更完善一些。此外,在分布式系统还有分布式锁,比如通过Redis(RedLock算法),zookeeper(通过创建临时节点)等实现的分布式锁。
在golang中,主要有互斥锁,读写锁等等。感兴趣的可以见: golang中的锁机制 。
因为目前是在学Java,所以详细记录Java中的各种同步机制。
1、synchronized
synchronized是Java的一个内置锁,是一种独占互斥锁。使用它可以保证弱某个线程拿到了锁,其他线程必须等待其完成后,才可以进行操作,是一种阻塞式的锁。那么它到底是如何实现的呢?
synchronized最传统的锁依赖的是依赖操作系统的Mutex Lock。那么Mutex Lock是如何工作的呢?
Mutex的机制是若线程可以得到锁,就占有该做并执行;若获得不到锁,线程就进入休眠状态。
它的底层基本实现思路就是通过对一个标志位的置位来表示加锁还是未加锁,比如0表示加锁,1表示未加锁。当处于加锁的状态,就实现了对临界区(一个共享资源访问的保护片段,当有线程进入临界区,其他线程只能等待,互斥锁就是一种实现机制)的保护。
下面是其结构体:
struct mutex {
/* 1: unlocked, 0: locked, negative: locked, possible waiters */
atomic_t count;
spinlock_t wait_lock;
struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
struct thread_info *owner;
const char *name;
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
上面的count是一个标志位,已经有注释了。那么问题是,count本身也是一个变量,又怎么保证获取并设置该标志位的过程是原子操作呢?
比较著名的一种做法是 test-and-set-lock(TSL),是一种不可中断的原子运算,实现方式有硬件或者软件的方式,如果感兴趣可以看下维基百科的介绍: 检查并设置-维基百科
上述原子操作,在单核系统是完全没有问题的,但在多核系统中,虽然是原子操作,但单靠它是完全没法做到CPU间的同步的。基于此,操作系统可以通过控制总线(比如保持同一时间只能有一个处理器操作),基于缓存锁定(保证同一时间只有一个处理器在操作缓存)或者其他实现方式。
互斥锁本身是有缺点的,因为如果线程抢不到锁的话,就会进入休眠状态,待有锁资源再被换起。休眠和
换起的这两个状态,都是耗费CPU的,因为CPU要实现线程的切换。基于此,操作系统本身还有一种实现,叫做自旋锁。它和互斥锁一样,都属于独占锁。但它在未获得锁资源的时侯,不会进入休眠,而是不断重试,直到成功获取锁。
通过自旋锁,线程没必要进入休眠状态,避免了上述互斥锁带来的缺点。但同时,它本身也是存在缺点的。因为,如果一直没有线程释放锁,那么他就要不断地,永远地重试下去。而这本身对CPU来说也是一种程度的消耗,尤其线程本身执行的任务时间又较长的话。
对于无限循环的问题,最后是有重试次数限制的,如果超过一定次数还未获得锁,就会挂起线程。
在JDK1.6之后,又提出了自适应自旋锁。自适应自旋锁重试次数或时间不再固定,而是由前一次在同一个锁对象的自旋时间及锁的拥有着的状态来决定。如果上一个自旋时间很短。那它这次也认为自己可以获得锁,这时它就允许自旋时间长一些。
此外,在单核系统中,只有一个线程获取锁,没必要不断重试,不像是在多核系统,你可以等到别的线程释放锁,可以进行重试。
那么说到这,实际情况到底该怎么用呢?
比如可以先尝试采用自旋锁,重试一定次数或者一定时间,如果还获取不到锁,再真正进入休眠等等。看到StackOverFlow有人说过,其实实际中互斥锁用的还是比较多,自旋锁只有在满足一定条件下才可能得到性能的提升,比如多核,比如任务不能太过耗费时间等等。
说了一大堆,看看Java的synchronized是如何实现的。
使用synchronized的代码块在编译的时侯会插入monitorenter和monitorexit字节码指令.
上图中有两个exit,主要是为了保证异常的情况下可退出,即其自身含有tryfinally。
对于被synchorized修饰的方法,会被一个叫ACC_SYNCHRONIZED的修饰,有该修饰的方法就会尝试获取锁资源。
对象应用的锁是保存在对象头中的,对象头中的MARK WORD存储的就是锁的信息。至于具体头中保存的锁信息是什么,还要根据具体的锁机制有关,有重量级锁,轻量级锁还有偏向锁。
(图片来源于网络)
重量级锁就是Java最原始的一种锁,线程上来就是尝试获取锁,无锁就直接挂起,并被放入队列中。其实在这也可以看出来,这是一种非公平锁,线程都是先尝试获取锁,获取不成功,再被放入同步队列,每个新来的都可以插队。
也是基于此,自Java1.6后,又引入了轻量级锁和偏向锁。
因此,synchronized锁的升级过程是 无锁->偏向锁->轻量级锁->重量级锁,且这个过程是不可逆的。
偏向锁
其加锁思路:
1、当对象第一次被线程获得锁的时侯,首先检查其是否是可偏向状态(可见上图的偏向锁标志位,且是无锁状态);
2、如果是,那么线程会通过CAS获得偏向锁,如果成功,则开始执行同步代码;如果失败,就往下走;
3、当发现当前对象的偏向锁是当前线程时(就是通过对象头中的线程id),会不必再进行CAS操作,直接向线程栈中添加已调Display hdr是空的Lock Record,然后开始执行同步代码;
3、当发现当前对象的锁不是当前线程时,当到达安全局点(该时间节点没有任何字节码执行),首先会检查偏向的线程是否还存活,如果不存活,直接将对象锁状态置为无锁状态(01),然后获取锁,如果仍存活,就直接转向偏向锁。
释放锁:
线程不主动释放锁,只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁,而他的释放也很简单,直接将Lock Record的只向对象的引用字段设置为null。
由此可见,偏向锁的加锁和释放可能都不需要CAS的操作。
偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态。撤销偏向锁后恢复到无锁(标志位为“01”)或轻量级锁(标志位为“00”)的状态。
偏向锁的应用场景基本上是实际上只有一个线程在执行同步块的任务。
轻量级锁
当某个线程持有了偏向锁,且另外的线程有锁竞争时就会升级为轻量级锁。
轻量级锁的获取锁的思路是:
1、若该对象无锁,虚拟机会在当先线程栈中建立一个名为Lock Record的空间,然后将目标对象头中的Mark Record拷贝过来,并记录只向对象mark record的指针。
图片来源于网络:
2、然后通过CAS(Compare and Set,后面会讲,一种乐观锁机制),试着修改对象的锁信息为自己的Lock Record地址,如果修改成功,即加锁成功;如果失败,向下走;
3、首先判断是不是对象锁指向的是当前线程栈的,如果是,证明是重入了,此时也证明获取锁成功,可执行同步代码块。与此同时,将线程栈中Lock Record的(Displaced Mark Word)设置为null.
4、到这步,证明获取锁失败了,那么线程可以通过自旋锁的形式再次尝试获取锁,如果仍未成功(仍未成功是指自旋一定次数或时间仍然未获取到锁),直接膨胀为重量级锁。
解锁的过程是:
1、首先遍历所有线程栈中owner(Lock Record的第二部分,上面的图中已经标记)值向当前对象的Lock Record空间;
2、如果displaced hrd是null,证明是重入的,就将obj设置为null,然后继续;
3、同样是通过CAS的方式,试着将当前锁对象的Mark word替换成displaced hdr;
4、如果失败,则直接膨胀为重量级锁。
死锁
死锁有两种情况:
1、在mutex lock下,假如一个已经获得锁的线程又重新获得锁,那么此时也会被挂起,这样就一直阻塞住了。当然,目前Java实现的锁是可重入的。
比如synchorized的重入性时通过一个计数器来实现的,每一个锁都会关联一个线程和计数器,当没有获取锁时,计数器时0,同个线程每获取一次同样的锁,计数器都会+1.
2、比较著名的了。就是线程A占用资源 1,2;线程B占用资源2,1,两个线程都在等待对方释放资源,锁都不释放,此时就会出现死锁。
为了避免死锁就需要我们在业务中注意,比如在一个线程尽量使用一个锁,一个锁尽量只占用一个资源等等。
ReentrantLock是一个实现Lock接口的锁,他其实和内置锁synchorized是一样的,都属于独占锁。只是比synchorized多了一些方法,此外需要显示地加锁和释放锁,而不是像synchorized那样隐士地释放锁。
如果不是有什么特殊要求,synchorized就够了,目前的性能是足够的,除非有些特定场景,必须要我们业务自己去管理锁:
1.ReenTrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。
2.ReenTrantLock提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。
3.ReenTrantLock提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()来实现这个机制。这个意思就是如果一个持有锁的线程长期不释放锁,阻塞等待的线程可以终端去做别的事情。然而snchronized是不允许的。
看下获取锁的源码:
其核心是Sync,该抽象类是继承了AbstractQueuedSynchronizer,即AQS。通过state这个volatile变量以及CAS操作来判断是否能成功获取锁。如果不能获取锁,就会进入一个双向链表构成的双端队列。
即执行的是:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
之所以用双端队列,原因:
1、一个入队,一个出队,FIFO;
2、当前节点需要判断前一个节点的状态。如果是前节点是head,就尝试获取锁,如果pre节点的等待状态waitStatus,如果是SIGNAL,即被阻塞,就挂起,防止做无效的自旋。
3、还是说waitStatus状态很重要,如果某些节点是CANCEL,会从后往前把CANCEL的剔除。
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先是非公平锁:
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
变量c是获取到当前的状态,是一个volatile 变量。可以看到,其采用CAS操作获取锁,只要可获取到锁,就可插队,是一种非公平锁,也是默认采用的一种方式,主要是为了减少线程切换,增大吞吐量。此外,好像也没有太多情况要求必须要使用公平锁。
此外,ReentantLock支持公平锁:
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
相对于非公平锁,它就是要判断是否队列中有等待的线程,只有没有线程再等待,才会执行CAS操作,才有机会获取锁。
此外,此外该锁支持重入。
锁的释放:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
上面只有c是0时,锁才会真正被释放,否则只会将volatile变量减1.
上面的synchorized和ReentrantLock都属于独占锁,即同一时刻只能有一个线程拥有锁,其他线程处于阻塞状态;还有一种锁,叫做共享锁,它允许在同一时刻读锁可以被多个线程拥有,但写锁只有一个线程持有,Java中的ReentrantReadWriteLock就是一种共享锁。
相对于ReendtentLock,它也使用一个volatile变量state为同步状态,但其用了高16位表示读锁,低16位为写锁。
写锁的加锁过程:
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
上面的代码过程就是首先获取总的锁数,再获取写锁数目。
如果总锁数不为0,写锁数为0或者不是当前线程,意味着此时有读锁,此时获取写锁失败。这步保证读锁的数据是准确的。
然后如果当前锁数目为0,假如当前线程需要阻塞或者CAS获取锁失败,则获取写锁失败。
写锁的释放过程和ReentantLock基本类似,因为写锁本质也是排他锁,都是通过减小状态计数的数量:
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
再看下读锁的获取过程:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
如果写锁被其他线程占有,那么其获取读锁失败;
看该线程是否在读线程阻塞队列,不在就尝试使用CAS获取读锁。
读锁的释放相对比较复杂的,首先线程内部本身是维护了一个读锁的个数(采用ThreadLocal的数据结构),此外还要维护一个读锁的总数,就是上面提到的volatile变量的state的高16位,读锁总数的减小是先线程安全的,因为它使用的CAS操作。例子:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
读写锁的应用可以应用在缓存中。比如缓存有效的情况下,支持并发读;当缓存失效的时侯,只能独写,即只允许一个线程写。
上面的几种锁都提到了一个同步队列,即AQS(AbstractQueuedSynchronizer)队列,锁的实现是完全基于它的,它主要用于资源访问的控制。实际上在JAVA中,不仅是锁,还有很多同步器都依赖于AQS,比如Semaphore,CountDownLatch等等。
AQS的结构主要是采用了一个volatile状态位state + 一个FIFO的双向队列的方式。
state是一个状态位,表示是否获取锁,0是未获取,1是已获取,大于1是已重入的次数(互斥锁)。state本身是一个volatile变量,其本身通过CAS进行更新。
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
FIFO双向队列结构类似下面的图(来自并发编程的艺术):
每个节点都是一个Node,存储了线程的引用和状态。
static final class Node {
// 当前Node的状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
volatile Thread thread;
}
独占锁的基本流程:通过CAS获取锁,即CAS state的值,如果成功即获取到锁,如果失败,就会将线程加到同步队列队尾,并且线程开始自旋,当然不是一直自旋,如果其前驱节点是head,那么其就会一直自旋等待池有锁的线程释放锁,如果不是,其就会被阻塞,waitStatus变为SIGNAL,直到下次被唤醒,此时如果前驱节点是取消状态,还要去除对应节点。
public final void acquire(int arg) {
//tryAcquire可以是字类实现的获取锁的方是, addWaiter是获取锁失败加入到同步队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter主要是将当前线程加到同步队列对尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
这里要注意几个问题:
1、由于是多线程,所以加到队尾时要使用CAS操作。但是我们在进行head节点设置的时候,由于只能有一个线程获取到锁,所以并不需要CAS
2、先设置前驱节点node.prev = pred;,再设置tail。保证tail的前驱节点不是空。
接下来就是线程自旋的过程:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果前驱节点时头节点,就自旋获取锁。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//根据前驱节点的状态来决定后续操作,如果是SIGNAL,直接就挂起,如果是CANCEL,就清除节点
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果一直获取不到,就会取消获取,这里就是进行节点删除,此外如果是自旋之后,还要唤醒后继线程
if (failed)
cancelAcquire(node);
}
}
上面是独占锁家所的过程,接着是释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//只有head不为空,且waitStatus不为0时才需要唤醒后继线程。
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
独占锁的释放比较简单,因为其不涉及到并发,所以直接就唤醒后继线程就可以,如果下一个节点的状态不是取消,则可唤醒,够泽从尾部开始向前找,直到找到符合条件(waitStatus<0)的线程唤醒。
共享锁和独占锁的区别是多个线程可以同时获取到锁。在AQS中的区别是
1、多个一个waitStatus的状态,PROPAGATE,这个是为了解决并发释放锁时可能会出现队列不再被唤醒的问题;
在释放锁的时侯,如果当前waitStatus是0的话,就要将其变为PROPAGATE。随后在自旋获取锁的过程要进行setHeadAndPropagate操作,这步是要唤醒后继的线程,这个就是为了防止因为并发导致线程不能被正常唤醒。
关于AQS更详细的一篇文章: Java队列同步器AQS
补充:
1、锁消除:在编译器运行期间,对于一些使用代码同步,但不可能出现数据共享竞争的锁进行释放。
2、锁粗化:如果一连串的操作是对同一个对象的频繁的加锁释放锁,有时甚至在循环体中进行加锁释放锁操作,这种高频的锁操作也会带来一定的性能影响。因此虚拟机检测到有这种情况发生的时侯就会把锁操作加到外部,就是将锁粗化。
volatile
volatile是一个比synchorized更轻量级的一种同步机制。
它最重要的两个特性就是:
1、保持变量的内存可见性;
2、禁止指令重排。
被其修饰的变量保证内存可见性,意思是说一个线程对变量的修改,其他线程是可以看到的。它的实现方式主要是通过操作系统层面,即在执行写操作的时侯,CPU会在总线发送一个Lock指令(或者采用缓存锁定),使得当前只能有一个处理器处理内存数据,上面也提到过了。
虽然volatile保证可见性,但是还要格外注意,避免非原子性的操作,否则会出现异常,比如下面这个例子:
public class LockPra {
public static volatile Integer s = 0;
public static ReentrantLock lock = new ReentrantLock();
public class Thre implements Runnable {
public void test(){
s++;
log.info("{} set s into {}",Thread.currentThread().getName(),s);
}
@Override
public void run() {
log.info("start thread:{}",Thread.currentThread().getName());
test();
log.info("thread:{} fininshed",Thread.currentThread().getName());
}
}
public void runLock(){
log.info("Start main at {}",DateLib.getTime());
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i=0;i<9;i++){
executorService.execute(new Thre());
}
executorService.shutdown();
try{
Thread.sleep(1L);
}catch (Exception e){
}
log.info("End main at {},S:",DateLib.getTime(),s);
}
}
s++是一个非原子性的操作,首先执行读,然后写,读和写并不是原子操作。结果就像下面:
线程1和2在读的时侯都获取了当前最新值0,当线程1修改了i的值,线程2中的i值失效了,但是由于读写不是原子操作,此时它并不会再去读一次线程i的有效值,而是直接执行了+1的操作,所以都更新了当前的值为1。如果要实现同步,还是要加锁。volitale更适合一写,多读的情况或者至少写操作不依赖于当前变量的值。
volatile的实现原理就是通过加入内存屏障。
(1)在volatile写操作的前面插入一个StoreStore屏障。保证volatile写操作不会和之前的写操作重排序。
(2)在volatile写操作的后面插入一个StoreLoad屏障。保证volatile写操作不会和之后的读操作重排序。
(3)在volatile读操作的后面插入一个LoadLoad屏障+LoadStore屏障。保证volatile读操作不会和之后的读操作、写操作重排序。
说到内存可见性,除了volatile,还有锁,以及final。锁自不必说。说一下final
final也可以保证内存的可见性,意思是一旦构造器初始化完成被final修饰的变量,且没有通过this引用传递,那么其他线程就能看见final变量的值。
对于final的介绍可以看这篇文章: final关键字深入解析
volatile变量第二个特性:可以保证代码有序性。
因为处理器本身在执行代码的时侯,会根据情况进行优化,可能会发生指令重排,即程序执行的顺序可能和代码编写顺序不一致。volatile可以保证有序性。有序性到底有何好处呢?
一个非常著名的就是单例模式的DCL(Double Checking Lock).
public class TestL {
public static Signle signle;
public Signle getSignle(){
if(null == signle){
synchronized (this){
if(null == signle){
signle = new Signle();
}
}
}
return signle;
}
public class Signle {
}
}
上面一是判断单例是不是null,如果是null,利用同步锁,初始化。乍一看,没啥问题的。当判断不是null时,尝试获取锁,获取成功就实例化对象。
但上面的操作在多线程可能会出现错误,某个线程可能会获得一个未初始化完成的实例。
一个对象的初始化其实是分几步来进行的:
1、内存地址分配;
2、初始化对象;
3、将内存空间地址赋值给对象的引用。
在初始化的过程中,非常有可能因为指令重排,导致步骤2和3执行顺序颠倒。而这引起的一个问题就是,假如线程A在初始化的过程中,但还并未实际初始化对象的时,线程B判断此时并不是null(第一个null),就直接返回这个不成熟的对象,那么此时就会出现问题的。
而利用volatile修饰变量,就可以解决这个问题,它会禁止2和3两个步骤重排序。
在Java中有个原则叫做happens-before原则:
- 程序顺序规则:同一个线程中的,前面的操作 happen-before 后续的操作。(即单线程内按代码顺序执行。但是,在不影响在单线程环境执行结果的前提下,编译器和处理器可以进行重排序,这是合法的。换句话说,这一是规则无法保证编译重排和指令重排)
- 监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。
- volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。
- 传递性:如果A happens-before B,且B happens-before C,那么A happens-before C。
- start()规则:如果线程A执行操作ThreadB.start()(启动线程B),那么A线程的ThreadB.start()操作happens-before于线程B中的任意操作。
- join()规则:如果线程A执行操作ThreadB.join()并成功返回,那么线程B中的任意操作happens-before于线程A从ThreadB.join()操作成功返回。
- 程序中断规则:对线程interrupted()方法的调用先行于被中断线程的代码检测到中断时间的发生。
- 对象finalize规则:一个对象的初始化完成(构造函数执行结束)先行于发生它的finalize()方法的开始。
JVM通过内存屏障实现了有序性,禁止指令重排,内存屏障是一组处理器指令,具体的可以网上查阅,只要知道volatile变量保证下面两点:
1)当程序执行到volatile变量的读操作或者写操作时,在其前面的操作的更改肯定全部已经进行,且结果已经对后面的操作可见;在其后面的操作肯定还没有进行;
2)在进行指令优化时,不能将在对volatile变量访问的语句放在其后面执行,也不能把volatile变量后面的语句放到其前面执行。
说到DCL,额外延伸,除了使用volatile解决双重检查锁定问题外,还有一种方法可解决DCL,那就是基于类的初始化。
public class TestL {
private static class ClassInstan {
public static Signle signle = new Signle();
}
public Signle getSignle(){
return ClassInstan.signle;
}
public class Signle {
}
在调用getSignle方法时,会进行类的初始化。当有个多个线程来调用时侯,JVM会为线程加一把初始化锁,这样获取到锁的线程,第一次会执行初始化过程,其他未获取初始化锁的会进行等待,待初始化完成后,会通知所有等待的线程。通过类的初始化,不管Signle对象初始化过程中是否有指令重排序,对于其他线程来讲都是不可见的,只有等到初始化完成后,才可看到。
CAS
上面说了很多关于锁的概念,其中很多地方都提到了CAS,CAS是一种乐观锁的概念。悲观锁就是我认为数据肯定有别人会改,所以在改之前,我必须加锁,上面说的都是悲观锁(虽然他们内部使用了CAS,但不是针对变量本身来说的),比如数据库的锁 select ---for update;而乐观锁是只有在真正更新的过程中才会去检查数据是否有被修改。因此,乐观锁的性能要优于悲观锁的。比如Redis中的watch,Java中的原子类 (AtomicInteger等等),都是采用CAS的思想。
下面就具体解释一下Java乐观锁CAS的原理。
Java中 utils.cocurrent.atomic中的类都是原子类,都采用了CAS的思想。在上面提到的几种锁,在获取锁的过程中也都用到了CAS思想。现在拿AtomicInteger为例子:
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
unsafe就是用来获取内存中的数据,Java需要通过本地方法获取内存数据;valueoffset是value在AtomicInteger中偏移量,即内存偏移地址,是一个静态变量;value就是存储的值。
还要说一下,Unsafe类不是提供给用户使用的类,从其代码就可以看出来:
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}
关于Unsafe的介绍可以看下面的一篇文章: Java魔法类:Unsafe应用解析
AtomicInteger中比较重要的是下面的方法。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
getIntVolatile是获取在对象中valueset偏移处的值,然后循环比较该处的值,和原始值va5是否一致,如果一致就更新为var5+var4,并退出循环.
compareAndSwapInt定义:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
可以看到,该方法是一个系统本地方法,具体的就不说了,但其最终是会调用一个汇编指令:cmpxchg。
该指令可以保证比较交换的原子操作,这个在上面已经提到了,就是通过Lock指令或缓存锁定来实现的。
上面就是CAS实现的一个基本原理,从上面也可以看到,CAS本身也存在这几个问题:
1、无限循环
在进行比较更新的时侯,是一个while循环,即不成功的时侯,它会一直循环,不断尝试。该逻辑带来的问题就是会增加系统开销。
2、ABA
这个是比较著名的一个问题了。线程1在修改A的时侯,突然挂起了,这个时侯其他线程修改将值成了B,后又被线程修改成了A,而恰好此时线程1又要执行比较交换,发现原始值还是A,此时它就又执行update操作,而此时原始值已经被修改过了,值已经经历了A-B-A的过程。这其中一个典型的场景就是链表或栈等数据结构,通过当前节点获取下个节点,如果又ABA可能会有问题。
那解决的方式可以通过版本号的方式,或者增加引用。在Java中使用AtomicStampedReference解决ABA问题,它的思想就是不仅比较当前引用的值,也比较一个类似于时间戳的值,两者都相等才可以修改。
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
上面的stamp就是一个版本号,每次更新的时候都会变更。执行CAS的时候,不仅当前引用要一致,其中的版本号stamp也必须相同。这样就完美解决了ABA问题。
参考资料:
深入分析 java 8 编程语言规范:Threads and Locks
【死磕Java并发】—–深入分析synchronized的实现原理
微信分享/微信扫码阅读