高并发之并发编程
并发编程,主要目的是能够充分利用系统资源,能够以一个最大的资源利用率去运行程序。通常我们提并发编程一般都包括并发和并行。并发是指在一个CPU内,多个程序可以看似并发地执行(实际上并不是);并行就是可以充分使用多核CPU的优势,同一时间可以保证有多个CPU在执行任务。
讲到并发,除了出提高资源利用率,此外就是并发安全,对于共享变量,如何保证在多个并发程序执行下是安全的,也是我们谈并发编程最最关心的事情了。尤其是多线程和协程,在编程中如何确保并发安全更加重要。通常情况下,都是加各种锁,锁的类型有很多很多,大体上可以分为乐观锁和悲观锁。尤其是在Java中,互斥锁,读写锁,共享锁,可重入锁,CAS等等。
其实除了共享变量的安全性,还有其他许多问题,比如性能问题。哎?刚才不是说充分利用系统,咋又说性能问题了?那是因为多进程或者多线程的调度是要进行系统资源切换的,上下文切换的过程是要消耗系统的一部分性能的。因此如何只是并发数,如何合适选择并发模式,也是一个重要因素。
就说线程上下文切换,CPU在处理多任务时,需要不断地完成不同的线程之间的调度,而每个线程都会有自己独立的上下文环境(程序计数器,寄存器,栈等等)。当某种因素引起了切换,就要执行相应过程。
通常引起线程的上下文切换的因素主要包括:
- 当前执行任务的时间片用完之后,系统CPU正常调度下一个任务;
- 当前执行任务碰到IO阻塞,调度器将此任务挂起,继续下一任务;
- 多个任务抢占锁资源,当前任务没有抢到锁资源,被调度器挂起,继续下一任务;
- 用户代码挂起当前任务,让出CPU时间;
- 硬件中断;
第一个因素是时间片用完,CPU使用时间片轮转的方式执行为每个任务分配了一定的时间片,当用完了,就会进行任务切换。
对于线程数的设置,还要根据具体情况设定,还要看任务是IO密集型的还是CPU密集型的。如果CPU密集型的,设置过多的线程毫无用处,一般都设置为 cpu数量+1;如果是IO密集型的,多数耗时都在IO上,因此完全可以设置大一点,一般设置为2*cpu数量。
下面是我测试的不同线程的数量耗时。任务是IO密集型的。测试机器是8核CPU。
@Slf4j
@Service
public class MerchantDailyPay implements MerchantDaily {
@Autowired
SumDailyMapper sumDailyMapper;
private ExecutorService executorService = new ThreadPoolExecutor(
8,17,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000)
);
private ExecutorService executorService2 = new ThreadPoolExecutor(
17,37,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000)
);
@Override
public void getMerchantSumPay() {
Long start = System.currentTimeMillis();
log.info("start at:{}",System.currentTimeMillis());
for (int i=10;i<100;i++){
Long pay = sumDailyMapper.getSumPay((long)i);
for (int j=0;j<5000;j++){
log.info("merchant:{},j:{}",i,j);
}
}
Long end = System.currentTimeMillis();
log.info("it costs :{}",end - start);
}
@Override
public void getMerchantDailyByCon() throws ExecutionException, InterruptedException {
List<Future<Long>> teslist = new ArrayList<>();
Long start = System.currentTimeMillis();
for (int i=10;i<100;i++){
MerchantThread merchantThread = new MerchantThread(sumDailyMapper,(long)i);
teslist.add(executorService.submit(merchantThread));
}
for (Future future:teslist){
Object pay = future.get();
}
Long end = System.currentTimeMillis();
log.info("it costs :{}",end - start);
}
@Override
public void getMerchantDailyByCon2() throws ExecutionException, InterruptedException {
List<Future<Long>> teslist = new ArrayList<>();
Long start = System.currentTimeMillis();
for (int i=10;i<100;i++){
MerchantThread merchantThread = new MerchantThread(sumDailyMapper,(long)i);
teslist.add(executorService2.submit(merchantThread));
}
for (Future future:teslist){
Object pay = future.get();
}
Long end = System.currentTimeMillis();
log.info("it costs :{}",end - start);
}
}
耗时依次是43.9s,16.1s,13.9s。
我接下来使用Golang的协程跑一下:
import (
context2 "context"
"fmt"
"hnshop/hnserver/rpc/mysql"
"hnshop/hnserver/utils/log"
"sync"
"time"
)
func executeTask(merchantId int,wg *sync.WaitGroup) {
contex:=context2.Background()
engine := mysql.GetDefault("writer").Engine
//query database
sql:= "select sum(pay) as pay from sum_daily where merchant_id=" + string(merchantId) + " and sum_date>=\"2020-01-01\""
result,_:=engine.Query(sql)
log.I(contex,"test","merchantresult:%v:i:%v",merchantId,result)
for i:= 0;i<5000;i++ {
log.I(contex,"test","merchant:%v:i:%v",merchantId,i)
}
(*wg).Done()
}
func GoFun() {
start:= time.Now().Unix()
wg := sync.WaitGroup{}
for id:= 1;id<100;id++ {
wg.Add(1)
go executeTask(id,&wg)
}
wg.Wait()
end := time.Now().Unix()
print(end-start)
fmt.Print(end-start)
log.I(context2.Background(),"ce","it cost:+%v",end-start)
}
平均耗时:11.5s。
其实,之前我为此写了很多篇文章了,当然有可能不精啊:
我整理完,突然发现,我草,我都写了这么多水文了。为什么感觉还是一塌糊涂呢。
参考资料:
微信分享/微信扫码阅读