高并发之并发编程

并发编程,主要目的是能够充分利用系统资源,能够以一个最大的资源利用率去运行程序。通常我们提并发编程一般都包括并发和并行。并发是指在一个CPU内,多个程序可以看似并发地执行(实际上并不是);并行就是可以充分使用多核CPU的优势,同一时间可以保证有多个CPU在执行任务。

讲到并发,除了出提高资源利用率,此外就是并发安全,对于共享变量,如何保证在多个并发程序执行下是安全的,也是我们谈并发编程最最关心的事情了。尤其是多线程和协程,在编程中如何确保并发安全更加重要。通常情况下,都是加各种锁,锁的类型有很多很多,大体上可以分为乐观锁和悲观锁。尤其是在Java中,互斥锁,读写锁,共享锁,可重入锁,CAS等等。

其实除了共享变量的安全性,还有其他许多问题,比如性能问题。哎?刚才不是说充分利用系统,咋又说性能问题了?那是因为多进程或者多线程的调度是要进行系统资源切换的,上下文切换的过程是要消耗系统的一部分性能的。因此如何只是并发数,如何合适选择并发模式,也是一个重要因素。

就说线程上下文切换,CPU在处理多任务时,需要不断地完成不同的线程之间的调度,而每个线程都会有自己独立的上下文环境(程序计数器,寄存器,栈等等)。当某种因素引起了切换,就要执行相应过程。

通常引起线程的上下文切换的因素主要包括:

  1. 当前执行任务的时间片用完之后,系统CPU正常调度下一个任务;
  2. 当前执行任务碰到IO阻塞,调度器将此任务挂起,继续下一任务;
  3. 多个任务抢占锁资源,当前任务没有抢到锁资源,被调度器挂起,继续下一任务;
  4. 用户代码挂起当前任务,让出CPU时间;
  5. 硬件中断;

第一个因素是时间片用完,CPU使用时间片轮转的方式执行为每个任务分配了一定的时间片,当用完了,就会进行任务切换。

对于线程数的设置,还要根据具体情况设定,还要看任务是IO密集型的还是CPU密集型的。如果CPU密集型的,设置过多的线程毫无用处,一般都设置为 cpu数量+1;如果是IO密集型的,多数耗时都在IO上,因此完全可以设置大一点,一般设置为2*cpu数量。

下面是我测试的不同线程的数量耗时。任务是IO密集型的。测试机器是8核CPU。

@Slf4j
@Service
public class MerchantDailyPay implements MerchantDaily {

    @Autowired
    SumDailyMapper sumDailyMapper;

    private ExecutorService executorService = new ThreadPoolExecutor(
            8,17,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000)
    );

    private ExecutorService executorService2 = new ThreadPoolExecutor(
            17,37,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000)
    );


    @Override
    public void getMerchantSumPay() {
        Long start = System.currentTimeMillis();
        log.info("start at:{}",System.currentTimeMillis());
        for (int i=10;i<100;i++){
            Long pay = sumDailyMapper.getSumPay((long)i);
            for (int j=0;j<5000;j++){
                log.info("merchant:{},j:{}",i,j);
            }
        }
        Long end = System.currentTimeMillis();
        log.info("it costs :{}",end - start);
    }

    @Override
    public void getMerchantDailyByCon() throws ExecutionException, InterruptedException {
        List<Future<Long>> teslist = new ArrayList<>();
        Long start = System.currentTimeMillis();
        for (int i=10;i<100;i++){
            MerchantThread merchantThread = new MerchantThread(sumDailyMapper,(long)i);
            teslist.add(executorService.submit(merchantThread));
        }
        for (Future future:teslist){
            Object pay = future.get();
        }
        Long end = System.currentTimeMillis();
        log.info("it costs :{}",end - start);
    }

    @Override
    public void getMerchantDailyByCon2() throws ExecutionException, InterruptedException {
        List<Future<Long>> teslist = new ArrayList<>();
        Long start = System.currentTimeMillis();
        for (int i=10;i<100;i++){
            MerchantThread merchantThread = new MerchantThread(sumDailyMapper,(long)i);
            teslist.add(executorService2.submit(merchantThread));
        }
        for (Future future:teslist){
            Object pay = future.get();
        }
        Long end = System.currentTimeMillis();
        log.info("it costs :{}",end - start);
    }

}

耗时依次是43.9s,16.1s,13.9s。

我接下来使用Golang的协程跑一下:

import (
	context2 "context"
	"fmt"
	"hnshop/hnserver/rpc/mysql"
	"hnshop/hnserver/utils/log"
	"sync"
	"time"
)

func executeTask(merchantId int,wg *sync.WaitGroup)  {
	contex:=context2.Background()
	engine := mysql.GetDefault("writer").Engine
	//query database
	sql:= "select sum(pay) as pay from sum_daily where merchant_id=" + string(merchantId) + " and sum_date>=\"2020-01-01\""
	result,_:=engine.Query(sql)
	log.I(contex,"test","merchantresult:%v:i:%v",merchantId,result)
	for i:= 0;i<5000;i++ {
		log.I(contex,"test","merchant:%v:i:%v",merchantId,i)
	}
	(*wg).Done()
}

func GoFun()  {
	start:= time.Now().Unix()
	wg := sync.WaitGroup{}
	for id:= 1;id<100;id++ {
		wg.Add(1)
		go executeTask(id,&wg)
	}
	wg.Wait()
	end := time.Now().Unix()
	print(end-start)
	fmt.Print(end-start)
	log.I(context2.Background(),"ce","it cost:+%v",end-start)
}

平均耗时:11.5s。

其实,之前我为此写了很多篇文章了,当然有可能不精啊:

Python异步IO原理

并发编程之I/O多路复用及几种I/O模式

并发编程之多进程

并发编程之多线程详解

并发编程之异步IO

Java的NIO及Netty

Java多线程共享变量同步机制

Java并发编程之基本使用

Java并发容器之ConcurrentMap

Java并发容器之ConcurrentLinkeQueue

Java并发同步之阻塞队列与Condition接口

并发编程整理

并发编程之实例

Go通过Channel实现并发写入Log

Golang通过Channel控制并发

Go的高并发

Redis并发

Nginx实现高并发

我整理完,突然发现,我草,我都写了这么多水文了。为什么感觉还是一塌糊涂呢。

参考资料:

啃碎并发(三):Java线程上下文切换

--------EOF---------
微信分享/微信扫码阅读