JAVA 并发编程实战学习笔记(二)

《JAVA 并发编程实战学习笔记 二》


概述

  • 《学习笔记一》从线程安全的定义,常见的线程安全错误,导致错误发生的原因,安全的对象发布,不可变对象,同步容器类等描述了并发编程中问题发生的原因,场景,解决方式和一些良好实践以及工具类(关于同步工具类的详解部分可以在《AQS队列同步器中(二)》看到),也是并发编程实战的第一部分,现在学习第二部分结构化并发应用程序的内容。

任务执行:

  • 概述:在正常负载的情况下,服务器应用程序应该表现出良好的吞吐量和快速的响应性–能够快速地处理很多的任务,从应用程序提供方来说希望能够支持更多的用户,节约成本;用户则需要更好的使用体验。而且当程序负载过高时,正确的表现不应该是直接失败,而是降低处理任务的速率。

  • 串行任务:书中列举了一个简单串行WEB服务器例子,以一次用户请求作为任务边界:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    class SingleThreadWebServer{
    public static void main(String[] args)throws IOException{
    ServerSocket socket = new ServerSocket(80);
    while(true){
    Socket connection = socket.accept();
    handleRequest(connection);
    }
    }
    }
    • 它的问题在于,当接受了一个用户请求之后,使用当前线程去处理这个任务,在任务处理完成之前如果还有其他线程请求这个服务器,只能等待当前的任务处理完成。在只使用了一小部分系统资源的情况下,却不能同时处理多个独立任务,在性能和效率来说是比较差的。

  • 因为要处理的任务彼此之间是独立的,所以可以考虑优化成并行执行任务:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    class SingleThreadWebServer{
    public static void main(String[] args)throws IOException{
    ServerSocket socket = new ServerSocket(80);
    while(true){
    final Socket connection = socket.accept();
    Runnable task = new Runnable(){
    public void run(){
    handleRequest(connection);
    }
    };
    new Thread(task).start();
    }
    }
    }
    • 改进后的代码将每个任务的执行过程封装在了线程中,而主线程可以继续监听其他的用户请求并处理,在系统资源富余的情况下,提高了吞吐量和响应性
  • 不足

    • 线程的生命周期开销很高 :在JVM和操作而言,线程创建和销毁都是代价的,在请求速率非常高的情况下,大量创建线程会消耗比较多的系统资源。
    • 资源限制 : 可运行的线程数量受到所处软硬件的环境的影响,创建大量的线程并不代表它们都可以获得资源去执行,只有一小部分线程可以顺利执行,其他的线程都会处于闲置状态,却又各自持有一定的系统资源,例如内存;当CPU有空闲,唤醒所有线程去竞争的时候,也会产生额外的消耗。所以当目前已经有足够的线程在执行的情况下,创建更多的线程非但不会带来性能的提升,反而会由于资源被占用和竞争CPU导致处理性能降低
  • Executor框架

    • 概述:任务是一组逻辑工作单元,而线程是使任务异步执行的机制。 之前我们已经看到了单纯的串行执行任务和通过无节制创建线程处理任务的不足,串行的性能比较糟糕,而无节制创建线程又会带来资源管理的问题

    • 在生产-消费者模式中,生产者和消费者之间的工作队列通常是个有界队列,这样在生产速率或者消费速率较低的时候可以制定合理的策略处理消息的生产,有界队列可以很好地实现对资源的管理

    • Executor 框架是JUC提供的一种灵活的线程池实现,Executor 基于生产-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者,使用 Executor 框架实现并行任务,会指定一个线程池的大小,并重复利用池中已经创建的线程。将前面的例子使用Executor 框架实现并发部分:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      class TaskExecutionWebServer{
      private static final int NTHREADS = 100;
      private static final Executor exec = new Executors.newFixedThreadPool(NTHEREADS);

      public static void main(String[] args) throws IOException{
      ServerSocket socket = new ServerSocket(80);
      while(true){
      final Socket connection = new ServeSocket(80);
      Runnable task = new Runnable(){
      public void run(){
      handleRequest(connection);
      }
      };
      exec.execute(task);
      }
      }
      }
      • 使用 Executor 框架的基本方式就是上面这样:为每个任务创建 Runnable 代表任务的执行过程,然后将 Runnable 作为参数传递给 execute 方法,异步地执行任务。

      • 通过实现 Executor 框架还可以自行实现不同策略的execute,例如为每个任务创建一个新的线程:

        public void execute(Runnable r){new Thread(r).start();}

        或者每次都使用同一个 Runnable 对象,使任务串行地执行:

        public void execute(Runnable r){r.run();}

      • 这样的异步方式,将任务的提交和执行解耦开来,还可以根据不同的任务类型指定不同的执行策略,具备很大的灵活性

    • 线程池

      • 概述:简单来说是管理线程的资源池,使用线程池有几点优势
        • 当任务到达时,因为已经提前在线程池池中初始化了一定的线程,可以直接执行任务不需要先等待线程创建
        • 线程池的处理策略通常是复用已有的空闲线程,而不是创建新线程,避免了多余的创建销毁线程的动作。
      • 使用
        • Executors 提供了一些静态方法,可以很方便地创建不同类型的线程池
          • newFixedThreadPool : 创建一个固定长度的线程池,当创建的线程数量达到上限时线程池的规模不会再变化。
          • newCachedThreadPool : 创建一个可缓存的线程池,线程池的规模没有做限制,当处理需求增加时会创建新的线程处理,在大量不间断请求的情况下,可能并不适用
          • newScheduledThreadPool : 也是一个固定长度的线程池,可以以延迟或者定时的方式执行任务,,类似于Timer(定时器)
          • newSingleThreadExecutor : 单线程的线程池,如果它因为异常结束了线程,会创建另一个线程代替它执行,它能确保任务按照在队列中的顺序来串行执行
    • Callable 和 Future : 携带结果的异步任务

      • 概述:前面看到的 Executor 框架使用,是以一个 Runnable 代表任务执行,一个很明显的限制条件就是无法返回任务的执行结果,而 Callable 接口作为任务提交时可以返回一个 Future, Future 的 get() 方法可以返回任务的执行结果,例如计算等。

      • Callable : 可以作为 submit(Callable) 的参数传递,返回结果是一个 Future,Future 的get()方法用来获取Callable 任务的结果,但是 get() 的返回是根据任务运行状态决定的

        • 任务正在执行:无参的 get() 方法阻塞,直到任务完成或者抛出异常。

        • 任务被取消:抛出 CancellationException

        • 任务已完成 : 立即返回任务执行结果

        • 任务抛出异常 : get()方法会将该异常封装成 ExecutionException 继续抛出

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          public class Renderer{
          private static final ExecutorService exec = Executors.newFixedThreadPool(10);
          ....(){
          Future future = exec.submit(new Callable<String>(){
          public String call() {
          return "result";
          }
          });
          System.out.printle(future.get());
          }
          ...
          }
    • CompletionService :

      • 概述: 如果执行了一组计算任务,并且希望在计算后获得结果,可以保留与每个任务相关的Future然后把结果统一,稍微繁琐了一点,CompletionService 可以简化这个过程

      • CompletionService 把 Executor 和 BlockingQueue 的功能结合起来,例如声明泛型类型为 Callable进行初始化,所有需要执行的任务由 CompletionService 的 submit(Callable) 方法执行;随后可以调用 CompletionService 的 take() 方法获取已经执行完成的任务的结果,这个操作就是基于阻塞队列实现的。

      • 1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        public class Renderer{
        private final ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletionService<String> service = new ExecutorCompletionService<String>(executor);
        public void doExecute(){
        for(int i=0;i<10;i++){
        service.submit(new Callable<String>(){
        public String call(){
        return "result";
        }
        })
        }
        }

        public void getResult(){
        for(int j=0;j<10;j++){
        Future future = service.take();
        System.out.println(future.get());
        }
        }


        }
        • 构建一个 CompletionService 需要一个 ExecutorService 对象,多个 CompletionService 可以共享一个 Executor 对象,意味着可以以来公有的 Executor 构建出不同的基于特定计算使用的 CompletionService

取消和关闭:

  • 概述: 前面提到过中断(interrupt)的概念:中断是一种线程之间的协作机制,当A线程中断B线程,并不代表B线程会立即停止运行,它只会建议B线程运行到可以停止的地方停下来。也就是说线程并没有一种可以立刻暂停正在执行中的线程的方法,原因有很多,立刻暂停线程也是不建议的,因为可能会导致严重的数据不一致现象,正确的行为是安全地暂停正在执行的任务,再退出任务

  • 任务取消:如果外部代码能够在任务完成之前将其置为完成状态,这个操作就被称为是可取消的。

    • 需要取消任务的原因:

      • 用户主动撤销请求
      • 有时间限制的请求
      • 应用程序事件:应用程序对问题空间进行分解,使不同的任务搜索不同的区域,当其中一个任务查找到结果时,其他的搜索任务需要取消
      • 发生错误:和前一个类似,都是多任务并行,当其中一个任务发生错误,例如因为内存不足无法再保存数据,其他的同样类型的任务也需要取消
      • 关闭:程序或者服务关闭
    • 设置取消标志

      • 概述:JAVA只有一些协作机制,使请求取消的任务和代码都遵循一种商定的协议。对任务设置一个取消请求标志,每次任务执行之前检查这个标志,在某个时刻设置这个标志位取消请求,之后的任务就不会再执行

      • 1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        //为了作为任务存在,实现了Runnable
        public class PrimeGenerator implement Runnable{
        private final List<Integer> primes = new ArrayList<>();
        //使用 volatile 变量保护取消状态
        private volatile boolean cancelled;

        public void run(){
        //检查取消标志
        if(!cancelled){
        //执行任务
        ......
        }
        }

        //设置任务取消状态
        public void cancel(){
        cancelled = true;
        }
        }
        • 设置取消状态的方法,最好在 finally 块中调用,防止因为程序错误导致无法正常取消任务
    • 中断

      • 概述:上面的的取消任务的例子通过在每次任务执行之前检查取消标志,决定是否执行任务,但是如果执行任务的线程因为某个方法阻塞,外部代码调用取消的方法也无法成功取消任务,因为一个人任务正在等待,所以无法再检查取消标志。

      • 每个线程都具备 boolean 类型的中断状态,当中断一个线程时,它的中断状态就会被设置为true。Thread 类中提供了中断,查询中断状态以及清除中断状态的方法。

      • 而 Thread 类本身的阻塞方法例如 Thread.sleep() 以及对象的Object.wait() 方法,都会先检查线程的中断标志,并在发现按中断时提前返回;发现中断的情况下它们执行的操作是:清除中断状态,抛出 InterruptedException,表示阻塞提前结束。

      • 对于中断操作的正确理解是:它并不会真正地中断一个正在运行中的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己。

      • 中断策略:如同任务的取消策略一样,线程也应该包含中断策略。中断策略规定线程如何解释中断请求–当发现中断请求时,应该做哪些工作,哪些工作单元对于中断来说是原子操作,以及以多快的速度响应中断。最合理的中断策略是某种形式的线程级(Thread-Level)取消操作或服务及(Service-Level)取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。此外还可以建立其他的中断策略:例如暂停服务或者重新开始服务。

      • 真正能够处理中断请求的线程不一定是执行任务的线程,而是线程所有者,例如服务或线程池。因此当执行中的线程遇到中断请求,可以在处理完自身的一些逻辑之后,抛出中断异常,告知线程所有者发生了中断。如果除了将中断异常抛出去,还需要做其他的处理,需要保存中断状态:Thread.currentThread().interrupt().

      • 响应中断:

        • 处理中断异常的两种方式:
          • 抛出 InterruptedException :这样你的方法也变成了一个可中断的阻塞方法
          • 恢复中断状态:传递异常给外层代码,让它去处理异常。
      • 通过Future来实现取消

        • 前面我们已经知道 Executor.submit()方法返回一个 Future 包含任务执行的结果信息。Future 本身拥有一个 cancel 方法,该方法带有一个 mayInterruptIfRunning 参数,表示任务能否接受中断,而不是能否检测并处理中断,这一点很重要:对于能够响应中断的任务,可以设置为true,可以将运行中的任务中断;如果设置为false,则会选择在下一次任务开始之前拒绝执行任务,因为任务无法中断。
      • 处理不可中断的阻塞:

        • 如果一个线程由于等待某个内置锁而阻塞,那么无法响应中断,因为线程会认为它会获得锁。因此可中断的获得锁的方法就显得灵活而重要,例如Lock类的 lockInterruptibly,同步工具类实现的 acquire(long timeout) 等。
      • 采用newTaskFor封装非标准的取消:

        • newTaskFor 是一个工厂方法

          newTaskFor(Runnable runnable ,T value){ return new FutureTask<T>(runnable,value);}

          newTaskFor(Callable callable){return new FutureTask<T>(callable);}

          两种实现,分别基于runnable 和 Callable 作为参数实现,返回了 FutureTask 。书上提供了一个很好的例子,基于特定的应用场景(能否响应中断),封装取消的动作:

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39
          40
          41
          42
          43
          44
          45
          46
          47
          //实现 Callable 接口定义带有取消方法和创建新任务的CancellableTask<T>接口
          public interface CancellableTask<T> extends Callable<T>{
          void cancel();
          RunnableFuture<T> newTask();
          }

          //实现自定义的取消操作线程池
          public class CanaellingExecutor extends ThreadPoolExecutor{
          protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable){
          //判断了一下参数的Callable 类型
          if(callable instanceof CancellableTask){
          return ((CancellableTask<T>) callable).newTask();
          }else{
          return super.newTaskFor(callable);
          }
          }
          }

          //实现基于 Socket IO 任务使用的Task类
          public abstract class SocketUsingTask<T> implements CanaellableTask{

          private Socket socket;
          protected synchronized void setSocket(Socket s){socket = s;}

          //取消方法会执行 socket 关闭,
          public synchronized class cancel(){
          try{
          if(socket != null){
          socket.close();
          }catch(IOException e){}
          }
          }

          public RunnableFuture<T> newTask(){
          return new FuturreTask<T>(this){
          public boolean cancel(boolean mayInterruptIfRunning){
          try{
          //委托给SocketUsingTask的cancel 方法实现取消
          SocketUsingTask.this.cancel();
          }finally{
          //最后调用一次原本的 cancel 方法
          return super.cancel(mayInterruptIfRunning);
          }
          }
          }
          }
          }
          • 上面 SocketUsingTask 类的实现,通过 CancellableTask 实现了套接字的关闭和普通的cancel 动作,扩展了可用性,使它不仅能够在调用可中断方法时确保响应取消操作,还可以调用套接字方法。
    • 停止基于线程的服务

      • 关闭ExecutorService :

        • ExecutorService 提供了两种关闭方法: shutdown() 和 shutdownNow();

        • shutdown() 方法会让 ExecutorService 拒绝接受新的任务,并且等待正在执行的任务执行完成,再关闭服务。

        • shutdownNow() 方法会直接强行关闭 ExecutorService ,虽然比较快速,但是风险更大,因为任务可能只执行了一半。

        • 所以需要根据实际情况选择关闭的方式,通常是使用 shutdown() 方法关闭服务,然后加上 ExecutorService 的 awaitTermination(Timeout,Unit) 等待一段时间等服务关闭

          try{exec.shutdown() ; exec.awaitTermination(TIMEOUT,UNIT)}finally{...}

          这种形式的使用,也可以在catch到异常之后或者等待超时直接调用 exec.shutdownNow() 方法关闭服务。

      • 毒丸对象:Poison Pill

        • 关闭生产-消费者模式服务,可以使用毒丸对象,它代表当发生异常时,生产者将毒丸对象置入工作队列,消费者获取到毒丸对象时,停止后续的消费任务。

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          //持有毒丸对象的服务
          public class IndexingService{
          private static final File POISON = new File();
          private final Consumer consumer = new Consumer();
          private final Producer producer = new Producer();
          }

          //生产者
          class Producer{
          void produce(){
          try{
          //异常代码
          }catch(Exception e){

          }finally{
          try{
          //生产者发生异常,置入毒丸对象告知消费者
          queue.put(POISON);
          break;
          }
          }
          }
          }

          //消费者
          class Consumer{
          void consume(){
          File file = queue.take();
          if(file == POISON){
          //发现毒丸对象,停止消费者任务
          break;
          }else{
          ...
          }
          }
          }
          • 这种方式只有在生产消费者数量都已知的情况下,才知道需要多少个毒丸对象才能停止消费者线程的消费。如果是多个生产者,就需要置入多个毒丸对象。当时当生产者数量庞大时,就显得不那么适用了。
      • shutdownNow 的局限性和获取已取消的任务

        • 调用 shutdownNow() 方法时,它会尝试取消正在执行的任务,并且返回取消的任务和尚未执行的任务。

        • 但是我们并不知道那些任务是已经开始尚未结束的任务,除非任务本身维护了中断状态。

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          //定义一个Set集合用于存放所有被取消的任务
          //在封装的ExecutorService的 execute() 方法里判断服务是否已经关闭且当前任务是否已经被中断
          //满足这两点就作为被取消的任务加入到取消任务Set集合中
          public void execute(final Runnable runnable){
          exec.execute(new Runnable(){
          public void run(){
          try{
          runnable.run();
          }finally{
          //检查服务是否关闭且当前任务是否被取消
          if(isShutDown() && Thread.currentThread().isInterrupted()){
          set.add(runnable);
          }
          }
          }
          })
          }
          • 这也是一个委托实现,从而判断任务的执行状态,并且保存取消的任务
      • 处理非正常的线程终止

        • 导致线程死亡的原因是 RuntimeException ,这个异常表示出现了某种编程错误或者其他不可修复的错误,因此它们不会被捕获,而是直接输出栈追踪信息到控制台。
        • 未捕获异常的处理:在 Thread API中提供了UncauhtExceptionHandler ,它能检测出某个线程因为未捕获异常而终结的情况。可以通过实现 UncaughtExceptionHandler 接口,自定义捕获了到未捕获异常时的处理,例如将相关的线程信息和异常信息写入日志文件。
        • 要为线程设置自定义异常捕获,需要实现一个 ThreadFactory ,为线程池中的所有线程设置 UncaughtExceptionHandler.

小结:

  • 任务,线程,服务以及应用程序等模块的生命周期结束问题,可能会增加它们在设计和实现时的复杂性(通常是非正常的生命周期结束)。
  • JAVA没有提供某种抢占式的机制来取消操作或者终结线程,而是通过 中断 的协作机制,让线程决定在什么时候停止自己的操作。但这要依赖如何构建取消操作的协议,以及能否始终遵守这些协议。通过使用FutureTask 和 Executor 框架,可以帮助我们构建可取消的任务和服务。