Java并发编程之基本使用

本文就是介绍简单的Java多线程的使用。

1、实现Thread子类;

2、实现Runnable接口;

3、实现Callable接口;

4、线程池;

1、实现Thread子类

  protected class MyThread extends Thread {

        @Override
        public void run() {
            log.info(String.format("I am %s",Thread.currentThread().getName()));
            try
            {
                Thread.sleep(1000L);
            }catch (Exception e){
                log.error("{} in {}",e.getMessage(),Thread.currentThread().getName());
            }
            log.info("finish {}",Thread.currentThread().getName());
        }
    }

只要继承了Thread基类,并重载run即可

2、实现Runnable接口

 protected class ThreadRuable implements Runnable {

        @Override
        public void run() {
            System.out.println(String.format("I am %s",Thread.currentThread().getName()));
            try
            {
                Thread.sleep(1000L);
            }catch (Exception e){
                log.error("{} in {}",e.getMessage(),Thread.currentThread().getName());
            }
            log.info("I am {}",Thread.currentThread().getName());
        }
    }

只要实现了Runnable接口即可。

对于上面两种,一般情况下,使用Runnable更妥,因为Java的线程池可以有效管理实现Runnable接口的线程,而我们通常情况下使用线程池更多一些。

但Runnable本身也存在一些“缺点”,打引号的意思是说它是缺点,并不意味着就是它的不足,只是在某些特定环境下,可能不适用。即它不能返回值,也不能抛出异常,如果有些特定场景下希望可以得到返回值的话,就不能用Runnable(我觉得这样的场景不多,我们完全可以通过全局变量或者中间件来传递变量),可以考虑实现Callable接口。

3、Callable接口

 protected class ThreadCall implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            System.out.println(String.format("I am %s",Thread.currentThread().getName()));
            try
            {
                Thread.sleep(1000L);
            }catch (Exception e){
                log.error("{} in {}",e.getMessage(),Thread.currentThread().getName());
            }
            log.info("I am {} call",Thread.currentThread().getName());
            return (int)Thread.currentThread().getId();
        }
    }

    public void rune(){
        log.info("Start to exec multi thread at {}",DateLib.getCurTimeStamp());
        for (int i=1;i<10;i++){
//            Thread thread = new Thread(new ThreadRuable());    //Thread子类
//            Thread thread = new Thread(new MyThread());       //实现Runnable接口
            FutureTask<Integer> futureTask = new FutureTask<Integer>(new ThreadCall());
           Thread thread = new Thread(futureTask);
           thread.start();
           try
           {
//               log.info("get from futuretask result :{}", futureTask.get());   //get方法会阻塞线程
           }catch (Exception e){
               log.error(e.getMessage());
           }
        }
        log.info("multi thread are finished  at {}",DateLib.getCurTimeStamp());
    }

一般Callable会配合线程池使用

  public void rune(){
        log.info("Start to exec multi thread at {}",DateLib.getCurTimeStamp());
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        List<Future> futures = new ArrayList<Future>(10);
        for (int i=1;i<10;i++){
          Future future = executorService.submit(new ThreadCall());  //提交有返回值的;无返回值的使用execute方法
          futures.add(future);
        }
        executorService.shutdown();
        for (Future future:futures){
           try{
               log.info("get result from future:{}",future.get());
           }catch (Exception e){
               log.error(e.getMessage());
           }
        }
        try{
            Thread.sleep(1000L);
        }catch (Exception e){

        }
        log.info("multi thread are finished  at {}",DateLib.getCurTimeStamp());
    }

get方法会一直阻塞直到线程执行完成。

上面的Future对象提供了很多的使用方法:

public abstract interface Future<V> {
    public abstract boolean cancel(boolean paramBoolean);

    public abstract boolean isCancelled();    //检测在任务在正常完成前是否已经取消
 
    public abstract boolean isDone();     //检测任务是否已经完成

    boolean cancel(boolean mayInterruptIfRunning);  //取消任务

    public abstract V get() throws InterruptedException, ExecutionException;    //获取返回结果

    public abstract V get(long paramLong, TimeUnit paramTimeUnit)
            throws InterruptedException, ExecutionException, TimeoutException;
}

上面使用的Future是按照添加的顺序执行的,某些场景还是希望哪些完成,就执行哪些,因此还有一种实现方式是使用CompletionService。


    public void rune(){
        log.info("Start to exec multi thread at {}",DateLib.getCurTimeStamp());
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(executorService);

        for (int i=1;i<10;i++){
          executorService.submit(new ThreadCall());
        }
        executorService.shutdown();
        for (int i=1;i<10;i++){
           try{
               log.info("get result from future:{}",cs.take().get());
           }catch (Exception e){
               log.error(e.getMessage());
           }
        }
        try{
            Thread.sleep(1000L);
        }catch (Exception e){

        }
        log.info("multi thread are finished  at {}",DateLib.getCurTimeStamp());
    }

当然,完全可以把其理解为非阻塞的。

4、线程池

在日常应用中,更多的还是使用线程池的。因为无限制的创建线程是存在很多的风险的。

比如增加了系统资源的消耗,线程的建立和释放过程都占用一定的系统资源。此外,无限制的占用资源,不断地增加内存消耗,对系统来说是很危险的,因为一般我们的Server对JVM最大内存都是有限制的,很有可能发生OOM。

线程池很好得解决了上面的问题,线程池内部会有一个阻塞队列,要执行的任务会被放到阻塞队列,线程会从中去任务来执行。当有新任务加入阻塞队列中,一个空闲线程会将其取出执行。

我从网上找了一张线程池的UML图

上面Executors是基础接口,主要可执行实现Runnable的线程。

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

其最重要的子接口是ExecutorService,它本身定义了具体的一些行为方法。

1,execute(Runnable command):执行Ruannable类型的任务,
2,submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
3,shutdown():在之前的任务都已经提交,且没有新任务接受,关闭,该操作不会等待之前的任务都执行完成。
4,shutdownNow():停止所有任务,并返回正在等待的任务列表。
5,isTerminated():测试是否所有任务都履行完毕了。
6,isShutdown():测试是否该ExecutorService已被关闭。

线程池本身也是有状态变化的,主要包括RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。

线程池的基础实现类是ThreadPoolExecutor.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

Executors的几个创建线程池的方法其实都是使用该类创建的。阿里对于线程池的创建的建议也是直接使用

ThreadPoolExecutor,主要是我们自己控制各种参数,防止出现OOM。

现在解释一下里面的参数。

* @param corePoolSize the number of threads to keep in the pool, even
*        if they are idle, unless {@code allowCoreThreadTimeOut} is set
  corePoolSize是表示核心线程池大小。如果当前线程池的线程数量小于该值,当有一个新任务需要执行时,即使现在有空闲的线程,也会新建一个线程。

* @param maximumPoolSize the maximum number of threads to allow in the
*        pool
  maximumPoolSize表示线程池允许创建的最大的线程池数。

* @param keepAliveTime when the number of threads is greater than
*        the core, this is the maximum time that excess idle threads
*        will wait for new tasks before terminating.
 字面意思已经很清楚了,当线程池的数目大于核心线程数(corePoolSize)时,空闲线程可存活的时间。当超过了这个时间,空闲的线程就会被回收。


* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
*        executed.  This queue will hold only the {@code Runnable}

workQueue消费队列,主要有以下几种:

ArrayBlockingQueue:是一个数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序

LinkedBlockingQueue:是一个单向链表实现的阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool使用了这个队列。

SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于LinkedBlockingQueue,Executors.newCachedThreadPool使用这个队列。这个有点像golang中的channel。

PriorityBlockingQueue:优先级队列,时基于数组的堆结构,进入队列的元素按照优先级会进行排序。

至于选择哪种就要看实际情况。如果想要一个带有边界的就选择数组结构的,如果想要没有边界的,可考虑链表结构的队列。虽然有不同数据结构实现的队列,但他们的本质都是阻塞型队列。

一个简单的例子:

public class ThreadPoolPra {

   private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
     5,
     10,
     10,
     TimeUnit.SECONDS,
     new ArrayBlockingQueue<>(10),
           Executors.defaultThreadFactory(),
           new ThreadPoolExecutor.AbortPolicy()

   );


   public void run(){
       log.info("start to run thread pool example at {}",DateLib.getCurTimeStamp());
       for (int i = 1;i<10 ;i++){
           threadPoolExecutor.execute(
                   ()-> {
                       log.info("it is thread:{} in pool",Thread.currentThread().getName());
                       try{
                           Thread.sleep(1000L);
                       }catch (Exception e){
                           log.error(e.getMessage());
                       }
                       log.info("finish thread:{} in pool",Thread.currentThread().getName());
                   }
           );
       }
       threadPoolExecutor.shutdown();
       try{
           Thread.sleep(2000L);
       }catch (Exception e){

       }
       log.info("main thread pool example is finished at {}",DateLib.getCurTimeStamp());
   }
}

Executors类中实现了几个创建线程池的方法,但在设计业务中,并不建议直接用,主要考虑以下方面:

1) FixedThreadPool 和 SingleThreadPool :
使用的阻塞队列时链表队列LinkedBlockingQueue,允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。
2) CachedThreadPool 和 ScheduledThreadPool :
允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。

再多说以下ScheduledThreadPool,它使用的时DeleyWorkQueue,这时一种基于堆结构的阻塞队列,和上面的PriorityBlockingQueue类似。

  public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

线程池的任务提交过程:

1、如果当前线程池中的线程数小于corePoolSize,就会新建线程;

2、如果当前队列未满,就向队列添加任务;

3、如果队列已满,且线程数小于maxPoolSize,就新建线程;

4、如果队列已满,且线程数大于maxPoolSize,就执行拒绝策略。

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

线程池的生命周期:

RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
*   STOP:     Don't accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed

再说下关于线程池大小的设定,线程池大小设置为多少才合适。这个还要看我们的程序是计算密集型的还是IO密集型的。

如果是计算密集型的,那么线程池大小一般设置为CPU数量+1即可,+1是为了确保如果偶尔因为某个线程因为页面缺失或其他原因暂停时,可以切换到多余的线程,确保CPU时钟周期不被浪费掉;对于IO 密集型的可以使用更多的线程来执行,至于设置成多少,在《Java并发编程实战》提到了一个公式,根据CPU数量,等待时间和计算时间比等等因素。也就是说实际上对于IO密集型任务来说,并没有一个严格的标准,应该设置成多少,而是要根据实际的系统配置和任务类型来决定。所以我们在编程的时侯可以自己试试,设置不同的线程,看看哪个数量的线程池利用率最高。

我在本地模拟了一下,CPU是8核,我所写的任务的等待时间核比达到6.1。当我设置最大线程数是8时,CPU利用率达到60%左右,当我线程数设置为60时,几乎快跑满整个计算机的CPU了。

但是要注意出现线程饥饿死锁。线程饥饿死锁是指任务间的相互依赖,执行中的任务一等待着等待队列中的任务二执行完再处理,但任务二需要等任务一处理完才能够执行。这通常会出现在但线程池或者线程池不够大的场景。

线程池需要注意的问题:

线程池的参数要设置合适,要考虑几个因素:

1、CPU利用率,性能提升;

2、内存,注意不要因为并发处理,导致OOM(使用有界队列,控制同一时刻并发量);

关于线程池较好的文章: 线程池的介绍

--------EOF---------
微信分享/微信扫码阅读