Java并发编程

1.多线程基础

1.1 实现多线程的方法

  1. 实现Runnable接口

    1
    2
    3
    4
    5
    6
    public class Test_01 implements Runnable{
    @Override
    public void run() {
    System.out.println("实现Runnable接口实现多线程");
    }
    }

2.

1
2
3
4
5
6
public class Test_02 extends Thread {
@Override
public void run() {
System.out.println("继承Thread类实现多线程");
}
}
  1. 线程池创建线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
    SecurityManager s = System.getSecurityManager();
    group = (s != null) ? s.getThreadGroup() :
    Thread.currentThread().getThreadGroup();
    namePrefix = "pool-" +
    poolNumber.getAndIncrement() +
    "-thread-";
    }

    public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
    namePrefix + threadNumber.getAndIncrement(),
    0);
    if (t.isDaemon())
    t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
    t.setPriority(Thread.NORM_PRIORITY);
    return t;
    }
    }

    对于线程池而言,本质上是通过线程工厂创建线程的,默认采用DefaultThreadFactory,它会给线程池创建的线程设置一些默认值,如:线程的名字、是否守护线程,以及线程的优先级等。但无论怎么设置这些这些属性,最终还是通过new Thread()创建线程的,只不过这里的构造函数传入的参数要多一些,本质还是通过new Thread()实现的

  2. 实现有返回值的Callable创建线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class Test_03 implements Callable {
    @Override
    public Integer call() throws Exception {
    int i = new Random().nextInt();
    System.out.println(Thread.currentThread().getName() +" : "+ i);
    return i;
    }

    public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 100; i++) {
    Future<Integer> future = executorService.submit(new Test_03());
    }
    executorService.shutdown();
    }
    }

    Runnable创建线程是无返回值的,而Callable和与之相关的Future、FutureTask,它们可以把线程执行的结果作为返回值返回。

……

实现线程只有一种方式

启动线程需要调用start()方法,而start方法最终会调用run()方法,分析run()方法

1
2
3
4
5
6
@Override
public void run() {
if (target != null) {
target.run();
}
}

target实际上就是一个Runnable,即使用Runnable接口实现线程时传给Thread类的对象。第二种,继承Thread方式,继承Thread之后,会把run()方法重写,最终还是会调用thread.start()方法启动线程,而start()方法最终也会调用这个已经被重写的run()方法来执行任务。创建线程本质就是构造一个Thread类,不同点在于实现线程运行内容的方式不同,可以通过实现Runnable接口,或继承Thread类重写run()方法。

1.2 实现Runnable接口比继承Thread类实现线程更好?

  1. Java不支持多继承,一旦继承了Thread类,就无法再继承其它类,限制了代码的可扩展性。
  2. Runnable里只有一个run()方法,定义了需要执行的内容,实现了Runnable与Thread类的解耦,Thread类负责线程启动和属性设置,权责分明。

1.3 如何正确停止线程?

对于Java而言,最正确的停止线程的方式是使用interrupt,但interrupt仅仅起到通知被停止线程的作用,而对于被停止的线程而言,它拥有完全的自主权,即可以选择立即停止,也可以一段时间后停止,也可以不停止。Java希望程序间可以相互通知、相互协作的管理线程,如果贸然停止线程可能会造成一些安全性问题,为了避免造成问题就需要给对方一定的时间来整理收尾工作。

1
2
3
while (!Thread.currentThread().isInterrupted() && more work to do) {
    do more work
}

一旦调用某个线程的interrupt后,该线程的中断标记位就会被设置成true,每个线程都有这样的标记位,当线程执行时应定期检查这个标记位。上面代码可以看到,while循环判断语句中,先通过Thread.currentThread().isInterrupt()判断是否被中断,随后检查是否还有工作要做。

1.4 sleep期间能否感受到中断?

如果sleep、wait等可以让线程进入阻塞的方法使线程休眠了,而处于休眠中的线程被中断,线程是可以感受到中断信号的,并会抛出InterruptedException,同时清除中短信号,将中断标记位设为false。

处理方式:

  1. 方法签名抛异常,run()强制try/catch

    1
    2
    3
    void subTask() throws InterruptedException {
        Thread.sleep(1000);
    }

    要求每一个方法的调用方有义务去处理异常。调用方要不使用try/catch并在catch中正确处理异常,要不将异常声明到方法签名中。如果每层逻辑都遵守规范,便可以将中断信号传递到顶层,最终让run()方法可以捕获到异常。而对于run()方法而言,它本身没有抛出checkedException的能力,只能通过try/catch来处理异常。层层传递异常保障了异常不会被遗漏,而对于run()方法,就可以根据不同的业务逻辑来进行相应的处理。

  2. 再次中断

    1
    2
    3
    4
    5
    6
    7
    8
    private void reInterrupt() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    在catch语句中再次中断线程。如果线程在休眠期间被中断,那么会自动清除中断信号。如果这时手动添加中断信号,中断信号依然可以被捕捉到。

1.5 为什么用volatile标记位的停止方法是错误的?

stop()会直接把线程停止,会导致出现数据完整性等问题。suspend()和resume()并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,容易导致死锁问题。

volatile修饰标记位适用的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class VolatileCanStop implements Runnable {
private volatile boolean canceled = false;

@Override
public void run() {
int num = 0;
while (!canceled && num < 1000000) {
if (num % 10 == 0) {
System.out.println(Thread.currentThread().getName() + ":" + num + "是10的倍数");
}
num++;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws InterruptedException {
VolatileCanStop canStop = new VolatileCanStop();
Thread thread = new Thread(canStop);
thread.start();
Thread.sleep(3000);
canStop.canceled = true;
}
}

启动线程,经过3s,把volatile修饰的标记位设置为true,那么下一次while循环中判断出canceled的值为true,就跳出while循环,线程停止。

volatile修饰标记位不适用的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class VolatileCanNotStop {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue storage = new ArrayBlockingQueue(8);
Producer producer = new Producer(storage);
Thread producerThread = new Thread(producer);
producerThread.start();
Thread.sleep(500);
Consumer consumer = new Consumer(storage);
while (consumer.needMoreNums()) {
System.out.println(consumer.storage.take() + "被消费了");
Thread.sleep(100);
}
System.out.println("消费者不需要更多数据了。");

// 一旦消费不需要更多数据了,我们应该让生产者也停下来,但是实际情况却停不下来
producer.canceled = true;
System.out.println(producer.canceled);
}
}

class Producer implements Runnable {
public volatile boolean canceled = false;
BlockingQueue storage;

public Producer(BlockingQueue storage) {
this.storage = storage;
}

@Override
public void run() {
int num = 0;
try {
while (!canceled && num <= 10000) {
if (num % 50 == 0) {
storage.put(num);
System.out.println(Thread.currentThread().getName() + ":" + num + "是50的倍数,被放到仓库中了");
}
num++;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("生产者结束运行");
}

}
}

class Consumer {
BlockingQueue storage;

public Consumer(BlockingQueue storage) {
this.storage = storage;
}

public boolean needMoreNums() {
if (Math.random() > 0.97) {
return false;
}
return true;
}
}

线程被长时间阻塞的情况,就无法及时感受中断:尽管已经把canceled的标记位设置为true,但生产者仍然没有被停止,是因为生产者在执行storage.put(num)时发生阻塞,在它被叫醒之前是没有办法进入下次循环判断canceled的值的,这种情况下volatile没有办法让生产者停下来的,如果用interrupt语句来中断,即使生产者处于阻塞状态,仍然能够感受到中断信号,并做相应处理。

1.6 线程是如何在6种状态之间转换的?

线程的6种状态

  1. New(新建)
  2. Runnable(可运行)
  3. Blocked(被阻塞)
  4. Waiting(等待)
  5. Timed Waiting(计时等待)
  6. Terminated(被终止)
  • New 新建
    New表示线程被创建但尚未启动的状态:new Thread()新建一个线程时,如果线程没有开始运行start()方法,所以也没有开始执行run()方法里面的代码,此时它的状态就是New。一旦线程调用了start(),就变成Runnable。

  • Runnable 可运行
    Java中的Runnable状态对应操作系统线程状态中的两种状态,分别是Running和Ready,即Java中处于Runnable状态的线程有可能正在执行,也有可能没有正在执行,正在等待被分配CPU资源。所以,如果一个正在运行的线程是Runnable状态,当它运行到任务的一半时,执行该线程的CPU被调度去做其他事情,导致该线程暂时不运行,它的状态仍为Runnable,因为它有可能随时被调度回来继续执行任务。

  • Blocked 被阻塞
    从Runnable状态进入Blocked状态只有一种可能,就是进入synchronized保护的代码块/方法时没有抢到monitor锁,Blocked仅仅针对synchronized monitor锁。

  • Waiting 等待
    线程进入Waiting

    1. 没有设置Timeout参数的Object.wait()方法
    2. 没有设置Timeout参数的Thread.join()方法
    3. LockSupport.park()方法

    Blocked与Waiting的区别是Blocked在等待其它线程释放monitor锁,而Waiting则是在等待某个条件,比如join的线程执行完毕,或者是notify()/notifyAll()。

  • Timed Waiting 限期等待
    Waiting和Time Waiting区别:有没有时间限制,Timed Waiting会等待超时,由系统自动唤醒,或者在超时前被唤醒信号唤醒。

    线程进入Timed Waiting

    1. 设置了时间参数的Thread.sleep(long millis)方法
    2. 设置了时间参数的Object.wait(long timeout)方法
    3. 设置了时间参数的Thread.join(long millis)方法
    4. 设置了时间参数的LockSupport.parkNanos(long nanos)方法和LockSupport.parkUntil(long deadline)方法

    Blocked—>Runnable:线程获取monitor锁

    Waiting—>Runnable:执行了LockSupport.unpark(),或join的线程运行结束,或者被中断。

    Waiting—>Blocked:其它线程调用notify()或notifyAll(),因为唤醒Waiting线程的线程如果调用notify()或notifyAll(),必须首先持有该monitor锁,所以处于Waiting状态的线程被唤醒时拿不到该锁,就会进入Blocked状态,直到执行notify()/notifyAll()的唤醒线程执行完毕并释放monitor锁,才可能轮到它去抢夺这把锁,抢到就会从Blocked状态回到Runnable状态。

    TimedWaiting类似,但如果它的超时时间到了且能直接获取到锁/join的线程运行结束/被中断/调用了LockSupport.unpark(),会直接恢复到Runnable状态。

  • Terminated
    线程进入Terminated

    1. run()方法执行完毕,线程正常退出。
    2. 出现一个没有捕获的异常,终止了run()方法,最终导致意外终止。

Tips

  1. 线程的状态是按照箭头方向走的,如线程从New不可以进入Blocked,它需要经历Runnable。
  2. 线程的生命周期不可逆:一旦进入Runnable就不能回到New状态;一旦被终止就不可能有任何状态的变化。所以一个线程只有一次New和Terminated状态,只有处于中间状态才可以相互转换。

1.7 为什么wait必须在synchronized保护的同步代码中使用?

1.8 为什么wait/notify/notifyAll方法被定义在Object类中,而sleep定义在Thread类中?

  1. Java中每个对象都有一把称之为monitor监视器的锁,由于每个对象都可以上锁,这就要求在对象头中有一个用来保存锁信息的位置。这个锁是对象级别的,而非线程级别的,wait/notify/notifyAll也都是锁级别的操作,它们的锁属于对象,所以把它们定义在Object类,因为Object类是所有对象的父类。
  2. 如果把wait/notify/notifyAll方法定义在Thread类中,会带有很大的局限性,如一个线程可能持有多个锁。如何明确当前线程等待的是哪把锁呢?既然是让当前线程去等待某个对象的锁,自然应该通过操作对象来实现。

1.9 wait/notify和sleep方法的异同

相同点

  1. 都可以让线程阻塞
  2. 都可以响应interrupt中断:在等待的过程中如果收到中断信号,都可以进行响应,并抛出InterruptedException

不同点

  1. wait方法必须在synchronized保护的代码中使用,而sleep方法并没这个要求。
  2. 在同步代码块中执行sleep方法,并不会释放monitor锁,但执行wait方法时会主动释放monitor锁。
  3. sleep方法必须定义一个时间,时间到期后会主动会恢复,而对于没有参数的wait方法而言,意味着永久等待,直到被中断或被唤醒才能恢复,它并不主动恢复。
  4. wait/notify是Object类的方法,而sleep是Thread类的方法。

2.线程安全

如果某个对象是线程安全的,即使用时就不需要考虑方法间的协调问题。

2.1 3种典型的线程安全问题

  1. 运行结果错误

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class WrongResult {
       volatile static int i;

       public static void main(String[] args) throws InterruptedException {
           Runnable r = new Runnable() {
               @Override
               public void run() {
                   for (int j = 0; j < 10000; j++) {
                       i++;
                   }
               }
           };

           Thread thread1 = new Thread(r);
           thread1.start();
           Thread thread2 = new Thread(r);
           thread2.start();
           thread1.join();
           thread2.join();
           System.out.println(i);
        }
    }

    i++并不是一个原子操作

  2. 发布或初始化导致线程安全问题

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class WrongInit {
    private Map<Integer, String> students;

    public WrongInit() {
    new Thread(new Runnable() {
    @Override
    public void run() {
    students = new HashMap<>();
    students.put(1, "王小美");
    students.put(2, "钱二宝");
    students.put(3, "周三");
    students.put(4, "赵四");
    }
    }).start();
    }

    public Map<Integer, String> getStudents() {
    return students;
    }

    public static void main(String[] args) {
    WrongInit wrongInit = new WrongInit();
    System.out.println(wrongInit.getStudents().get(1));
    }
    }

    students 这个成员变量是在构造函数中新建的线程中进行的初始化和赋值操作,而线程的启动需要一定的时间,但是我们的 main 函数并没有进行等待就直接获取数据,导致 getStudents 获取的结果为 null,这就是在错误的时间或地点发布或初始化造成的线程安全问题

  3. 活跃性问题

    分别为死锁、活锁和饥饿

    • 死锁:两个线程之间相互等待对方资源,但同时又互不相让,都想自己先执行,如代码所示。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      public class MayDeadLock {
      Object lock1 = new Object();
      Object lock2 = new Object();
      public void thread1() throws InterruptedException {
      synchronized (lock1){
      Thread.sleep(500);
      synchronized (lock2){
      System.out.println("线程1成功拿到两把锁");
      }
      }
      }
      public void thread2() throws InterruptedException {
      synchronized (lock2){
      Thread.sleep(500);
      synchronized (lock1){
      System.out.println("线程2成功拿到两把锁");
      }
      }
      }

      public static void main(String[] args) {
      MayDeadLock deadLock = new MayDeadLock();

      new Thread(new Runnable() {
      @SneakyThrows
      @Override
      public void run() {
      deadLock.thread1();
      }
      }).start();

      new Thread(new Runnable() {
      @SneakyThrows
      @Override
      public void run() {
      deadLock.thread2();
      }
      }).start();
      }
      }
    • 活锁:与死锁类似,不过活锁是活的,因为正在运行的线程并没有阻塞,它始终在运行,缺一直得不到结果。假设有一个消息队列里放着需要被处理的消息,而某个消息由于自身的错误无法被正确处理,同时队列的重试机制会把它放在队列头进行优先重试处理。

    • 饥饿:线程需要某些资源始终得不到,尤其是CPU资源,就会导致线程一直不能运行。

      1. 在Java中有1-10的线程优先级,1最低,10最高。如果某个线程的优先级为1,该线程就有可能始终分配不到CPU资源,而导致长时间无法运行。
      2. 或者是某个线程始终持有某个文件的锁,其他线程想要修改文件必须先获取锁,这时想要修改文件的线程就会陷入饥饿。

2.2 需要额外注意线程安全的场景

  1. 访问共享变量和资源

    如访问共享对象的属性、访问static静态变量、访问共享的缓存等。

  2. 依赖时序的操作

    1
    2
    3
    if (map.containsKey(key)) {
        map.remove(obj)
    }
  3. 不同数据之间存在绑定关系

    不同的数据之间是成组出现的,存在着相互对应或绑定的关系,最典型的就是IP和端口号。

  4. 对方没有声明自己是线程安全的

2.3 为什么多线程会带来性能问题

单线程是独立工作的,不需要与其他线程进行交互,但多线程之间则需要调度以及协作,调度协作就会带来性能开销从而产生性能问题。

  • 调度开销

    1. 上下文切换:线程数往往大于CPU核心数,操作系统会按照一定的调度算法,给每个线程分配时间片。而在进行调度时就会引起上下文切换,上下文切换会挂起当前正在执行的线程并保存当前的状态,然后寻找下一处即将恢复执行的代码,唤醒下一个线程。

    2. 缓存失效:进行了线程调度,切换到其他线程,CPU就会去执行不同的代码,原有的缓存就很有可能失效了,需要重新缓存新的数据。

      给被调度到的线程设置最小执行时间,即只有执行完这段时间后,才可能进行下一次的调度,由此减少上下文切换的次数。

  • 协作开销

    为了避免共享数据错乱、保证线程安全,就有可能禁止编译器和CPU对其进行重排序等优化,也可能出于同步的目的,反复把线程工作内存的数据flush到主内存,然后再从主内存refresh到其他线程的工作内存中

2.4 使用线程池的好处

  1. 线程池可以解决线程生命周期的系统开销问题,线程池里的线程可以复用,消除了线程创建带来的延迟,从而提高响应速度。
  2. 线程池可以统筹内存和CPU的使用,避免资源的使用不当。
  3. 线程池可以统一管理资源。

2.5 线程池各参数的含义

线程池的特点:

  1. 线程池希望保持较少的线程数,只有在负载变的很大时才增加线程。
  2. 线程池只有在任务队列满时才会创建多于corePoolSize的线程,如果使用的是无界队列(如LinkedBlockingQueue),线程数不会超过corePoolSize。
  3. 设置corePoolSize和maxPoolSize为相同的值,可以创建固定大小的线程池。

2.6 线程池有哪几种拒绝策略?

  • AbortPolicy:拒绝任务时直接抛出一个类型为RejectedExecutionException的RuntimeException,可以感知到任务被拒绝了,可以根据业务逻辑选择重试或放弃提交等。
  • DiscardPolicy:当新任务被提交后直接被丢弃掉,不会有任何通知。
  • DiscardOldestOlicy:丢弃任务队列的头节点,通常是存活时间最长的任务,也不会有任何通知。
  • CallerRunsPolicy:把任务交给提交任务的线程执行,即谁提交任务,谁就负责执行任务。
    1. 提交的任务不会被丢弃
    2. 提交任务的线程负责执行任务,提交任务的线程被占用,不会再提交新的任务,线程池中的线程也可以利用这段时间执行掉一部分任务,相当于是给了线程池一定的缓冲期。

2.7 有哪6种常见的线程池?什么是Java8的ForkJoinPool?

  • FixedThreadPool

    核心线程数和最大线程数是一样的,可以看作是固定线程数的线程池,没有可用的线程的时候,任务会放在队列中等待,任务的长度无限制(LinkedBlockingQueue)

  • CachedThreadPool

    线程数几乎可以无限增加(Integer.MAX_VALUE,2^31-1),该线程池的线程数量不固定,不够使用时自动增加,闲置时自动回收。队列为SynchronousQueue,队列容量为0,实际不存储任务,只对任务进行中转和传递。

  • ScheduledThreadPool

    支持定时或周期的执行任务。

    1
    2
    3
    4
    5
    6
    7
    ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
    //延迟指定时间后执行一次任务,10秒执行一次
    service.schedule(new Task(), 10, TimeUnit.SECONDS);
    //以固定的频率执行任务
    service.scheduleAtFixedRate(new Task(), 1010, TimeUnit.SECONDS);
    //与第二种类似,不过scheduledAtFixedRate以开始时间为起点,时间到就开始第二次,而scheduledWithFixedDelay以任务结束时间为下一次循环的时间起点开始计算
    service.scheduleWithFixedDelay(new Task(), 1010, TimeUnit.SECONDS);
  • SingleThreadExecutor

    原理与FixedThreadPool一样,线程只有一个,如果线程在执行过程中发生异常,线程池也会重新创建一个线程来执行后续的任务。适合用于任务需要按被提交的顺序依次执行的场景。

  • SingleThreadScheduledExecutor

    于ScheduledThreadPool类似,如源码所示:只是将ScheduledThreadPool的核心线程数设置为1

    1
    new ScheduledThreadPoolExecutor(1)

  • ForkJoinPool

2.8 线程池常用的阻塞队列

  • LinkedBlockingQueue

    对于FixedThreadPool和SingleThreadExector,它们使用的是容量为Integer.MAX_VALUE的LinkedBlockingQueue,可以任务是无界队列。

  • SynchronousQueue

    对于CachedThreadPool,最大线程数为Integer.MAX_VALUE,所以不需要任务队列来存储任务,一旦有任务提交就直接转发给线程或创建新线程来执行。

  • DelayedWorkQueue

    对于ScheduledThreadPool和SingleThreadScheduledExecutor,DelayedWorkQueue内部元素并不是按照放入的时间排序,而是按照延迟的时间长短对任务进行排序,内部采用的是”堆”的数据结构。

2.9 为什么不应该自动创建线程池?

  • FixedThreadPool、SingleThreadPool

    使用的队列是没有上限的LinkedBlockingQueue,如果处理任务过慢,队列中堆积的任务会越来越多,占用大量内存,导致OOM。

  • CachedThreadPool

    不限制线程的数量,任务特别多时,有可能会创建非常多的线程,最终导致超过了操作系统的上限而无法创建线程,或导致内存不足。

  • ScheduledThreadPool、SingleThreadScheduledExecutor

    DelayedWorkQueue也是一个无界队列。

2.10 合适的线程数是多少?

  • CPU密集型任务

    如加密、解密、压缩、计算等大量耗费CPU资源的任务,线程数为CPU核心数的1-2倍。

  • 耗时IO型任务

    如数据库、文件的读写、网络通信等并不消耗CPU资源的任务,线程数=CPU核心数*(1+平均等待时间/平均工工作时间)

线程的平均工作时间所占比例越高,就需要越少的线程。线程的平均等待时间所占比例越高,就需要越多的线程。

2.11 如何正确关闭线程?

  • shutdown()

    安全的关闭的一个线程池,调用shutdown()之后,如果还有新任务被提交,线程池会根据拒绝策略直接拒绝后续提交的任务,执行完正在执行的任务和队列中等待的任务后关闭。

  • isShutdown()

    判断线程是否已经开始了关闭工作,即是否执行了shutdown()或shutdownNow()

  • isTerminated()

    检测线程池是否真正”终结”了,即线程池已关闭,同时线程池中的所有任务都执行完毕了。

  • awaitTermination()

    判断线程池状态,如给awaitTermination方法传入的参数为10秒,那么它会陷入10秒等待,直到

    1. 等待期间(包括进入等待之前),线程池已关闭并所有任务都执行完毕,相当于线程池”终结”了,方法便返回true。
    2. 等待超时时间到后,线程池始终未”终结”,返回false。
    3. 等待期间线程被中断,方法抛出InterruptedException异常。

    即调用awaitTermination方法后当前线程池会尝试等待一定指定的时间,如果在等待时间内,线程池已关闭并任务都执行完毕,方法返回true,否则返回false。

  • shutdownNow()

    立刻关闭,执行shutdownNow()方法之后,首先会给线程池中的线程发送interrupt中断信号,尝试中断这些任务的执行,然后会将等待的所有任务转移到一个List中并返回。

3.各种各样的”锁”

3.1 你知道哪几种锁?分别有什么特点?

  1. 偏向锁/轻量级锁/重量级锁

    特指synchronized锁的状态,通过在对象头中的mark word来表明锁的状态。

    • 偏向锁

      如果,这把锁一直不存在竞争,就没必要上锁,只需打个标记就行。对象被初始化,还没有线程来获取它的锁时,那么它就是可偏向的,当有第一个线程来访问它并尝试获取锁的时候,它就将这个线程记录下来,以后如果尝试获取锁的线程正是偏向锁的拥有者,就可以直接获取锁,开销很小,性能最好。

    • 轻量级锁

      synchronized中的代码是被多个线程交替执行的,并不存在实际的竞争、或只有短时间的竞争,用CAS就可以解决。轻量级锁是指当锁原来是偏向锁时,被另一个线程访问,说明存在竞争,那么偏向锁就会升级为轻量级锁,线程会通过自旋的形式获取锁,而不会陷入阻塞。

    • 重量级锁

      重量级锁是互斥锁,它是利用操作系统的同步机制实现的,开销相对较大。当多个线程直接实际竞争,且锁竞争时间长的时候,锁就会膨胀为重量级锁。重量级锁会让其它申请缺拿不到锁的线程进入到阻塞状态。

    偏向锁性能最好,可以避免执行CAS操作。而轻量级锁利用自旋和CAS避免了重量级锁带来的线程阻塞和唤醒,性能中等。重量级锁则会把获取不到锁的线程阻塞,性能最差。

  2. 可重入锁/不可重入锁

    可重入锁指的是线程当前已经持有这把锁了,能在不释放这个锁的情况下,再次获取这把锁。不可重入锁指的是虽然当前持有了这把锁,但如果想再次获取此锁,也必须先要释放锁后才能再次尝试获取。

  3. 共享锁/独占锁

    共享锁指同一把锁可以被多个线程同时获得,而独占锁指这个锁只能同时被一个线程获得。如读写锁中的读锁是共享锁,而写锁是独占锁。

  4. 公平锁/非公平锁

    • 公平锁

      如果线程现在拿不到这把锁,那么线程都会进入等待,开始排队,在等待队列等待时间长的线程会优先拿到这把锁,先来先得。

    • 非公平锁

      在一定情况下,忽略掉已经在排队的线程,发生插队现象。

  5. 悲观锁/乐观锁

    • 悲观锁

      在获取资源之前,必须先拿到锁,以便达到”独占”的状态。

    • 乐观锁

      并不要求在获取资源前拿到锁,也不会锁住资源,利用CAS理念,在不独占资源的情况下,完成对资源的修改。

  6. 自旋锁/非自旋锁

    • 自旋锁

      如果线程现在拿不到锁,并不直接陷入阻塞或者释放CPU资源,而是开始利用循环,不停的尝试获取锁。

    • 非自旋锁

      拿不到锁就直接放弃,或者进行其它的处理逻辑,如阻塞、排队等。

  7. 可中断锁/不可中断锁

    synchronized关键字修饰的锁代表的是不可中断锁,一旦线程申请了锁,就没有回头路,只能等拿到锁以后才能进行其它的逻辑处理。

    ReentrantLock是一种典型的可中断锁,如使用lockInterruptibly方法在获取锁的过程中,突然不想获取了,可以在中断之后去做其它的事。

3.2 悲观锁与乐观锁

  • 悲观锁

    为了确保结果的正确性,会在每次获取并修改数据时,都把数据锁住,让其他线程无法访问。

    线程A拿到了锁,并且正在操作同步资源,那么此时线程B就必须进行等待。

    当线程A执行完毕后,CPU才会唤醒正在等待这把锁的线程B再次尝试获取锁

    如果线程B获取到了锁,才可以对同步资源进行自己的操作。

  • 乐观锁

    认为自己在操作资源的时候不会有其他线程干扰,所以并不会锁住被操作对象。为了确保数据正确性,在更新之前,会去对比在修改数据期间,数据有没有被其他线程修改过。

    例子:

    • 悲观锁:synchronized关键字和Lock接口

      以Lock接口为例,如Lock的实现类ReentrantLock,类中的lock()等方法就是执行加锁,而unlock()方法就是执行解锁()。处理资源之前必须要先加锁并拿到锁,等到处理完之后再解开锁。

    • 乐观锁:原子类

      如AtomicInteger在更新数据时,多个线程可以同时操作同一个原子变量。

    两种锁各自的使用场景:

    • 悲观锁适合于并发写入多、临界区代码复杂、竞争激烈等场景,此时悲观锁可以避免大量的无用的反复尝试等消耗。
    • 乐观锁适用于读取多,修改少的场景,也适合虽然读写都很多,但是并发不激烈的场景。

3.3 synchronized背后的monitor锁

获取和释放monitor锁的时机:线程在进入synchronized保护的代码块之前,会自动获取锁;并且无论是正常退出,还是抛出异常退出,在退出的时候都会自动释放锁。

查看反汇编命令:javac SynTest.java javap -verbose SynTest.class

  • 同步代码块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
               ......
    3: monitorenter
             4: getstatic     #2                  // Field java/lang/System.out:Ljava/io/PrintStream;
             7: ldc           #3                      // String lagou
             9: invokevirtual #4               // Method java/io/PrintStream.println:(Ljava/lang/String;)V
            12: aload_1
            13: monitorexit
            14: goto          22
            17: astore_2
            18: aload_1
            19: monitorexit
    ......

    monitorenter可以理解为加锁,monitorexit理解为释放锁,每个对象维护着一个记录着被锁次数的计数器。未锁定的对象的该计数器未0。

    • monitorenter

      1. 如果该monitor的计数为0,则线程获得该monitor并将其计数设置为1,该线程就是这个monitor的所有者。
      2. 如果线程已经拥有了这个monitor,则它将重新进入,并且累加计数。
      3. 如果其他线程已经拥有了这个monitor,那么这个线程就会被阻塞,直到这个monitor的计数器变为0,代表这个monitor已经被释放了,于是当前这个线程就会再次尝试获取这个monitor。
    • monitorexit

      作用:将monitor的计数器减1,直到减为0为止。代表这个monitor已经被释放了,已经没有任何线程拥有它了,也就代表着解锁。其他正在等待这个monitor的线程,此时可以再次尝试获取这个monitor的所有权。

  • 同步方法

    1
    2
    3
    4
    5
    6
        public synchronized void synMethod();
        descriptor: ()V
        flags: ACC_PUBLIC, ACC_SYNCHRONIZED
        Code:
          stack=0, locals=1, args_size=1
    ......

    被synchronized修饰的方法会有一个ACC_SYNCHRONIZED标志,当某个线程要访问某个方法时,会首先检查方法是否有ACC_SYNCHRONIZED标志,如果有则需要先获得monitor锁,方法执行之后再释放monitor锁。

3.4 synchronized与Lock

  • 相同点

    1. synchronized和Lock都是用来保护资源线程安全的
    2. 都可以保证可见性
    3. synchronized和ReentrantLock(Lock的一个实现类)都拥有可重入的特点
  • 不同点

    1. 用法区别

      synchronized关键字可以加在方法上,不需要指定锁对象(此时的锁对象为this);也可以修饰同步代码块并且自定义monitor对象。而Lock锁对象必须显示的开始加锁lock()和解锁unlock(),并且一般会在finally块中确保用unlock()来解锁,以防止发生死锁。

    2. 加解锁顺序不同

      对于Lock而言如果有多把Lock锁,Lock可以不完全按照加锁的反序解锁

      1
      2
      3
      4
      5
      lock1.lock();
      lock2.lock();
      ...
      lock1.unlock();
      lock2.unlock();

      synchronized解锁的顺序和加锁的顺序必须完全相反,obj2先解锁,obj1后解锁。

      1
      2
      3
      4
      5
      synchronized(obj1){
          synchronized(obj2){
              ...
          }
      }
    3. synchronized锁不够灵活

      一旦synchronized锁已经被某个线程获得了,此时其他线程如果还想获得,那么它只能被阻塞,直到持有锁的线程运行完毕或发生异常从而释放这个锁。Lock类在等待锁的过程中,如果使用的时lockInterruptibly方法,如果等待时间太长,可以中断退出,也可以使用tryLock()等方法尝试获取锁,如果获取不到可以执行其他逻辑。

    4. synchronized锁只能同时被一个线程拥有,但Lock锁没有这个限制。

      如在读写锁中的读锁,是可以被多个线程同时拥有的,但synchronized不行。

    5. 原理区别

      synchronized是内置锁,由JVM实现获取锁和解锁,还分为偏向锁、轻量级锁、重量级锁。Lock根据实现不同,原理也不同,如ReentrantLock内部是通过AQS来获取和释放锁的。

    6. 是否可以设置公平/非公平

      ReentrantLock可以根据需求来设置公平或非公平,synchronized则不能设置。

    如何选择:

    1. 最好既不使用Lock也不使用synchronized,尽量使用java.util.concurrent包中的机制。
    2. 尽量使用synchronized,避免忘记在finally里忘记unlock。
    3. 需要Lock的特殊功能时,如尝试获取锁、可中断、超时功能等,才使用Lock。

3.5 Lock的常用方法

  • lock()

    在线程获取锁时如果锁已被其他线程获取

    1
    2
    3
    4
    5
    6
    7
    8
    Lock lock = ...;
    lock.lock();
    try{
        //获取到了被本锁保护的资源,处理任务
        //捕获异常
    }finally{
        lock.unlock();   //释放锁
    }
  • tryLock()

    用来尝试获取锁,如果当前锁没有被其他线程占用,则获取成功,返回true,否则返回false,代表获取锁失败,可以根据是否能获取到锁来决定后续程序行为。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    Lock lock = ...;
    if(lock.tryLock()) {
         try{
             //处理任务
         }finally{
             lock.unlock();   //释放锁
         } 
    }else {
        //如果不能获取锁,则做其他事情
    }
  • tryLock(long time, TimeUnit unit)

    和tryLock()类似,tryLock(long time, TimeUnit unit)会有一个超时时间,在拿不到锁时会等待一定的时间,时间期限结束后,还获取不到锁,就会返回false,如果在最开始或等待期间内获取到锁就返回true。

  • lockInterruptibly()

    除非当前线程在获取锁期间被中断,否则会一直尝试获取直到获取到为止。相当于超时时间无限长的tryLock(long time, TimeUnit unit)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
          public void lockInterruptibly() {
            try {
                lock.lockInterruptibly();
                try {
                    System.out.println("操作资源");
                } finally {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
  • unlock()

    用于解锁,对ReentrantLock而言,执行unlock()的时候,内部会把锁的”被持有计数器”减1,直到减到0就代表当前这把锁已经完全释放了,如果减1后计数器不为0,说明这把锁之前被”重入”了,那么锁并没有真正释放,仅仅是减少了持有的次数。

3.6 公平锁与非公平锁

公平锁:按照线程请求顺序来分配锁

非公平锁:不完全按照请求的顺序,在一定情况下,可以允许插队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class FairAndUnfair {

static class PrintQueue {
private final Lock queueLock = new ReentrantLock(false);//false:非公平锁 true:公平锁 默认false

public void printJob(Object document) {
queueLock.lock();
try {
Long duration = (long)(Math.random() * 10000);
System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n",
Thread.currentThread().getName(), (duration / 1000));
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
queueLock.unlock();
}

queueLock.lock();
try {
Long duration = (long)(Math.random() * 10000);
System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n",
Thread.currentThread().getName(), (duration / 1000));
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
queueLock.unlock();
}
}
}

static class Job implements Runnable {
private PrintQueue printQueue;

public Job(PrintQueue printQueue) {
this.printQueue = printQueue;
}

@Override
public void run() {
System.out.printf("%s: Going to print a job\n", Thread.currentThread().getName());
printQueue.printJob(new Object());
System.out.println();
System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName());
}
}

public static void main(String[] args) {
PrintQueue printQueue = new PrintQueue();
Thread thread[] = new Thread[10];
for (int i = 0; i < 10; i++) {
thread[i] = new Thread(new Job(printQueue), "Thread " + i);
}

for (int i = 0; i < 10; i++) {
thread[i].start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}
}

非公平情况下,存在抢锁”插队”现象,如Thread 0 在释放锁后又能优先获取到锁,虽然此时在等待队列中已经有Thread 1~Thread 9在排队了。

各自的优缺点

源码分析

ReentrantLock中包含一个Sync类,这个类继承自AQS(AbstractQueuedSynchronizer)

1
2
3
4
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;

Sync有公平锁FairSync和非公平锁NonfairSync两个子类

1
2
static final class NonfairSync extends Sync {...}
static final class FairSync extends Sync {...}

公平锁与非公平获取锁的lock()方法唯一区别就在于公平锁在获取锁时多了一个限制条件:hasQueuedPredecessors()为false,这个方法就是在判断在等待队列中是否已经有线程在排队了。公平锁,一旦有线程在排队,当前线程就不再尝试获取锁了;对于非公平锁,无论是否有线程在排队,都会尝试获取一下锁,获取不到的话,再去排队。

tryLock(),一旦有线程释放了锁,那么正在tryLock的线程就能获取到锁,即使设置的是公平锁模式,即使在它之前已经有其他正在等待队列中等待的线程,即tryLock可以插队。调用的是nonfairTryAcquire(),表明是不公平的,和锁本身是否公平锁无关。

1
2
3
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

3.7 读写锁

保证多个线程同时读的效率,同时可以保证有写入操作时的线程安全。

读写锁的获取规则

  1. 如果一个线程已经占用了读锁,则此时其他线程如果要申请读锁,可以申请成功。
  2. 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁,因为读写不能同时操作
  3. 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或读锁,都必须等待之前的线程释放锁,因为读写、写写不能同时操作

要么是一个或多个线程同时有读锁,要么是一个线程有写锁,但是两者不会同时出现。即读读共享,其他都互斥。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ReadWriteLockDemo {
private static final ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(false);
private static final ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
private static final ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

private static void read() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "得到读锁,正在读取");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放读锁");
readLock.unlock();
}
}

private static void write() {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "得到写锁,正在写入");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放写锁");
writeLock.unlock();
}
}

public static void main(String[] args) {
new Thread(() -> read()).start();
new Thread(() -> read()).start();
new Thread(() -> write()).start();
new Thread(() -> write()).start();
}
}

运行结果:

1
2
3
4
5
6
7
8
Thread-0得到读锁,正在读取
Thread-1得到读锁,正在读取
Thread-0释放读锁
Thread-1释放读锁
Thread-2得到写锁,正在写入
Thread-2释放写锁
Thread-3得到写锁,正在写入
Thread-3释放写锁

读写锁适用于读多写少的情况

3.8 读锁应该插队么?什么是读写锁的升降级?

  • 公平锁

    只要等待队列中有线程在等待,即hasQueueedPredecessors()返回true的时候,那么write和reader都会block,即不允许插队。

  • 非公平锁

    1
    2
    3
    4
    5
    6
    7
    final boolean writerShouldBlock() {
        return false// writers can always barge
    }

    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }

    写锁:随时可以插队

    读锁:

    1. 允许插队

      有可能导致需要拿到写锁的线程会陷入”饥饿”状态,它将在长时间内得不到执行。

    2. 不允许插队

      即使是非公平锁,只要等待队列的头结点是尝试获取写锁的线程,那么读锁依然不能插队,目的是避免”饥饿”。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public class ReadLockJumpQueue {
    private static final ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    private static final ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    private static final ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

    private static void read() {
    readLock.lock();
    try {
    System.out.println(Thread.currentThread().getName() + "得到读锁,正在读取");
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    System.out.println(Thread.currentThread().getName() + "释放读锁");
    readLock.unlock();
    }
    }

    private static void write() {
    writeLock.lock();
    try {
    System.out.println(Thread.currentThread().getName() + "得到写锁,正在写入");
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    System.out.println(Thread.currentThread().getName() + "释放写锁");
    writeLock.unlock();
    }
    }

    public static void main(String[] args) throws InterruptedException {
    new Thread(() -> read(), "Thread-2").start();
    new Thread(() -> read(), "Thread-4").start();
    new Thread(() -> write(), "Thread-3").start();
    new Thread(() -> read(), "Thread-5").start();
    }
    }

    运行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    Thread-2得到读锁,正在读取
    Thread-4得到读锁,正在读取
    Thread-2释放读锁
    Thread-4释放读锁
    Thread-3得到写锁,正在写入
    Thread-3释放写锁
    Thread-5得到读锁,正在读取
    Thread-5释放读锁

锁的升降级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CachedData {
    Object data;
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
   void processCachedData() {
        rwl.readLock().lock();
        if (!cacheValid) {
            //在获取写锁之前,必须首先释放读锁。
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                //这里需要再次判断数据的有效性,因为在我们释放读锁和获取写锁的空隙之内,可能有其他线程修改了数据。
                if (!cacheValid) {
                    data = new Object();
                    cacheValid = true;
                }
                //在不释放写锁的情况下,直接获取读锁,这就是读写锁的降级。
                rwl.readLock().lock();
            } finally {
                //释放了写锁,但是依然持有读锁
                rwl.writeLock().unlock();
            }
        }

        try {
            System.out.println(data);
        } finally {
            //释放读锁
            rwl.readLock().unlock();
        }
    }
}

只有一处修改数据的代码,后面都是读取,如果一直使用写锁的话,就不能让多个线程同时来读取了,这个时候利用锁的降级,可以提高整体性能。

支持锁的降级,不支持升级

ReentrantReadWriteLock不支持读锁升级到写锁。

不可能有读锁和写锁同时持有的情况,升级写锁的过程中,需要等到所有的读锁都释放才能升级。另一种特殊情况,线程A、B都想升级到写锁,对于A而言,它需要等待其他线程(包括B)释放读锁,而线程B也是如此,则会发生死锁。

3.9 自旋锁

非自旋锁和自旋锁最大的区别,如果它遇到拿不到锁的情况,它会把线程阻塞,直到被唤醒;而自旋锁会不停地尝试。

自旋锁的好处

自旋锁用循环去不停地尝试获取锁,让线程始终处于Runnable状态,节省了线程切换带来的开销。

自己实现可重入的自旋锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class ReentrantSpinLock {
private AtomicReference<Thread> owner = new AtomicReference<>();
// 重入次数
private int count = 0;

public void lock() {
Thread currentThread = Thread.currentThread();
if (currentThread == owner.get()) {
++count;
return;
}
// 自旋获取锁
while (!owner.compareAndSet(null, currentThread)) {
System.out.println("自旋了!");
}
}

public void unlock() {
Thread currentThread = Thread.currentThread();
// 只有持有锁的线程才能解锁
if (currentThread == owner.get()) {
if (count > 0) {
--count;
} else {
// 此处无需CAS操作,因为没有竞争,因为只有线程持有者才能解锁
owner.set(null);
}
}
}

public static void main(String[] args) {
ReentrantSpinLock spinLock = new ReentrantSpinLock();
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "开始尝试获取自旋锁");
spinLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "获取到了自旋锁");
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
spinLock.unlock();
System.out.println(Thread.currentThread().getName() + "释放了了自旋锁");
}
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
Thread-1开始尝试获取自旋锁
Thread-0开始尝试获取自旋锁
Thread-1获取到了自旋锁
自旋了!
自旋了!
自旋了!
......
自旋了!
自旋了!
Thread-0获取到了自旋锁
Thread-1释放了了自旋锁
Thread-0释放了了自旋锁

缺点

虽然避免了线程切换的开销,但带来了新的开销,因为它需要不停地去尝试获取锁。

适用场景

自旋锁适用于并发度不是特别高,以及临界区比较短小的情况,这样可以避免线程切换来提高效率。可是如果临界区很大,线程一旦拿到锁,很久才会释放的话,那就不适合自旋锁,因为自旋会一直占用CPU却无法拿到锁,白白消耗资源。

3.10 JVM对锁的优化

  • 自适应的自旋锁

    自旋的缺点在于如果自旋时间过长,那么性能开销很大,浪费CPU资源。自适应意味着自旋的时间不再固定,而是根据最近自旋尝试的成功率、失败率,以及当前锁的拥有者的状态等多种因素来共同决定。如:最近尝试自旋获取某一把锁成功了,那么下次可能还会继续使用自旋,并且允许自旋更长时间;但如果最近自旋获取某一把锁失败了,那么可能会省略掉自旋的过程,以便减少无用的自旋,提高效率。

  • 锁消除

    1
    2
    3
    4
    5
    6
    @Override
    public synchronized StringBuffer append(Object obj) {
        toStringCache = null;
        super.append(String.valueOf(obj));
        return this;
    }

    这个方法是被synchronized修饰的同步方法,因为它可能会被多个线程同时使用。但在大多数情况下,它只会在一个线程内使用,如果编译器能确定这个StringBuffer只会在一个线程内使用,那么编译器便会做出优化,把synchronized消除,省去加锁和解锁,以便增加整体的效率。

  • 锁粗化

    如果释放了锁,紧接着什么都没做,又重新获取锁,如:

    1
    2
    3
    4
    5
    6
    7
    8
    public void lockCoarsening() {
        synchronized (this) {
            //do something
        }
        synchronized (this) {
            //do something
        }
    }

    可以把同步区域扩大,即最开始加一次锁,并且在最后直接解锁,减少性能开销。

    如果在循环中也这样做,会导致其他线程长时间无法获得锁。锁粗化的功能默认打开,用-XX:-EliminateLocks可以关闭该功能。

    1
    2
    3
    4
    5
    for (int i = 0; i < 1000; i++) {
        synchronized (this) {
            //do something
        }
    }
  • 偏向锁/轻量级锁/重量级锁

    这三种锁是特指synchronized锁的状态的,通过对象头中的mark word来表明锁的状态。

    • 偏向锁

      这把锁自始至终不存在竞争,那么没必要上锁,只要打个标记就行了。一个对象被初始化后,如果还没有任何线程来获取它的锁,它就是可偏向的,当第一个线程来访问它尝试获取锁的时候,它就记录下来这个线程,如果后面尝试获取锁的线程正是这个偏向锁的拥有者,就可以直接获取锁,开销小。

    • 轻量级锁

      synchronized中的代码块是被多个线程交替执行的,也就是不存在实际的竞争,或者只有短时间的竞争,用CAS就可以解决。轻量级锁指当锁原来是偏向锁的时候,被另一线程所访问,说明存在竞争,那么偏向锁升级为轻量级锁,线程会通过自旋的方式尝试获取锁,不会阻塞。

    • 重量级锁

      当多个线程直接有实际竞争,并且锁竞争时间比较长的时候,此时偏向锁和轻量级锁都不能满足需求,锁就会膨胀为重量级锁,会让其他申请却拿不到锁的线程进入阻塞状态。

3.10 HashMap为什么是线程不安全的?

  • 扩容期间取出的值不准确

    HashMap扩容期间,会新建一个新的空数组,并用旧的项填充到这个新的数组中。如果这个填充的过程中,如果有线程取值,很可能会取到null值。

  • 同时put碰撞导致数据丢失

    如果有多个线程同时put,而且恰好两个put的key是一样的,它们发生了碰撞,也就是根据hash值计算出来的bucket位置一样,并且两个线程又同时判断该位置是空的,可以写入,所以这两个线程的两个不同的value便会添加到数组的同一位置,就丢失了一个数据。

  • 可见性问题

    线程1给某个key放入了一个新值,那么线程2在获取对应的key的值的时候,它的可见性是无法保证的。

  • 死循环造成CPU100%

    在扩容的时候,也就是内部新建新的HashMap的时候,扩容的逻辑会反转散列桶中的节点顺序,当多个线程同时进行扩容的时候,如果两个线程同时反转的话,便可能形成一个循环,并且这种循环是链表的循环,相当于A节点指向B节点,B节点又指回A节点,在下一次想要获取该key所对应的value的时候,便会在遍历链表的时候发生永远无法遍历结束的情况。

3.11 为什么Map桶中超过8个才转为红黑树?

最开始的Map是空的,因为里面没有任何元素,往里放元素时会计算hash值,计算之后,第1个个value会占用一个桶(也称为槽点)位置,后续经过计算键值key计算hash值得到插入的数组索引i相同,那么会使用链表的形式往后延长,俗称拉链法。当链表长度大于或等于阈值(默认为8),且数组长度大于或等于MIN_TREEIFY_CAPACITY(默认64)时,就会把链表转为红黑树。当红黑树的节点小于或等于6个以后,又会恢复为链表形态。

  1. 链表查找时间复杂度:O(n) 红黑树查找时间复杂度:O(log(n))

  2. 单个TreeNode需要占用的空间大约是Node的两倍

    时间与空间的平衡

如果hash计算结果离散的好,各个值都均匀分配,很少出现链表很长的情况。在理想情况下,链表长度符合泊松分布,各个长度的命中概率依次递减,当长度为8时,概率仅为0.00000006,小于千万分之一概率,通常情况下并不会发生链表向红黑树的转换。

链表长度为8转为红黑树的设计,为了防止自定义实现了不好的hash算法导致链表长度过长,从而导致查询效率低。

3.12 Hashtable与ConcurrentHashMap的区别

  1. 出现版本不同

    Hashtable在JDK1.0就存在了,并在JDK1.2实现了Map接口;ConcurrentHashMap在JDK1.5中才出现。

  2. 实现线程安全的方式不同

    Hashtable通过synchronized关键字实现线程安全;ConcurrentHashMap利用了**CAS+synchronized+Node(volatile)**。

  3. 性能不同

    随着线程数量的增加,Hashtable性能会急剧下降,每一次修改会锁住整个对象,而其他线程在此期间不能操作,还会带来额外的上下文切换;ConcurrentHashMap只会对一部分上锁而不是全部都上锁。

  4. 迭代时的修改不同

    Hashtable(包括HashMap)不允许在迭代期间修改内容,否则会抛出ConcurrentModificationException异常,ConcurrentHashMap不会。

3.13 CopyOnWriteArrayList

ArrayList LinkedList

线程安全:Vector Collections.synchronized()

Vector内部使用synchronized来保证线程安全,并且锁的粒度比较大,都是方法级别的锁,在并发高的时候,很容易发生竞争,并发效率相对较低。

适用场景:

  • 读操作可以尽可能的快,而写即使慢一些也没关系
  • 读多写少

读写规则:

读写锁的思想是:读读共享,其他都互斥,因为读操作不会修改原有的数据,因此并发读不会有安全问题;而写操作发生时,不允许读和写操作加入。CopyOnWriteArrayList读取是完全不用加锁的,并且写入也不会阻塞读取操作,也就是说可以在写入的同时进行读取,只有写入和写入之间需要进行同步,也就是不允许多个写入同时发生,但可以在写入时允许读取发生。

特点:

  • CopyOnWrite

    当容器需要被修改的时候,不直接修改当前容器,而是先将当前容器进行Copy,复制出一个新容器,然后修改新的容器,完成修改之后,再将容器的引用指向新的容器。读写分离的思想,读和写使用不同的容器。

  • 迭代期间允许修改集合内容

    ArrayList源码里的ListItr的next()方法中有一个checkForComodification()方法:

    1
    2
    3
    4
    final void checkForComodification() {
        if (modCount != expectedModCount)
            throw new ConcurrentModificationException();
    }

    modCount是保存修改次数,每次调用add、remove时都会增加,expectedComodification是迭代器的变量,创建迭代器时会初始化并记录当时的modCount,后面迭代期间如果发现modCount和expectedModCount不一致,就会抛出异常。CopyOnWriteArrayList的迭代器在迭代时,迭代器使用的依然是原数组,只不过迭代器的内容可能已经过时了。CopyOnWrite的迭代器一旦被建立,如果往之前的CopyOnWriteArrayList对象中去新增元素,在迭代器中既不会显示出元素的变更情况,同时也不会报错。

缺点:

  • 内存占用问题

  • 在元素较多或者复杂的情况下,复制的开销很大

  • 数据一致性问题

    由于CopyOnWrite容器的修改是先修改副本,所以这次修改对于其他线程来说,并不是实时能看到的,只有在修改完之后才能体现出来。

源码分析:

  • 数据结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /** 可重入锁对象 */
    final transient ReentrantLock lock = new ReentrantLock();
    /** CopyOnWriteArrayList底层由数组实现,volatile修饰,保证数组的可见性 */
    private transient volatile Object[] array;
    /**
    * 得到数组
    */
    final Object[] getArray() {
        return array;
    }
    /**
    * 设置数组
    */
    final void setArray(Object[] a) {
        array = a;
    }
     
    /**
    * 初始化CopyOnWriteArrayList相当于初始化数组
    */
    public CopyOnWriteArrayList() {
        setArray(new Object[0]);
    }
  • add()方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public boolean add(E e) {
        // 加锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          // 得到原数组的长度和元素
            Object[] elements = getArray();
            int len = elements.length;
           // 复制出一个新数组
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            // 添加时,将新元素添加到新数组中
            newElements[len] = e;
            // 将volatile Object[] array 的指向替换成新数组
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

    在添加的时候首先上锁,并复制一个新数组,增加操作在新数组上完成,然后将array指向到新数组,最后解锁。上面的步骤实现了CopyOnWrite的思想:写操作是在原来容器的拷贝上进行的,并且在读取数据的时候不会锁住list。如果对容器拷贝操作的过程中有新的读线程进来,那么读到的还是旧的数据,因为那个时候对象的引用还没有被更改。

  • 迭代器 COWIterator 类

    1
    2
    3
    4
    private COWIterator(Object[] elements, int initialCursor) {
        cursor = initialCursor;
        snapshot = elements;
    }

    snapshot:数组的快照,即创建迭代器那个时刻的数组情况

    cursor:迭代器的游标

    迭代器在被构建的时候,会把当时的elements赋值给snapshot,而之后的迭代器所有的操作都基于snapshot数组进行的,比如:

    1
    2
    3
    4
    5
    public E next() {
        if (! hasNext())
            throw new NoSuchElementException();
        return (E) snapshot[cursor++];
    }

    可以看到,返回的内容是snapshot对象,所以,后续就算原数组被修改,这样snapshot既不会感知到,也不会受影响,执行迭代操作不需要加锁,也不会因此抛出异常。迭代器返回的结果,和创建迭代器的时候内容一致。

4.阻塞队列

4.1 什么是阻塞队列?

BlockingQueue,是一个接口,继承了Queue接口,是队列的一种,是线程安全的。

主要并发队列关系图

阻塞队列典型代表就是BlockingQueue接口的实现类,分别是ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue和LinkedTransferQueue。非阻塞队列的典型代表是ConcurrentLinkedQueue,这个类不会让线程阻塞,利用CAS保证线程安全。

Deque为双端队列,它从头和尾都能添加和删除元素;而普通的Queue只能从一端进入,另一端出去。

特点

阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。

  • take方法

    获取并移除队列的头结点,在队列里有数据时可以正常移除,一旦执行take方法的时候,队列无数据,则阻塞,直到队列有数据。

  • put方法

    put方法插入元素时,如果队列已满,那么就无法继续插入,则阻塞,直到队列有了空闲空间。

是否有界(容量有多大)

无界队列意味着里面可以容纳非常多的元素,如LinkedBlockingQueue的上限是Integer.MAX_VALUE,约为2^31。有些阻塞队列是有界的,如ArrayBlockingQueue如果容量满了,也不会扩容,所以一旦满了,就无法再往里面放数据了。

4.2 阻塞队列常用方法

第一组:无法正常执行的情况下抛出异常;第二组:在无法正常执行的情况下不抛出异常,但会用返回值提示运行失败;第三组:在遇到特殊情况时让线程阻塞,等到可以运行再继续执行。

带有超时时间的offer和poll

1
offer(E e, long timeout, TimeUnit unit)

插入不成功时会等待指定的超时时间,时间到了依然没有插入成功,就会返回false

1
poll(long timeout, TimeUnit unit)

如果移除时,如果队列是空的就会进行等待,超时时间到了,如果队列中依然没有元素可供移除,则会返回null为提示

4.3 几种常见的阻塞队列

  • ArrayBlockingQueue

    有界队列,其内部是用数组存储元素的,利用ReentrantLock实现线程安全,在创建它的时候就需要指定它的容量,之后不可以再扩容了,可以在构造函数中指定是否公平。

    非公平:存在插队的可能;公平:等待最长时间的线程会被优先处理

  • LinkedBlockingQueue

    内部用链表实现,不指定容量时默认为Integer.MAX_VALUE,被称为无界队列。

  • SynchronousQueue

    容量为0,所以没有地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据也会阻塞,直到有消费者来取。Synchronous的容量不是1而是0,它不需要去持有元素,它所做的就是直接传递。

  • PriorityBlockingQueue

    支持优先级的无界阻塞队列,可以通过自定义类实现compareTo()方法来指定元素排序规则,或者初始化时通过构造器参数Comparator来指定排序规则。同时插入的对象必须是可比较大小的,即Comparable的,否则会抛出ClassCastException。

  • DelayQueue

    具有”延迟”的功能,可以设定让队列中的任务延迟多久之后执行,如”30 分钟后未付款自动取消订单”。它是无界队列,放入的元素必须实现Delayed接口,而Delayed接口又继承了Comparable接口,拥有了比较和排序的能力。元素会根据延迟时间的长短放到队列的不同位置,越靠近头队列代表越早过期。

4.4 阻塞队列和非阻塞队列的并发安全原理

  • ArrayBlockingQueue

    1
    2
    3
    4
    5
    6
    7
    8
    // 用于存放元素的数组
    final Object[] items;
    // 下一次读取操作的位置
    int takeIndex;
    // 下一次写入操作的位置
    int putIndex;
    // 队列中的元素数量
    int count;
    1
    2
    3
    4
    // 以下3个是控制并发用的工具
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    这三个变量非常关键,第一个是ReentrantLock,下面两个Condition是由ReentrantLock产生出来的。读操作和写操作都需要先获取到ReentrantLock独占锁才能进行下一步操作。进行读操作时如果队列为空,线程就会进入到读线程专属的noEmpty的Condition的队列中去排队,等待写线程写入新的元素;同理如果队列已满,写操作的线程会进入到写线程专属的notFull队列中去排队,等待读线程将队列元素移除并腾出空间。

    put方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
            notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    LinkedBlockingQueue的内部有两把锁,分别锁住队列的头和尾,比共用一把锁的效率高。

  • 非阻塞队列ConcurrentLinkedQueue

    offer方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

    整个是以一个大的for循环,p.casNext()方法

    1
    2
    3
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    这里运用了UNSAFE.compareAndSwapObject方法来完成CAS操作,而compareAndSwapObject是一个native方法,最终会利用CPU的CAS指令保证其不可中断。非阻塞队列ConcurrentLinkedQueue使用CAS非阻塞算法+不停重试,来实现线程安全,适合用在不需要阻塞功能,且并发不是特别剧烈的场景。

4.5 如何选择合适的阻塞队列?

  • 线程池对于阻塞队列的选择

从以下5个角度考虑,来选择合适的阻塞队列。

  • 功能

    是否需要阻塞队列来排序,如优先级排序、优先执行等。

  • 容量

    是否需要有存储要求,还是只需要”直接传递”。

  • 能否扩容

    业务可能有高峰期、低谷期,如果需要动态扩容,就不能选择ArrayBlockingQueue。

  • 内存结构

    如ArrayBlockingQueue的内部结构是”数组”的形式,LinkedBlockingQueue的内部是链表实现的,ArrayBlockingQueue没有链表所需要的”节点”,空间链表利用率更高。

  • 性能

    如LinkedBlockingQueue拥有两把锁,操作粒度更细,并发程度高的时候,相对于只有一把锁的ArrayBlockingQueue性能会更好。SynchronousQueue性能往往优于其他实现,因为它只需要”直接传递”,而不需要存储的过程。

5.原子类

5.1 原子类如何利用CAS保证线程安全?

原子类的作用和锁有类似之处,都是为了保证并发情况下线程安全。

  • 粒度更细:原子变量可以把竞争范围缩小到变量级别,通常情况下,锁的粒度都要大于原子变量的粒度。
  • 效率更高:除高度竞争的情况下,原子类的效率通常比使用同步互斥锁的效率更高,因为原子类利用了CAS操作,不会阻塞线程。

6类原子类纵览

类型 具体类 特点
Atomic* 基本类型原子类 AtomicInteger、AtomicLong、AtomicBoolean
Atomic*Array 数组类型原子类 AtomicIntegerArray(整形数组原子类)、AtomicLongArray(长整形数组原子类)、AtomicReferenceArray(引用类型数组原子类)
Atomic*Reference 引用类型原子类 AtomicReference、AtomicStampedReference(对AtomicReference的升级,在此基础上还加了时间戳,用于解决CAS的ABA问题)、AtomicMarkableReference(和AtomicReference类似,多了一个绑定的布尔值,可以用于表示该对象已删除等场景) AtomicInteger可以让一个整数保证原子形,AtomicReference可以让一个对象保证原子性。
Atomic*FieldUpdater升级类型原子类 AtomicIntegerFieldUpdater(原子更新整形的更新器)、AtomicLongFieldUpdater(原子更新长整形的更新器)、AtomicReferenceFieldUpdater(原子更新引用的更新器) 可以把已经声明的变量进行升级,使其拥有CAS操作的能力。
Adder累加器 LongAdder、DoubleAdder
Accumulator积累器 LongAccumulator、DoubleAccumulator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AtomicIntegerFieldUpdaterDemo implements Runnable {
public static class Score {
volatile int score;
}

static Score math;
static Score computer;
static AtomicIntegerFieldUpdater<Score> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Score.class, "score");

@Override
public void run() {
for (int i = 0; i < 10000; i++) {
computer.score++;
scoreUpdater.getAndIncrement(math);
}
}

public static void main(String[] args) throws InterruptedException {
math = new Score();
computer = new Score();
AtomicIntegerFieldUpdaterDemo updaterDemo = new AtomicIntegerFieldUpdaterDemo();
Thread thread1 = new Thread(updaterDemo);
Thread thread2 = new Thread(updaterDemo);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("普通变量的结果:"+ computer.score);
System.out.println("升级后的结果:"+ math.score);
}
}

以AtomicInteger为例,分析其如何利用CAS实现原子操作?

  • getAndAdd()方法

    1
    2
    3
    4
    //JDK 1.8实现
    public final int getAndAdd(int delta) {
       return unsafe.getAndAddInt(this, valueOffset, delta);
    }
    • Unsafe类

      Unsafe类是CAS的核心类。Java无法直接访问底层操作系统,而需要通过native方法实现。在JDK中有一个Unsafe类,提供了硬件级别的原子操作,可以利用它直接操作内存数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class AtomicInteger extends Number implements java.io.Serializable {
       // setup to use Unsafe.compareAndSwapInt for updates
       private static final Unsafe unsafe = Unsafe.getUnsafe();
       private static final long valueOffset;
       static {
           try {
               valueOffset = unsafe.objectFieldOffset
                   (AtomicInteger.class.getDeclaredField("value"));
           } catch (Exception ex) { throw new Error(ex); }
       }
       private volatile int value;
       public final int get() {return value;}
       ...
    }

    static代码块会在类加载的时候执行,执行时会调用Unsafe的objectFieldOffset方法,从而得到当前这个原子类的value的偏移量(在内存中的偏移地址),并且赋给valueOffset变量,并且赋值给valueOffset变量,Unsafe根据内存偏移地址获取数据的原值,这样就可以通过Unsafe来实现CAS了。

    value是用volatile修饰的,它就是我们原子类存储的值的变量,由于它被volatile修饰,我们就可以保证在多线程之间看到的value是同一份,保证了可见性。

    Unsafe的getAndAddInt方法:

    1
    2
    3
    4
    5
    6
    7
    public final int getAndAddInt(Object var1, long var2, int var4) {
       int var5;
       do {
           var5 = this.getIntVolatile(var1, var2);//获取var1中的var2偏移处的值 var1:当前原子类 var2:最开始获取到的offset
       } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));//var1:object 当前原子类对象 var2:offset 即偏移量,借助它就可以获取到value的数值 var3:expectedValue 代表"期望值",传入的是刚才获取到的var5 var5+var4:newValue 是希望修改的数值,等于之前取到的数值var5+var4,var4是希望原子类所改变的数值,如+1或-1。
       return var5;
    }

    compareAndSwapInt方法的作用:判断如果现在原子类里的value的值和之前获取到的var5相等的话,那么就把计算出来的var5+var4给更新上去。一旦CAS操作成功,就会退出这个while循环,但也有可能操作失败。如果操作失败就意味着在获取到var之后,并在CAS操作之前,value的数值已经发生变化了,证明有其他线程修改过这个变量。会再次执行循环体里面的代码,重新获取var5,即获取最新的原子变量的数值,并再次利用CAS尝试更新,直到更新成功。

5.2 AtomicInteger在高并发下性能不好,如何解决?为什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class AtomicLongDemo {
static class Task implements Runnable {
private final AtomicLong counter;

public Task(AtomicLong counter) {
this.counter = counter;
}

@Override
public void run() {
counter.incrementAndGet();
System.out.println(Thread.currentThread().getName()+"..."+counter.get());
}
}

public static void main(String[] args) throws InterruptedException {
AtomicLong counter = new AtomicLong(0);
ExecutorService poolExecutor = new ThreadPoolExecutor(20, 40, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
new ThreadPoolExecutor.AbortPolicy());
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (int i = 0; i < 100; i++) {
poolExecutor.submit(new Task(counter));
}
Thread.sleep(2000);
System.out.println("result:"+counter.get());
stopWatch.stop();
System.out.println(stopWatch.getTotalTimeMillis());
}
}

每一个线程是运行在自己的core中的,并且它们都有一个本地内存是自己独用的。在本地内存下方有两个CPU核心共用的共享内存。对于AtomicLong内部的value属性而言,它是被volatile修饰的,需要保证自身可见性。每次它的数值变化的时候,都需要进行flush到共享内存和refresh到本地内存。

flush和refresh操作耗费了很多资源,而且CAS也会经常失败。

LongAdder

LongAdder引入了分段累加的概念,内部一共有两个参数参与计数:

  1. base,是一个变量,用在竞争不激烈的情况下,可以直接把来家结果改到base变量上。
  2. Cell[],是一个数组,一旦竞争激烈,各个线程会分散累加到自己所对应的那个Cell[]数组的某一个对象中,而大家不会共用同一个。

竞争激烈的时候,LongAdder会通过计算出每个线程的hash值来给线程分配到不同的Cell上去,每个Cell相当于是一个独立的计数器,Cell之间并不存在竞争,所以自加过程中,大大减少了flush和refresh,以及降低了冲突的概率。空间换时间。

1
2
3
4
5
6
7
8
9
10
11
public long sum() {
   Cell[] as = cells; Cell a;
   long sum = base;
   if (as != null) {
       for (int i = 0; i < as.length; ++i) {
           if ((a = as[i]) != null)
               sum += a.value;
       }
   }
   return sum;
}

如何选择

如何仅仅是需要用到加和减操作的场景,那么可以直接使用LongAdder。

如果需要利用CAS比如compareAndSet等操作的话,就需要使用AtomicLong来完成。

5.3 原子类与volatile

线程1和线程2分别在不同的CPU核心,每一个核心都有自己的本地内存,并且在下方也有它们的共享内存。在变量加上volatile关键字,线程1的更改会被flush到共享内存,然后又被refresh到线程2的本地内存,保证了可见性。

但对于value++这种,即使用volatile修饰value也是不能保证线程安全的,无法保证其原子性。此时可以使用原子类。

原子类和volatile的使用场景

通常情况下,volatile可以用来修饰boolean类型的标记位,对于标记位来讲,直接的赋值操作本身就具有原子性,再加上volatile保证了可见性,那么就是线程安全的了。而对于会被多个线程同时操作的计数器counter的场景,即不仅仅是赋值操作,还需要读取当前值,然后在此基础上进行一定的修改,再把它给赋值回去,此时需要使用原子类保证线程安全。

5.4 Adder与Accumlator的区别

高并发场景下AtomicLong CAS冲突概率大,会导致经常自旋。而LongAdder引入了分段锁的概念,竞争不激烈的时候,所有线程都是通过CAS对同一个Base变量进行修改,但竞争激烈的时候,LongAdder会把不同线程对应到不同的Cell上进行修改,降低了冲突的概率。

LongAccumulator就是个更通用版本的Adder,提供了自定义的函数操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class LongAccumulatorDemo {
public static void main(String[] args) throws InterruptedException {
LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);
ExecutorService executorService = new ThreadPoolExecutor(8, 16, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), new ThreadPoolExecutor.AbortPolicy());
IntStream.range(1,10).forEach(i->executorService.submit(()->{
accumulator.accumulate(i);
System.out.println(Thread.currentThread().getName()+"..."+accumulator.get());
}));
Thread.sleep(2000);
System.out.println(accumulator.getThenReset());
}
}

自定义函数:

1
2
3
4
LongAccumulator counter = new LongAccumulator((x, y) -> x + y, 0);
LongAccumulator result = new LongAccumulator((x, y) -> x * y, 0);
LongAccumulator min = new LongAccumulator((x, y) -> Math.min(x, y), 0);
LongAccumulator max = new LongAccumulator((x, y) -> Math.max(x, y), 0);

适用场景

  1. 需要大量的计算,并且当需要并行计算的时候。
  2. 计算的执行顺序并不关键。

6.ThreadLocal

6.1 ThreadLocal适用场景

  1. 场景1

    保存每个线程独享的对象,为每个线程都创建一个副本,这样每个线程都可以修改自己拥有的副本,而不会影响其他线程的副本,确保了线程安全。

    这种场景下,每个Thread内都有自己的实例副本,且该副本只能由当前Thread访问到并使用,相当于每个线程内部的本地变量。因为每个线程独享副本,而不是共用的,所以不存在多线程间共享的问题。

    这种场景通常用于保存线程不安全的工具类,如SimpleDateFormat。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class ThreadLocalDemo05 {
    static ThreadLocal<SimpleDateFormat> formatThreadLocal = new ThreadLocal<SimpleDateFormat>(){
    @Override
    protected SimpleDateFormat initialValue() {
    return new SimpleDateFormat("mm:ss");
    }
    };

    public static ExecutorService executorService = new ThreadPoolExecutor(16, 32, 10, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(), new ThreadPoolExecutor.AbortPolicy());

    public String date(int seconds) {
    Date date = new Date(1000 * seconds);
    SimpleDateFormat simpleDateFormat = formatThreadLocal.get();
    return simpleDateFormat.format(date);
    }

    public static void main(String[] args) throws InterruptedException {
    IntStream.range(1, 1000).forEach(i -> executorService.submit(() -> {
    String date = new ThreadLocalDemo05().date(i);
    System.out.println(Thread.currentThread().getName() + ":" + date);
    }));
    Thread.sleep(2000);
    executorService.shutdown();
    }
    }
  2. 场景2

    每个线程内需要独立保存信息,以便其他方法更方便的获取该信息的场景。每个线程获取到的信息可能都是不一样的,前面执行的方法保存了信息之后,后续方法可以通过ThreadLocal直接获取到,避免了传参,类似于全局变量的概念。

    每个线程内需要保存类似于全局变量的信息(列如拦截器中获取的用户信息),可以让不同方法直接使用,避免参数传递的麻烦却不想被多线程共享(因为不同线程获取到的用户信息不一样)。

    例如,用ThreadLocal保存一些业务内容(用户权限信息),这些信息在同一个线程内相同,但在不同的线程使用的业务内容是不相同的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    public class ThreadLocalDemo07 {
    public static void main(String[] args) {
    new Service1().service1();
    }
    }

    class User {
    String username;

    public User() {}

    public User(String username) {
    this.username = username;
    }

    public String getUsername() {
    return username;
    }

    public void setUsername(String username) {
    this.username = username;
    }
    }

    class userContextHolder {
    public static ThreadLocal<User> holder = new ThreadLocal<>();
    }

    class Service1 {
    public void service1() {
    User user = new User("张三");
    userContextHolder.holder.set(user);
    new Service2().service2();
    }
    }

    class Service2 {
    public void service2() {
    User user = userContextHolder.holder.get();
    System.out.println("Service2拿到用户名:" + user.getUsername());
    new Service3().service3();
    }
    }

    class Service3 {
    public void service3() {
    User user = userContextHolder.holder.get();
    System.out.println("Service3拿到用户名:" + user.getUsername());
    userContextHolder.holder.remove();
    }
    }

6.2 ThreadLocal是用来解决共享资源的多线程访问的问题吗?

不是,虽然ThreadLocal是用于解决多线程情况下的线程安全问题,但其资源并不是共享的,而是每个线程独占的。

如果把放到ThreadLocal中的资源用static修饰,让它变为一个共享资源的话,那么即便使用ThreadLocal,同样有线程安全问题。

ThreadLocal和synchronized是什么关系?

  • ThreadLocal是通过让每个线程独享自己的副本,避免了资源的竞争。
  • synchronized主要用于临界资源的分配,在同一时刻限制最多只有一个线程能够访问该资源

相比于ThreadLocal而言,synchronized的效率会更低一些,但花费的内存也更少。但对于ThreadLocal而言,它还有不同的使用场景。比如避免传参。

6.3 ThreadLocal的结构

Thread、ThreadLocal及ThreadLocalMap三者之间的关系

每个Thread对象中都持有一个ThreadLocalMap类型的成员变量,这个ThreadLocalMap自身类似一个Map,里面会有一个个key-value形式的,key就是ThreadLocal的引用,value就是希望ThreadLocal存储的内容。

get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public T get() {
//获取到当前线程
Thread t = Thread.currentThread();
//获取到当前线程内的ThreadLocalMap对象,每个线程内都有一个ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
if (map != null) {
//获取ThreadLocalMap中的Entry对象并拿到value,每个线程内都有一个ThreadLocalMap对象
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
//如果线程内之前没创建过ThreadLocalMap,就创建
return setInitialValue();
}

getMap方法

1
2
3
4
5
   ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

ThreadLocal.ThreadLocalMap threadLocals = null;

set方法

1
2
3
4
5
6
7
8
9
10
11
public void set(T value) {
//获取到当前线程
Thread t = Thread.currentThread();
//获取当前线程内的ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null)
//第一个参数this:当前ThreadLocal的引用,key的类型则是ThreadLocal;第二个参数即为所传入的value
map.set(this, value);
else
createMap(t, value);
}

ThreadLocalMap类,即Thread.threadLocals

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

private static final int INITIAL_CAPACITY = 16;

private Entry[] table;
......
}

在ThreadLocalMap中会有一个Entry类型的数组,名字叫table。可以理解为一个map,其键值对为:

  • 键,当前的ThreadLocal
  • 值,实际需要存储的变量,比如user用户对象或者simpleDateFormat对象

HashMap在面对hash冲突的时候,采用的是拉链法,它会先把对象hash到一个对应的格子中,如果有冲突就用链表的形式往下链;但ThreadLocalMap采用的是线性探测法,如果发生冲突,并不会用链表的形式往下链,而是会继续寻找下一个空的格子。

6.4 为何每次用完 ThreadLocal 都要调用 remove()?

内存泄漏:当某一个对象不再有用的时候,占用的内存却不能被回收。

1
2
3
4
5
6
7
8
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;
    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

Entry是extends WeakReference。弱引用的特点:如果这个对象只被弱引用关联,而没有任何强引用关联,那么这个对象就可以被回收,所以弱引用不会阻止GC。

但是这个Entry包含了一个对value的强引用。value=v这行代码就代表了强引用的发生。

Thread Ref → Current Thread → ThreadLocalMap → Entry → Value → 可能泄漏的value实例。

这条链路是随着线程的存在而一直存在,如果线程迟迟不会终止,那么当垃圾回收进行可达性分析的时候,这个value就是可达的,所以不会被回收。但与此同时可能已经完成了业务逻辑处理,不再需要这个value了,此时就发生了内存泄漏。

在执行ThreadLocal的set、remove、rehash等方法时,都会扫描key为null的Entry,如果发现某个Entry的key为null,则代表它所对应的value也没有作用了,所以就会把对应的value设置为null,这样,value对象就可以被正常回收了。但假设ThreadLocal已经不被使用了,那么实际上set、remove、rehash方法也不会被调用。

如何避免内存泄漏

调用 ThreadLocal 的 remove 方法。调用这个方法就可以删除对应的 value 对象,可以避免内存泄漏。

1
2
3
4
5
public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

7.Future

7.1 Callable和Runnable的不同

  • Runnable的不足

    1. 不能返回一个返回值

    2. 不能抛出checked Exception

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      public class RunThrowException {

         /**
          * 普通方法内可以 throw 异常,并在方法签名上声明 throws
          */
         public void normalMethod() throws Exception {
             throw new IOException();
         }

         Runnable runnable = new Runnable() {
             /**
              *  run方法上无法声明 throws 异常,且run方法内无法 throw 出 checked Exception,除非使用try catch进行处理
              */

             @Override
             public void run() {
                 try {
                     throw new IOException();
                 } catch (IOException e) {
                     e.printStackTrace();
                 }
             }
         }
      }

    Runnable规定了run()方法的返回类型是void,而且没有声明抛出任何异常。所以,当实现并重写这个方法的时候,既不能改变返回值类型,也不能更改对于异常抛出的描述。

    1
    2
    3
    public interface Runnable {
       public abstract void run();
    }
  • Callable接口

    call方法已经声明了throws Exception,前面还有一个V泛型的返回值。

    1
    2
    3
    public interface Callable<V> {
         V call() throws Exception;
    }
  • Callable和Runnable的不同之处

    1. 方法名:Callable规定的执行方法是call(),而Runnable规定的执行方法是run()
    2. 返回值:Callable的任务执行后有返回值,而Runnable的任务执行后是没有返回值的
    3. 抛出异常:call()方法可抛出异常,而run方法是不能抛出检查异常的
    4. 和Callable配合使用的Future类,通过Future可以了解任务的执行情况,或者取消任务的执行,还可获取任务的执行结果等。

7.2 Future的主要功能

Future的作用

比如当做一定较耗时的任务时,可以把任务放到子线程去执行,再通过Future去控制子线程执行的过程,最后获取到计算结果。通过异步的思想,提高程序的运行效率。

Callable和Future的关系

Callable接口相比于Runnable可以通过Future类的get方法返回结果。因此,Future类相当于一个存储器,它存储了Callable的call方法的任务结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Future<V> {

boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled();

boolean isDone();

V get() throws InterruptedException, ExecutionException;

V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
  • get() 获取结果

    获取任务执行的结果

    1. 当执行get的时候,任务已经执行完毕了。可以立刻返回,获取到任务执行的结果。
    2. 任务还未开始或任务正在执行中,调用get时,都会把当前线程阻塞,直到任务完成再把结果返回回来。
    3. 任务执行过程中抛出异常,调用get时,就会抛出ExecutionException,且无论执行call方法时里面抛出的异常类型是什么,在执行get方法时所获得的异常都是ExecutionException。
    4. 任务被取消了,如果任务被取消,调用get方法时则会抛出CancellationException。
    5. 任务超时,调用带延迟参数的get方法之后,如果call方法在规定时间内仍没有完成任务,get方法则会抛出TimeoutException,代表超时了。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    /*
    *一个Future的使用
    */
    public class OneFuture {
    static class CallableTask implements Callable{
    @Override
    public Object call() throws Exception {
    Thread.sleep(3000);
    return new Random().nextInt()+Thread.currentThread().getName();
    }
    }
    public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    Future future = executorService.submit(new CallableTask());
    try {
    System.out.println(future.get());
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    executorService.shutdown();
    }
    }
  • isDone() 判断是否执行完毕

    判断当前线程是否执行完毕,返回true代表已经执行完毕,返回false则代表还没完成。但这里如果返回true,并不代表这个任务是成功执行的,比如说任务执行到一半抛出了异常,仍然会返回true,所以isDone方法在返回true的时候,不代表这个任务是成功执行的,只代表它执行完毕了。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    public class GetException {
    static class CallableTask implements Callable{
    @Override
    public Object call() throws Exception {
    throw new IllegalArgumentException("Callable抛出异常!");
    }
    }
    public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(20);
    Future future = executorService.submit(new CallableTask());
    try {
    for (int i = 0; i < 5; i++) {
    System.out.println(i);
    Thread.sleep(500);
    }
    System.out.println(future.isDone());
    future.get();
    } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
    }
    }
    }


    0
    1
    2
    3
    4
    true
    java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Callable抛出异常!
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    ......

    这段代码的运行结果证明了:

    1. 即便任务抛出异常,isDone方法依然会返回true。
    2. 虽然call方法抛出的异常是IllegalArgumentException,但对于get而言,它抛出的异常依然是ExecutionException。
    3. 虽然在任务执行一开始就抛出了异常,但真正要等到执行get的时候,才看到了异常。
  • cancel 取消任务的执行

    1. 任务还未执行,任务会被正常取消,未来也不会被执行,返回true。
    2. 任务已经完成或被取消过,返回false。
    3. 任务正在执行,会根据传入的参数mayInterruptIfRunning,如果传入的参数是true,执行任务的线程会收到一个中断的信号。如果传入的是false,就代表不中断正在运行的任务,同时返回false。

    true:明确知道这个任务能够处理中断

    false:明确知道这个任务不能处理中断;不知道这个任务是否支持取消(是否能够响应中断);如果这个任务一旦开始运行,就希望它完全的执行完毕。

  • isCancelled() 判断是否被取消

用FutureTask创建Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class FutureTaskDemo {
public static void main(String[] args) {
Task task = new Task();
FutureTask futureTask = new FutureTask(task);
new Thread(futureTask).start();
try {
System.out.println("task运行结果:"+futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class Task implements Callable{
@Override
public Object call() throws Exception {
System.out.println("子线程"+Thread.currentThread().getName()+"正在计算!");
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}

7.3 Future注意点

  1. 当for循环批量获取Future的结果时容易block,get方法调用时应该使用timeout限制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    public class FutureDemo {
    static class SlowTask implements Callable {
    @Override
    public Object call() throws Exception {
    Thread.sleep(5000);
    return "速度慢的任务" + Thread.currentThread().getName();
    }
    }

    static class FastTask implements Callable {
    @Override
    public Object call() throws Exception {
    return "速度快的任务" + Thread.currentThread().getName();
    }
    }

    public static void main(String[] args) {
    ExecutorService executorService = new ThreadPoolExecutor(10, 10, 10, TimeUnit.MICROSECONDS,
    new LinkedBlockingDeque<>(), new ThreadPoolExecutor.AbortPolicy());
    List<Future> futures = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
    Future future;
    if (i == 0 || i == 1) {
    future = executorService.submit(new SlowTask());
    } else {
    future = executorService.submit(new FastTask());
    }
    futures.add(future);
    }
    for (int i = 0; i < 4; i++) {
    Future future = futures.get(i);
    try {
    String result = (String)future.get();
    System.out.println(result);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    }
    executorService.shutdown();
    }
    }


    速度慢的任务pool-1-thread-1
    速度慢的任务pool-1-thread-2
    速度快的任务pool-1-thread-3
    速度快的任务pool-1-thread-4

    第三个任务量比较小,可以很快返回结果,紧接着第四个任务也会返回结果。但由于前两个任务速度很慢,所以get方法执行时,会卡在第一个任务上。所以,即使第三、四个任务很早就得到结果了,但在此使用for循环的方式去获取结果,依然无法及时获取第三、四个任务的结果。直到5秒后,第一个任务出结果了,我们才能获取到,紧接着获取剩下任务的结果。

    此时可以使用Future的带超时参数的get(long timeout, TimeUnit unit)方法,如果在限定时间内没能返回结果,即抛出TimeoutException。

  2. Future的生命周期不可后退

    Future的生命周期不可后退,一旦完成了任务,它就永久停在了”已完成”的状态,不能重头再来,即不能让一个已经完成计算的Future再次重新执行任务。

Future产生新的线程了吗

Callable和Future本身并不能产生新的线程,它们需要借助其它的比如Thread类或者线程池才能执行任务。例如:在把Callable提交到线程池后,真正执行Callable的其实还是线程池中的线程,而线程池中的线程是由ThreadFactory产生的。

7.4 CountDownLatch、Completable

  • 线程池实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class ThreadPoolDemo {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        public static void main(String[] args) throws InterruptedException {
            ThreadPoolDemo threadPoolDemo = new ThreadPoolDemo();
            System.out.println(threadPoolDemo.getPrices());
        }
        private Set<Integer> getPrices() throws InterruptedException {
            Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
            threadPool.submit(new Task(123, prices));
            threadPool.submit(new Task(456, prices));
            threadPool.submit(new Task(789, prices));
            Thread.sleep(3000);
            return prices;
        }
        private class Task implements Runnable {
            Integer productId;
            Set<Integer> prices;
            public Task(Integer productId, Set<Integer> prices) {
                this.productId = productId;
                this.prices = prices;
            }
            @Override
            public void run() {
                int price=0;
                try {
                    Thread.sleep((long) (Math.random() * 4000));
                    price= (int) (Math.random() * 4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                prices.add(price);
            }
        }
    }
  • CountDownLatch

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    public class CountDownLatchDemo {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        public static void main(String[] args) throws InterruptedException {
            CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo();
            System.out.println(countDownLatchDemo.getPrices());
        }
        private Set<Integer> getPrices() throws InterruptedException {
            Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
            CountDownLatch countDownLatch = new CountDownLatch(3);
            threadPool.submit(new Task(123, prices, countDownLatch));
            threadPool.submit(new Task(456, prices, countDownLatch));
            threadPool.submit(new Task(789, prices, countDownLatch));
            countDownLatch.await(3, TimeUnit.SECONDS);
            return prices;
        }

        private class Task implements Runnable {
            Integer productId;
            Set<Integer> prices;
            CountDownLatch countDownLatch;
            public Task(Integer productId, Set<Integer> prices,
                    CountDownLatch countDownLatch) {
                this.productId = productId;
                this.prices = prices;
                this.countDownLatch = countDownLatch;
            }

            @Override
            public void run() {
                int price = 0;
                try {
                    Thread.sleep((long) (Math.random() * 4000));
                    price = (int) (Math.random() * 4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                prices.add(price);
                countDownLatch.countDown();
            }
        }
    }

    执行countDownLatch.await(3, TimeUnit.SECONDS)等待时,如果三个任务都非常快速得执行完毕了,那么都已经执行了countDown方法,相当于把计数减1。如果有一个线程没有执行countDown方法,来不及在3秒内执行完毕,那么这个带超时参数的await方法也会在3秒以后,及时的放弃这一次等待,于是就把prices返回了。

  • CompletableFuture

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    public class CompletableFutureDemo {
    private class Task implements Runnable {

    Integer productId;
    Set<Integer> prices;

    public Task(Integer productId, Set<Integer> prices) {
    this.productId = productId;
    this.prices = prices;
    }

    @Override
    public void run() {
    int price = 0;
    try {
    Thread.sleep((long)(Math.random() * 4000));
    price = (int)(Math.random() * 4000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    prices.add(price);
    }
    }

    private Set<Integer> getPrices() {
    Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
    CompletableFuture<Void> task1 = CompletableFuture.runAsync(new Task(123, prices));
    CompletableFuture<Void> task2 = CompletableFuture.runAsync(new Task(456, prices));
    CompletableFuture<Void> task3 = CompletableFuture.runAsync(new Task(789, prices));
    CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
    try {
    allTasks.get(3, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    } catch (TimeoutException e) {
    e.printStackTrace();
    }
    return prices;
    }

    public static void main(String[] args) {
    CompletableFutureDemo completableFutureDemo = new CompletableFutureDemo();
    System.out.println(completableFutureDemo.getPrices());
    }
    }

    CompletableFuture的runAsync()方法,这个方法会异步的去执行任务。

8.线程协作

8.1 信号量

控制需要限制并发访问量的资源。

使用流程

  1. 初始化一个信号量,并传入许可证的数量。public Semaphore(int permits, boolean fair),传入两个参数,第一个参数是许可证的数量,另一个参数是是否公平,如果为true,代表是公平的策略,会把之前已经在等待的线程放入到队列中,当有新的许可证时,会按照顺序发放;如果为false,则代表非公平策略,也就有可能插队。

  2. 在调用慢服务之前,线程调用acquire()或者acquireUninterruptibly()获取许可证。如果此时信号量没有剩余的许可证,那么线程会等在acquire()的这一行代码中,不会进一步执行下面调用服务的方法。

    acquire()和acquireUninterruptibly()的区别:是否能够中断。acquire()支持中断,即在获取信号量期间,假如这个线程被中断了,那么它就会跳出acquire(),不再继续尝试获取了,而acquireUninterruptibly()方法是不会中断的。

  3. 任务执行完毕之后,调用release()释放许可证。

其他的主要方法

  1. public boolean tryAcquire()

    尝试获取许可证,获取不到不会阻塞,可以去做其他事。

  2. public boolean tryAcquire(long timeout, TimeUnit unit)

    超时时间到,依然获取不到许可证,认为获取失败,返回false。

  3. availablePermits()

    查询可用许可证的数量,返回一个整形的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class SemaphoreDemo2 {
static Semaphore semaphore = new Semaphore(5);
static ThreadLocal<StopWatch> stopWatchThreadLocal = ThreadLocal.withInitial(() -> new StopWatch());

private static class Task implements Runnable {
@Override
public void run() {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "获取到许可证,开始执行任务!");
StopWatch stopWatch = stopWatchThreadLocal.get();
stopWatch.start();
try {
Thread.sleep(3000);
stopWatch.stop();
System.out.println("慢服务执行完毕,耗时:" + stopWatch.getTotalTimeMillis() + "---"
+ Thread.currentThread().getName() + "释放了许可证!");
semaphore.release();
stopWatchThreadLocal.remove();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(50, 50, 5, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 1000; i++) {
executorService.submit(new Task());
}
executorService.shutdown();
}
}

特殊用法:一次获取或释放多个许可证

semphore.acquire(2) semaphore.release(3)

注意点

  • 获取和释放的许可证数量尽量保持一致
  • 在初始化时可以设置公平性,true会让它更公平,false则会让总的吞吐量更高
  • 信号量是支持跨线程、跨线程池的,并且并不是哪个线程获得的许可证,就必须由这个线程去释放,对于获取和释放许可证的线程是没有要求的。

8.2 CountDownLatch 是如何安排线程执行顺序的?

主要方法

  1. 构造函数

    public CountDownLatch(int count){

    }

    count是需要倒数的值

  2. await()

    调用await()方法的线程开始等待,直到倒数结束,也就是count值为0的时候才会继续执行。

  3. await(long timeout, TimeUnit unit)

    和await()类似,但这里可以设置超时时间,如果超时就不等待了。

  4. countDown()

    把数值倒数1,也就是将count值减1,直到减为0时,之前等待的线程会被唤起。

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class RunDemo3 {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(),
new ThreadPoolExecutor.AbortPolicy());
CountDownLatch downLatch1 = new CountDownLatch(5);
CountDownLatch downLatch2 = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
int finalI = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println(finalI + "号运动员准备完毕,等待裁判员的发令枪");
downLatch2.await();
Thread.sleep((long)(Math.random() * 10000));
System.out.println(finalI + "号运动员完成了比赛");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
downLatch1.countDown();
}
}
};
executorService.submit(runnable);
}
Thread.sleep(5000);
System.out.println("5秒准备时间已过,发令枪响,比赛开始!");
downLatch2.countDown();
System.out.println("等待5个运动员都跑完....");
downLatch1.await();
System.out.println("所有人都跑完了,比赛结束");
executorService.shutdown();
}
}

注意点

  • CountDownLatch是不能够重用的,比如已经完成了倒数,不可以在下一次继续去重新倒数。可以考虑使用CyclicBarrier或创建一个新的CountDownLatch实例。

8.3 CyclicBarrier和CountdownLatch

CyclicBarrier可以构造出一个集结点,当某一个线程执行await()的时候,它就会到这个集结点开始等待,等待这个栅栏被撤销。直到预定数量的线程都到了这个集结点之后,这个栅栏就会撤销,之前等待的线程就在此刻统一出发,继续去执行剩下的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CyclicBarrierDemo {
static class Task implements Runnable {
private int id;
private CyclicBarrier cyclicBarrier;

public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
try {
System.out.println("同学" + id + "现在从大门出发,前往自行车驿站");
Thread.sleep((long)(Math.random() * 10000));
System.out.println("同学" + id + "到了自行车驿站,开始等待其他人到达");
cyclicBarrier.await();
System.out.println("同学" + id + "开始骑车");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
// 当线程达到集结点,执行下一次动作之前,会执行一次这个动作
@Override
public void run() {
System.out.println("凑齐3人了,GO!");
}
});
for (int i = 0; i < 6; i++) {
new Thread(new Task(i + 1, cyclicBarrier)).start();
}
}
}

执行动作barrierAction

public CyclicBarrier(int parties, Runnable barrierAction): 当parties线程到达集结点时,继续往下执行前,会执行这一次这个动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
同学1现在从大门出发,前往自行车驿站
同学5现在从大门出发,前往自行车驿站
同学6现在从大门出发,前往自行车驿站
同学4现在从大门出发,前往自行车驿站
同学3现在从大门出发,前往自行车驿站
同学2现在从大门出发,前往自行车驿站
同学5到了自行车驿站,开始等待其他人到达
同学2到了自行车驿站,开始等待其他人到达
同学6到了自行车驿站,开始等待其他人到达
凑齐3人了,GO!
同学6开始骑车
同学5开始骑车
同学2开始骑车
同学3到了自行车驿站,开始等待其他人到达
同学4到了自行车驿站,开始等待其他人到达
同学1到了自行车驿站,开始等待其他人到达
凑齐3人了,GO!
同学1开始骑车
同学3开始骑车
同学4开始骑车

CyclicBarrier和CountDownLatch的异同

相同点:都能阻塞一个或一组线程,直到某个预设条件达成,再统一出发。

不同点:

  • 作用对象不同:

    CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需等待数字到0,也就是说CountDownLatch作用于事件,但CyclicBarrier作用于线程;CountDownLatch是在调用了countDown方法之后把数字减1,而CyclicBarrier是在某线程开始等待后把计数减1。

  • 可重用性不同:

    CountDownLatch在倒数0并且触发门闩打开后,就不能再次使用了,除非新建一个新的实例;而CyclicBarrier可以重复使用。CyclicBarrier还可以随时调用reset方法进行重置,如果重置时有线程已经调用了await方法并开始等待,那么这些线程则会抛出BrokenBarrierException异常。

  • 执行动作不同:

    CyclicBarrier有执行动作barrierAction,而CountDownLatch没这个功能。

8.4 Condition、object都wait()何notify()的关系

假设线程1需要等待某些条件满足后,才能继续运行,如等待某个时间点到达或者等待某些任务处理完毕。此时,就可以执行Condition的await方法,一旦执行了该方法,这个线程就会进入WATTING状态。通常还有另外一个线程2,它去达成对应的条件,直到这个条件达成之后,那么线程2调用signal方法或signalAll方法,代表”条件达成,之前等待这个条件的线程现在可以苏醒了“。这个时候,JVM就会找到等待该Condition的线程,并予以唤醒,线程1在此时就会被唤醒,线程状态又会回到Runnable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class ConditionDemo {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

void task1() throws InterruptedException {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + ":条件不满足,开始await");
condition.await();
System.out.println(Thread.currentThread().getName() + "条件满足了,开始执行后续的任务");
} finally {
lock.unlock();
}
}

void task2() throws InterruptedException {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + ":需要5秒钟的准备时间");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + ":准备工作完成,唤醒其他的线程");
condition.signal();
} finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
ConditionDemo conditionDemo = new ConditionDemo();
new Thread(new Runnable() {
@Override
public void run() {
try {
conditionDemo.task2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
conditionDemo.task1();
}
}

main:条件不满足,开始await
Thread-0:需要5秒钟的准备时间
Thread-0:准备工作完成,唤醒其他的线程
main条件满足了,开始执行后续的任务

注意点

  • 线程2解锁后,线程1才能获得锁并继续执行

    调用signal之后,还需要等待子线程完全退出这个锁,即执行unlock之后,这个主线程才有可能去获取到这把锁,并且当获取锁成功之后才能继续执行后面的任务。

  • signalAll()和signal()区别

    signalAll()会唤醒所有正在等待的线程,而signal()只会唤醒一个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
* Condition实现简易版阻塞队列
*/
public class MyBlockingQueueForCondition {
private Queue queue;
private int max = 16;
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();

public MyBlockingQueueForCondition(int maxSize) {
this.max = maxSize;
queue = new LinkedList();
}

public void put(Object object) throws InterruptedException {
lock.lock();
try {
while (queue.size() == max) {
notFull.await();
}
queue.add(object);
notEmpty.signalAll();
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0) {
notEmpty.await();
}
Object item = queue.remove();
notFull.signalAll();
return item;
} finally {
lock.unlock();
}
}
}


/*
* 使用wait/notify来实现简易版阻塞队列
*/
public class MyBlockingQueueForWaitNotify {
private int maxSize;
private LinkedList<Object> queue;

public MyBlockingQueueForWaitNotify(int maxSize) {
this.maxSize = maxSize;
queue = new LinkedList<>();
}

public synchronized void put(Object object) throws InterruptedException {
while (queue.size() == maxSize) {
this.wait();
}
queue.add(object);
this.notifyAll();
}

public synchronized Object take() throws InterruptedException {
while (queue.size() == 0) {
this.wait();
}
Object item = queue.remove();
this.notifyAll();
return item;
}
}

Condition把Object的wait/notify/notifyAll转化为了一种相应的对象,其实现的效果基本一样,但是把更复杂的用法,变成了更直观可控的对象方法,是一种升级。await方法会自动释放持有的Lock锁,否则会抛出异常,和Object的wait一样,不需要自己手动释放锁。另外,调用await的时候必须持有锁,否则会抛出异常,这一点和Object的wait一样。

9.Java内存模型

9.1 什么是Java内存模型?

JVM内存结构

  • 堆是存放类实例和数组的,通常是内存中最大的一块。比如new Object()就会产生一个实例;而数组也是保存在堆上,因为在Java中,数组也是对象。

  • 虚拟机栈

    保存局部变量和部分结果,并在方法调用和返回中起作用。

  • 方法区

    它存储每个类的结构,例如运行时常量池、字段和方法数据,以及方法和构造函数的代码,包括用于类初始化以及接口初始化的特殊方法。

  • 本地方法栈

    与虚拟机栈类似,区别在于虚拟机栈为虚拟机执行的Java方法服务,而本地方法栈则是为Native方法服务。

  • 程序计数器

    最小的一块内存区域,它的作用通常是保存当前正在执行的JVM指令地址。

  • 运行时常量池

    是方法区的一部分,包含多种常量,范围从编译时已知的数字到必须在运行时解析的方法和字段引用。

为什么需要JMM(Java Memory Model, Java内存模型)

程序最终执行的效果依赖于具体的处理器,而不同的处理器的规则又不一样,需要一个标准,让多线程运行结果可以预期,这个标准就是JMM。

JMM是什么

  • JMM是规范

    JMM是和多线程相关的一组规范,需要各个JVM的实现来遵守JMM规范。因此JMM与处理器、缓存、并发、编译器有关,它解决了CPU多级缓存、处理器优化、质量重排序等导致的结果不可预期的问题。

  • JMM是工具类和关键字的原理

    如volatile、synchronized、Lock等原理都涉及JMM。重排序、原子性、内存可见性。

9.2 什么是指令重排序?为什么要进行重排序?

假设我们写了一个Java程序,实际上语句的运行顺序可能可写的代码顺序不一致。编译器、JVM或者CPU都有可能出于优化等目的,对于实际指令执行的顺序进行调整。

重排序的好处:提高处理速度

重排序的3种情况

  1. 编译器优化

    编译器(包括JVM、JIT编译器等);重排序并不意味着可以任意排序,它需要保证重排序后,不改变单线程内的语义。

  2. CPU重排序

    CPU同样会有优化行为,即使之前的编译器不发生冲排,CPU也可能进行重排。

  3. 内存的”重排序”

    内存系统不存在真正的重排序,但是内存会带来看上去和重排序一样的效果。由于内存有缓存的存在,在JMM里表现为主内存和本地内存,而主内存和本地内存的内容可能不一致,所以这也会导致程序表现出乱序的行为。

9.3 Java中的原子操作有哪些注意事项?

原子操作指一系列操作要么全部发生,要么全部不发生,不会出现执行一半的情况。

Java中的原子操作有哪些

  • 除了long和double之外的基本类型(int、byte、boolean、short、char、float)的读/写操作,都天然的具备原子性
  • 所有引用reference的读/写操作
  • 加了volatile后,所有变量的读/写操作(包含long/double)
  • java.concurrent.Atomic包中的一部分类的一部分方法,比如AtomicInteger的incrementAndGet

long和double的原子性

long和double的值需要占用64位的内存空间,而对于64位值的写入,可以分为两个32位的操作进行。因此,本来是一个整体的赋值操作,就可能被拆分为低32位和高32位两个操作。如果在这两个操作之间发生了其他线程对这个值的读操作,就可能会读到一个错误、不完整的值。

JVM的开发者可以自由选择是否把64位的long和double的读写操作作为原子操作去实现,并且规范推荐JVM将其实现为原子操作。

原子操作 + 原子操作 != 原子操作

9.4 什么是内存可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/*
* 内存可见性问题
*/
public class VisibilityProblem {
int a = 10;
int b = 20;

private synchronized void change() {
a = 30;
b = a;
}

private synchronized void print() {
System.out.println("b=" + b + ";a=" + a);
}

public static void main(String[] args) {
while (true) {
VisibilityProblem visibilityProblem = new VisibilityProblem();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
visibilityProblem.change();
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
visibilityProblem.print();
}
}).start();
}
}
}
  • 第1种情况:假设第1个线程,也就是执行change的线程先运行,并且运行完毕了,然后,第2个线程开始运行,打印出b=30;a=30
  • 第2种情况:与第1种情况相反。因为线程先start,并不代表它真的先执行,所以第2种情况是第2个线程先打印b=20;a=10,然后第1个线程再去进行change
  • 第3种情况:它们几乎同时运行,所以会出现交叉的情况。如第1个线程的change执行到一半,已经把a的值改为30了,而b的值还未来得及修改,此时第2个线程就开始打印,即打印结果为b=20;a=30
  • 第4种情况:发生可见性问题,a的值已经被第1个线程修改了,但是其他线程却看不到,由于a的最新值没能及时同步过来,打印出b=30;a=10

volatile关键字解决可见性问题

synchronized不仅保证了原子性,还保证了可见性

synchronized不仅保证了临界区内最多同时只有一个线程执行操作,同时还保证了在前一个线程释放锁之后,之前所做的所有修改,都能被获得同一个锁的下一个线程所看到,也就是能读取到最新的值。

9.5 主内存与工作内存的关系

CPU有多级缓存,导致读的数据过期

为了提高CPU的整体运行效率,减少空闲时间,在CPU和内存之间会有cache层(缓存层)。虽然缓存的容量比内存小,但是缓存的速度却比内存的速度要快得多,其中L1缓存的速度仅次于寄存器的速度。

线程间对于共享变量的可见性问题,并不是由多核引起的,而是由多级缓存引起的。每个核心在获取在获取数据时,都会将数据从内存一层层往上读取,同样,后续对于数据的修改也是先写入到自己的L1缓存中,然后等待时机再逐层往下同步,直到最终刷回内存。

假设core1修改了变量a的值,并写入到了core1的L1缓存里,但是还没来得及继续往下同步,由于core1有它自己的L1缓存,core4是无法直接获取core1的L1缓存的值,那么此时对于core4而言,变量a的值就不是core1修改后的最新的值,core4读取到的可能是一个过期的值,从而引起多线程时的可见性问题发生。

JMM的抽象:主内存和工作内存

每个线程都只能直接接触到工作内存,无法直接操作主内存,而工作内存中所保存的正是主内存的共享变量的副本,主内存和工作内存之间的通信是JMM控制的。

主内存和工作内存的关系

  1. 所有的变量都存储在主内存中,同时每个线程拥有自己独立的工作内存,而工作内存中的变量的内容内容是主内存中该变量的拷贝。
  2. 线程不能直接读/写主内存中的变量,但可以操作自己工作内存中的变量,然后再同步到主内存中,这样,其他线程就可以看到本次修改。
  3. 主内存是由多个线程所共享的,但线程之间不共享各自的工作内存,如果线程间需要通信,则必须借助主内存主内存来完成。

9.6 什么是happens-before规则?

Happens-before关系是用来描述可见性相关问题的:如果第一个操作happens-before第二个操作,那么可以认为第一个操作对于第二个操作一定是可见的。

Happens-before的规则

  1. 单线程规则

    在一个单独的线程中,按照程序代码的顺序,先执行的操作happens-before后执行的操作。

    但只要重排序后的结果依然符合happens-before关系,也就是能保持可见性的话,并不会限制重排序的发生。

  2. 锁操作规则(synchronized和Lock接口)

    如果操作A是解锁,而操作B是对同一个锁的加锁,那么hb(A,B)。

  3. volatile变量规则

    对于一个volatile变量的写操作happens-before后面对该变量的读操作。

  4. 线程启动规则

    Thread对象的start方法happens-before此线程run方法中的每一个操作。

  5. 线程join规则

    join可以让线程之间等待,假设线程A通过调用threadB.start()启动了一个新线程B,然后调用threadB.join(),那么线程A将一直等待到线程B的run方法结束(不考虑中断等特殊情况),然会join方法才返回。在join方法返回后,线程A中的所有后续操作都可以看到线程B的run方法执行的所有操作的结果,也就是线程B的run方法里面的操作hanppens-before线程A的join之后的语句。

  6. 中断规则

    对线程interrupt方法的调用happens-before检测该线程的中断事件。

  7. 并发工具类的规则

    • 线程安全的并发容器(如ConcurrentHashMap)在get某个值时一定能看到在此之前发生的put等存入操作的结果。
    • 信号量(Semaphore)会释放许可证,也会获取许可证。释放许可证的操作happens-before获取许可证的操作。
    • Future:当Future的get方法得到结果的时候,一定可以看到之前任务中所有的操作。
    • 线程池:提交任务的操作happens-before任务的执行。

9.7 volatile的作用是什么?与synchronized有什么异同?

volatile是Java中的一个关键字,是一种同步机制。当某个变量是共享变量,且这个变量被volatile修饰,那么在修改了这个变量的值之后,再读取该变量的值时,可以保证获取到的是修改后的最新的值。

相比于synchronized或者Lock,volatile更加轻量,因为使用volatile不会发生上下文切换等开销很大的情况,不会让线程阻塞。

volatile不适用于a++

volatile不适合运用于需要保证原子性的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* a++ 不适合使用volatile
*/
public class DontVolatile implements Runnable {
volatile int a;
AtomicInteger realA = new AtomicInteger();

public static void main(String[] args) throws InterruptedException {
DontVolatile dontVolatile = new DontVolatile();
Thread thread1 = new Thread(dontVolatile);
Thread thread2 = new Thread(dontVolatile);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(dontVolatile.a);
System.out.println(dontVolatile.realA);
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
a++;
realA.incrementAndGet();
}
}
}

1926
2000

适用场合1:布尔标记位

第一个例子的操作是a++,这是个复合操作,不具备原子性,而下面这个例子只是把flag设置为true,这样的赋值操作本身就是具备原子性的,所以适合使用volatile。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 可以使用volatile的场景 布尔标记位
*/
public class YesVolatile1 implements Runnable {
volatile boolean flag = false;
AtomicInteger realA = new AtomicInteger();

public static void main(String[] args) throws InterruptedException {
YesVolatile1 yesVolatile1 = new YesVolatile1();
Thread thread1 = new Thread(yesVolatile1);
Thread thread2 = new Thread(yesVolatile1);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(yesVolatile1.flag);
System.out.println(yesVolatile1.realA);
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
realA.incrementAndGet();
setDone();
}
}
private void setDone() {
flag = true;
}
}

true
2000

适用场合2:作为触发器,保证其他变量的可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Map configOptions;
char[] configText;
volatile boolean initialized = false;
. . .
// In thread A
configOptions = new HashMap();
configText = readConfigFile(fileName);
processConfigOptions(configText, configOptions);
initialized = true;
. . .
// In thread B
while (!initialized) 
  sleep();
// use configOptions

happens-before具有传递性,根据happens-before的单线程规则,线程A中configOptions的初始化happens-before对iniialized变量的写入,而线程B中对initialized的读取happens-before对configOptions变量的使用,同时根据happens-before关系的volatile规则,线程A中对initialized的写入为true的操作happens-before线程B中随后对initialized变量的读取。

volatile的作用

  1. 保证可见性

    对于一个volatile变量的写操作happen-before后面对该变量的读操作,即如果变量被volatile修饰,那么每次修改之后,接下来在读取这个变量的时候一定能读到该变量的最新值。

  2. 禁止重排序

    as-if-serial:不管怎么重排序,单线程的执行结果不变。多线程情况下的重排序可能会导致严重的线程安全问题。使用volatile关键字可以在一定程度上禁止这种重排序。

volatile和synchronized的关系

相似性:volatile可以看作是一个轻量版的synchronized,如果一个共享变量如果自始至终只被各个线程赋值和读取,而没有其他操作的话,那么就可以用volatile来代替synchronized或者代替原子变量。

不可代替:volatile是不能代替synchronized的,volatile并没有提供原子性和互斥性的。

性能方面:volatile的读写操作都是无锁的,比synchronized性能更好。

9.8 单例模式的双重检查锁模式为什么必须加volatile?

单例模式:保证一个类只有一个实例,并且提供一个可以全局访问的入口。

为什么需要使用单例模式?

  1. 为了节省内存、节省计算。

  2. 保证结果正确。

  3. 方便管理。

    有一个私有的Singleton类型的singleton对象;同时构造方法也是私有的,为了防止他人调用构造函数来生成实例;还有一个public的getInstance方法,可通过这个方法获取到单例。

双重检查锁模式的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* volatile 双重检查锁
*/
public class Singleton {
private static volatile Singleton singleton;

private Singleton() {}

public static Singleton getInstance() {
if (singleton == null) {
synchronized (Singleton.class) {
if (singleton == null) {
singleton = new Singleton();
}
}
}
return singleton;
}
}

进行了两次if(singleton==null)检查,即”双重检查锁”。假设有两个线程同时到达synchronized语句块,那么实例化代码只会由其中先抢到锁的线程执行一次,而后抢到锁的线程会在第二个if判断中发现singleton不为null,所以跳过创建实例的语句。再后面的其他线程再来调用getInstance方法时,只需判断第一次的if(singleton==null),然后跳过整个if块,直接return实例化后的对象。

为什么需要双重检查?

如果有两个线程同时调用getInstance方法,由于singleton是空的,因此两个线程可以通过第一重if的检查,然后由于锁的存在,会有一个线程先进入同步语句,并进入第二重检查,而另外一个线程就在外面等待。不过当第一个线程执行完new Singleton()语句后,就会退出synchronized保护的区域,这时如果没有第二重if(singleton==null)判断的话,那么第二个线程也会创建一个实例,破环了单例。

如果去掉第一个检查,那么所有线程都会串行执行,效率低下。

双重检查模式中为什么需要使用volatile关键字?

singleton = new Singleton() 并非是一个原子操作,在JVM中至少做了以下3件事。

  1. 给singleton分配内存空间
  2. 调用Singleton的构造函数,来初始化singleton
  3. 将singleton对象指向分配的内存空间(执行完这步singleton就不是null了)

因为存在指令排序的优化,所以第2,3步的顺序是不能保证的,最终的执行顺序可能是1-2-3,也有可能是1-3-2。

如果是1-3-2:

使用volatile之后,相当于是表明了该字段的更新可能是在其他线程中发生的,在JDK5及后续版本所使用的JMM中,在使用了volatile后,会在一定程度禁止相关语句的重排序。

10.CAS

10.1 什么是CAS?

CAS(Compare And Swap),是一种思想,为了保证并发安全,可以使用互斥锁,而CAS的特点就是避免使用互斥锁,当多个线程同时使用CAS更新同一个变量时,只有其中一个线程能够操作成功,而其他线程都会更新失败。不过和同步互斥锁不同的是,更新失败的线程并不会被阻塞,而是被告知这次由于竞争而导致的操作失败,但还可以再次尝试。

CAS的思路

CAS相关的指令是具备原子性的,”比较和交换“操作在执行期间不会被打断。

CAS有3个操作数:内存值V,预期值A、要修改的值B。当预期值A和当前的内存值V相同时,才将内存值修改为B。

CAS会提前假定当前内存值V应该等于值A,而值A往往是之前读取到当时的内存值V,如果发现当前的内存值V恰好是值A的话,那CAS就会把内存值V改成B。如果执行CAS时发现此时的内存值V不等于值A,则说明在刚才计算B的期间内,内存值已经被其他线程修改过了,那么本次CAS就不应该再修改了。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 模拟CAS操作 等价代码
*/
public class SimulatedCAS implements Runnable {
private int value;

public synchronized int compareAngSwap(int expectedValue, int newValue) {
int oldValue = value;
if (oldValue == expectedValue) {
value = newValue;
System.out.println(Thread.currentThread().getName() + "执行成功!");
}
return oldValue;
}

public static void main(String[] args) throws InterruptedException {
SimulatedCAS simulatedCAS = new SimulatedCAS();
simulatedCAS.value = 100;
Thread thread1 = new Thread(simulatedCAS);
Thread thread2 = new Thread(simulatedCAS);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(simulatedCAS.value);
}

@Override
public void run() {
compareAngSwap(100, 150);
}
}

10.2 CAS的应用

并发容器

  1. ConcurrentHashMap

    putVal方法部分代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
        final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    if (tab == null || (n = tab.length) == 0)
    tab = initTable();
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
    new Node<K,V>(hash, key, value, null)))
    break; // no lock when adding to empty bin
    }
    ......
    }

    casTabAt

    1
    2
    3
    4
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    U是Unsafe类型的,Unsafe类包含compareAndSwapInt、compareAndSwapLong、compareAndSwapObject等和CAS密切相关的native层的方法,其底层正是利用CPU对CAS指令的支持实现的。

  2. ConcurrentLinkedQueue

    非阻塞并发队列ConcurrentLinkedQueue的offer方法里也有CAS的身影,offer方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
    Node<E> q = p.next;
    if (q == null) {
    if (p.casNext(null, newNode)) {
    if (p != t)
    casTail(t, newNode);
    return true;
    }
    }
    else if (p == q)
    p = (t != (t = tail)) ? t : head;
    else
    p = (p != t && t != (t = tail)) ? t : q;
    }
    }

数据库

在更新数据时,可以利用version字段在数据库中实现乐观锁和CAS操作,而在获取和修改数据时都不需要加悲观锁。

1
2
3
4
5
UPDATE student
SET
name = ‘小王’, version = 2        
WHERE  
id = 10 AND version = 1

先去比较version是不是最开始获取到的1,如果和初始值相同才去进行name字段的修改,同时也要把version的值加1。

原子类

在原子类中,如AtomicInteger,也使用了CAS。如AtomicInteger的getAndAdd方法。

1
2
3
4
5
6
7
8
9
10
11
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
var1 o object 将要修改的对象,传入的是this,也就是atomicInteger这个对象本身
var2 offset offset 偏移量,借助它就可以获取到oldvalue的数值
var5 v expectedValue 代表”期望值”
var5+var4 v+delta newValue 希望修改为的新值,var4就是希望原子类所改变的数值,比如可以传入+1,也可以传入-1

Unsafe的getAndAddInt方法是通过循环+CAS的方式来实现的,在此过程中,它会通过compareAndSwapInt方法来尝试更新value的值,如果更新失败就重新获取,然后再次更新,直到更新成功。

10.3 CAS有什么缺点?

ABA问题

CAS检查的并不是值有没有发生过变化,而是去比较这当前值和预期值是不是相等,如果变量的值从旧值A变成了新值B再变回旧值A,由于最开始是值A和现在的值A是相等的,所以CAS会认为变量的值在此期间没有发生过变化。所以,CAS并不能检测在此期间值是不是被修改过,它只能检查出现在的值和最初的值是不是一样

在变量自身之外,再添加一个版本号,A->B->A,1A->2B->3A,可以通过版本号来判断值是否变化过。

atomic包中提供了AtomicStampedReference这个类,它是专门用来解决ABA问题,解决思路正是利用版本号,AtomicStampedReference会维护一种类似<Object,int>的数据结构,其中的int就是用于计数的,也就是版本号。

自旋时间过长

由于单次CAS不一定能执行成功,所以CAS往往是配合着循环来实现的,有的时候甚至是死循环,不停重试,直到竞争不激烈的时候,才能修改成功。

如果是高并发场景,有可能导致CAS一直操作不成功,循环的时间会越来越长。CPU资源一直在被消耗,会对性能产生很大的影响,高并发情况下,通常CAS的效率是不高的。

范围不能灵活控制

通常执行CAS的时候,是针对某一个,而不是多个共享变量的,多个变量之间是独立的,简单的把原子操作组合到一起,并不具备原子性。

有一个解决方案就是利用一个新的类,来整合刚才这一组共享变量,这个新的类中的多个成员变量就是刚才的那多个共享变量,然后再利用atomic包中的AtomicReference来把这个新对象整体进行CAS操作。

相比之下,如使用synchronized关键字时,如果想把更加的代码加锁,只需把更多的代码放到同步代码块里面就可以了。

11.死锁问题

11.1 写一个必然死锁的例子

什么是死锁?

发生在并发中,两个或多个线程(或进程)被无限期的阻塞,相互等待对方手中资源。

例子

两个线程:

多个线程:

死锁的影响

数据库中:

在执行一个事务的时候可能需要获取多把锁,并一直持有这些锁直到事务完成。在某个事务中持有的锁可能在其他事务中也需要,因此在两个事务之间有可能会发生死锁的情况。当数据库检测到这一组事务发生了死锁,根据策略的不同,可能会选择放弃一个事务,被放弃的事务就会释放掉它所持有的锁,从而使其它事务继续进行。此时程序可以重新执行被强行终止的事务。

JVM中:

JVM并不会自动进行处理,发生几率不高但危害大,在巨量的次数面前,整个系统发生问题的几率也会被放大。

发生死锁的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 必然死锁的例子
*/
public class MustDeadLock implements Runnable {
public int flag;
static Object object1 = new Object();
static Object object2 = new Object();

@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "的flag为" + flag);
if (flag == 1){
synchronized (object1){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (object2){
System.out.println("线程" + Thread.currentThread().getName()+"获取到了两把锁!");
}
}
}
if (flag == 2){
synchronized (object2){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (object1){
System.out.println("线程" + Thread.currentThread().getName()+"获取到了两把锁!");
}
}
}
}

public static void main(String[] args) {
MustDeadLock mustDeadLock1 = new MustDeadLock();
MustDeadLock mustDeadLock2 = new MustDeadLock();
mustDeadLock1.flag = 1;
mustDeadLock2.flag = 2;
Thread thread1 = new Thread(mustDeadLock1,"thread1");
Thread thread2 = new Thread(mustDeadLock2,"thread2");
thread1.start();
thread2.start();
}
}


线程thread1的flag为1
线程thread2的flag为2
  • 当第1个线程运行的时候,它会发现自己的flag是1,所以它会尝试先获得object1这把锁,然后休眠500毫秒。
  • 在线程1启动并休眠的期间,线程2同样会启动。由于线程2的flag是2,所以线程2首先会去获取object2这把锁,然后休眠500毫秒。
  • 当线程1的500毫秒休眠时间结束,它会尝试去获取object2这把锁,此时object2这把锁正在被线程2持有,所以线程1无法获取到object2。
  • 紧接着线程2也会苏醒过来,它将尝试获取object1这把锁,此时object1已被线程1持有。

线程1卡在获取object2这把锁的位置,而线程2卡在获取object1这把锁的位置。

11.2 发生死锁的4个必要条件

  • 互斥条件

    每个资源每次只能被一个线程(或进程)使用。

  • 请求与保持条件

    当一个线程因请求资源而阻塞时,则需对已获得的资源保持不放。

  • 不剥夺条件

    线程已获得的资源,在未使用完之前,不会被强行剥夺。

  • 循环等待条件

    只有若干个线程之间形成一种头尾相接的循环等待资源关系时,才有可能形成死锁。

11.3 如何定位死锁?

  • 命令:jstack

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    D:\IDEAProbject\JavaStudyDemo\Multithreading>jps
    3044 Launcher
    4084 MustDeadLock
    11816 Jps

    D:\IDEAProbject\JavaStudyDemo\Multithreading>jstack 4084
    Found one Java-level deadlock:
    =============================
    "thread2":
    waiting to lock monitor 0x000000001c601e68 (object 0x0000000776319ce0, a java.lang.Object),
    which is held by "thread1"
    "thread1":
    waiting to lock monitor 0x000000001c6047a8 (object 0x0000000776319cf0, a java.lang.Object),
    which is held by "thread2"

    Java stack information for the threads listed above:
    ===================================================
    "thread2":
    at com.example.MustDeadLock.run(MustDeadLock.java:34)
    - waiting to lock <0x0000000776319ce0> (a java.lang.Object)
    - locked <0x0000000776319cf0> (a java.lang.Object)
    at java.lang.Thread.run(Thread.java:748)
    "thread1":
    at com.example.MustDeadLock.run(MustDeadLock.java:22)
    - waiting to lock <0x0000000776319cf0> (a java.lang.Object)
    - locked <0x0000000776319ce0> (a java.lang.Object)
    at java.lang.Thread.run(Thread.java:748)

    Found 1 deadlock.
  • 代码:ThreadMXBean

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    /**
    * 必然死锁的例子
    */
    public class MustDeadLock implements Runnable {
    public int flag;
    static Object object1 = new Object();
    static Object object2 = new Object();

    @Override
    public void run() {
    System.out.println("线程" + Thread.currentThread().getName() + "的flag为" + flag);
    if (flag == 1) {
    synchronized (object1) {
    try {
    Thread.sleep(500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    synchronized (object2) {
    System.out.println("线程" + Thread.currentThread().getName() + "获取到了两把锁!");
    }
    }
    }
    if (flag == 2) {
    synchronized (object2) {
    try {
    Thread.sleep(500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    synchronized (object1) {
    System.out.println("线程" + Thread.currentThread().getName() + "获取到了两把锁!");
    }
    }
    }
    }

    public static void main(String[] args) throws InterruptedException {
    MustDeadLock mustDeadLock1 = new MustDeadLock();
    MustDeadLock mustDeadLock2 = new MustDeadLock();
    mustDeadLock1.flag = 1;
    mustDeadLock2.flag = 2;
    Thread thread1 = new Thread(mustDeadLock1, "thread1");
    Thread thread2 = new Thread(mustDeadLock2, "thread2");
    thread1.start();
    thread2.start();
    Thread.sleep(2000);
    ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
    if (deadlockedThreads != null && deadlockedThreads.length > 0) {
    for (int i = 0; i < deadlockedThreads.length; i++) {
    ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]);
    System.out.println(threadInfo);
    }
    }
    }
    }

    线程thread1的flag为1
    线程thread2的flag为2
    "thread2" Id=21 BLOCKED on java.lang.Object@27d6c5e0 owned by "thread1" Id=20
    "thread1" Id=20 BLOCKED on java.lang.Object@4f3f5b24 owned by "thread2" Id=21

11.4 解决死锁问题的策略

  • 避免策略

    优化代码逻辑,从根本上消除发生死锁的可能性,如调整锁的获取顺序。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    /**
    * 调整锁的获取顺序来避免死锁问题
    */
    public class TransferMoney implements Runnable {
    static class Account {
    int balance;

    public Account(int balance) {
    this.balance = balance;
    }
    }

    int flag;
    static Account a = new Account(500);
    static Account b = new Account(500);

    @Override
    public void run() {
    if (flag == 1) {
    transferMoney(a, b, 200);
    }
    if (flag == 0) {
    transferMoney(b, a, 200);
    }
    }

    public static void transferMoney(Account from, Account to, int account) {
    int fromHash = System.identityHashCode(from);
    int toHash = System.identityHashCode(to);
    // 模拟网络延迟
    try {
    Thread.sleep(500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    if (fromHash > toHash) {
    // 先获取两把锁,然后开始转账
    synchronized (to) {
    synchronized (from) {
    if (from.balance - account < 0) {
    System.out.println("余额不足,转账失败!");
    return;
    }
    from.balance -= account;
    to.balance += account;
    System.out.println("成功转账" + account + "元!");
    }
    }
    }
    if (toHash > fromHash) {
    // 先获取两把锁,然后开始转账
    synchronized (from) {
    synchronized (to) {
    if (from.balance - account < 0) {
    System.out.println("余额不足,转账失败!");
    return;
    }
    from.balance -= account;
    to.balance += account;
    System.out.println("成功转账" + account + "元!");
    }
    }
    }

    }

    public static void main(String[] args) throws InterruptedException {
    TransferMoney r1 = new TransferMoney();
    TransferMoney r2 = new TransferMoney();
    r1.flag = 1;
    r2.flag = 0;
    Thread t1 = new Thread(r1);
    Thread t2 = new Thread(r2);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println("a的余额" + a.balance);
    System.out.println("b的余额" + b.balance);
    }
    }

    业务实际上不在乎获取锁的顺序,调整获取锁的顺序,使先获取的账户是”转入”或”转出”无关,而是使用HashCode的值来决定顺序,从而保证线程安全。但依然有极小的概率会发生HashCode相同的情况,在实际生产中,需要排序的往往是一个实体类,而一个实体类一般都会具有主键ID,主键ID具有唯一、不重复的特点,直接使用主键ID排序,按照主键ID的大小来决定获取锁的顺序,以确保避免死锁。

  • 检测与恢复策略

    先允许系统发生死锁,然后再解除。例如系统可以在每次调用锁的时候,都记录下来调用信息,形成一个”锁的调用链图”,然后隔一段时间就用死锁检测算法来检测一下,搜索这个图中是否存在环路,一旦发生死锁,就可以用死锁恢复机制,解开死锁,进行恢复。

    1. 线程终止

      系统逐个去终止已经陷入死锁的线程,线程被终止,同时释放资源,死锁就会被解开。有各种各样的算法和策略,根据实际业务进行调整。

      • 优先级

        先终止优先级低的线程。

      • 已占用资源、还需要的资源

        如果某线程已经占有了一大堆资源,只需要最后一点点资源就可以顺利完成任务,那么系统会优先终止别的线程来优先促成该线程的完成。

      • 已经运行时间

        如果某线程已经运行很多天了,很快就要完成任务了,可以让那些刚刚开始运行的线程终止,并在之后把它们重新启动,这样成本更低。

    2. 资源抢占

      不需要把整个线程终止,而是只需要把它已经获得的资源进行剥夺,如让线程回退几步、释放资源,这样就不需要终止掉整个线程,成本更低。但如果算法不好的话,我们抢占的那个线程可能一直是同一个线程,就会造成饥饿线程,即这个线程一直被剥夺它已经得到的资源,那么它就长期得不到运行。

  • 鸵鸟策略

    如果系统发生死锁的概率极低,并且一旦发生其后果不是特别严重,可以先选择忽略它,直到发生死锁后,再人工修复。

12.final关键字

12.1 final的三种用法

final修饰变量

final修饰的变量,一旦被赋值就不能被修改了

目的:1.设计角度 2.线程安全

赋值时机:

  • 成员变量,类中的非static修饰的属性

    1. 在变量的等号右边直接赋值

      1
      2
      3
      public class FinalFieldAssignment1 {
      private final int finalVar = 0;
      }
    2. 在构造函数中赋值

      1
      2
      3
      4
      5
      6
      class FinalFieldAssignment2 {
      private final int finalVar;
      public FinalFieldAssignment2() {
      finalVar = 0;
      }
      }
    3. 在类的构造代码块中赋值(不常用)

      1
      2
      3
      4
      5
      6
      class FinalFieldAssignment3 {
      private final int finalVar;
      {
      finalVar = 0;
      }
      }
  • 静态变量,类中的被static修饰的属性

    1. 在声明变量的等号右边直接赋值

      1
      2
      3
      public class StaticFieldAssignment1 {
      private static final int a = 0;
      }
    2. 在一个静态的static初始代码块中赋值

      1
      2
      3
      4
      5
      6
      class StaticFieldAssignment2 {
      private static final int a;
      static {
      a = 0;
      }
      }

    static的final变量不能在构造函数中进行赋值

  • 局部变量,方法中的变量

    使用前赋值即可

final修饰参数,意味着在方法内部无法对参数进行修改。

1
2
3
4
5
6
public class FinalPara {
public void withFinal(final int a) {
System.out.println(a);//可以读取final参数的值
// a = 9; //编译错误,不允许修改final参数的值
}
}

final修饰方法

  1. 提高效率,早期的Java版本,会把final修饰的方法转为内嵌调用,消除方法调用的开销。
  2. final修饰的方法不可以被重写

final的private方法

1
2
3
4
5
6
7
8
public class PrivateFinalMethod {
private final void privateEat() {
}
}
class SubClass2 extends PrivateFinalMethod {
private final void privateEat() {//编译通过,但这并不是真正的重写
}
}

类中的所有private方法都是隐式的指定为自动被final修饰的,由于这个方法是private类型的,所以对于子类而言,根本获取不到父类这个方法,更别说重写了。所以其实子类并没有真正意义上的去重写父类的privateEat方法,只是方法名碰巧一样而已。

final修饰类

final修饰的类不可被继承

类是final的,不代表里面的属性就会自动加上final。

final的类里面,所有的方法,不论是public、private还是其他权限修饰符修饰的,都会自动的、隐式的被指定为是final的。

12.2 为什么加了final却依然无法拥有”不变性”?

如果对象在被创建之后,其状态就不能修改了,那么它就具备”不变性”。

final修饰对象时,只是引用不可变。当用final去修饰一个指向对象类型(而不是指向8种基本数据类型)的变量的时候,那么final起到的作用只是保证则个变量的引用不可变,而对象本身的内容依然是可变化的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Test { 
int p = 20;
public static void main(String args[]){
final Test t = new Test();
t.p = 30;
System.out.println(t.p);
}
}
30

class Test {
public static void main(String args[]) {
final int arr[] = {1, 2, 3, 4, 5}; // 注意,数组 arr 是 final 的
for (int i = 0; i < arr.length; i++) {
arr[i] = arr[i]*10;
System.out.println(arr[i]);
}
}
}
10
20
30
40
50

final修饰一个指向对象的变量的时候,对象本身的内容依然是可以变化的。

final和不可变的关系

final可以确保变量的引用保持不变,但是不变性意味着对象一旦创建完毕就不能改变其状态,它强调的是对象内容本身,而不是引用。

1
2
3
4
5
6
7
8
9
10
11
public class ImmutableDemo {
private final Set<String> lessons = new HashSet<>();
public ImmutableDemo() {
lessons.add("第01讲:为何说只有 1 种实现线程的方法?");
lessons.add("第02讲:如何正确停止线程?为什么 volatile 标记位的停止方法是错误的?");
lessons.add("第03讲:线程是如何在 6 种状态之间转换的?");
}
public boolean isLesson(String name) {
return lessons.contains(name);
}
}

包含对象类型的成员变量的类的对象,具备不可变性的例子:对于ImmutableDemo类而言,它只有这么一个成员变量,而这个成员变量一旦构造完毕后又不能改变。

12.3 为什么String被设计为是不可变的?

1
2
3
4
5
String s = "lagou";
s = "la";

String lagou = "lagou";
lagou = lagou.subString(0, 4);

只不是建了一个新的字符串而已,并把引用重新指向。

1
2
3
4
5
6
public final class String
implements Java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];
//...
}

private final的char数组value,存储着字符串的每一位字符,value一旦被赋值,引用就不能修改了;并且在String的源码中,除构造函数之外,并没有任何其他方法会修改value数组里面的内容,而value的权限是private,外部的类也访问不到,所以value是不可变的。String类是被final修饰的,所以这个String类是不会被继承的。

String不变的好处

  1. 字符串常量池

  2. 用作HashMap的key

    对于key来说,最重要的就是不可变,这样才能利用它去检索存储在HashMap里面的value。由于HashMap的工作原理是Hash,也就是散列,所以需要对象始终拥有相同的Hash值才能正常运行。

  3. 缓存HashCode

    1
    2
    /** Cache the hash code for the String */
    private int hash;

    在String类中有一个hash属性,保存的是String对象的HashCode。因为String是不可变的,所以对象一旦被创建之后,HashCode的值也就不可能变化了,就可以把HashCode缓存起来。以后每次想用到HashCode的时候,不需要重新计算,直接返回缓存过的hash的值就可以了,所以使得字符串非常适合用作HashMap的key。

  4. 线程安全

    线程安全,具备不变性的对象一定是线程安全的,避免了很多不必要的同步操作。

13.AQS

13.1 为什么需要AQS?

AQS在ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch、ThreadPoolExcutor中都有运用(JDK1.8),AQS是这些类的底层原理。

AQS是一个用于构建锁、同步器等线程协作工具类的框架,有了AQS之后,可以让更上层的开发极大的减少工作量,避免重复造轮子,同时也避免了上层因处理不当而导致的线程安全问题,因为AQS把这些事情都做好了。总之,有了AQS之后,构建线程协作工具类就容易多了。

13.2 AQS内部原理

状态

1
2
3
4
/**
* The synchronization state.
*/
private volatile int state;

state的含义并不是一成不变的,它会根据具体实现类的作用不同而表示不同的含义。

比如在信号量里,state表示的是剩余许可证的数量。当某一个线程衢州一个许可证之后,state会减1。

在CountDownLatch工具类里,state表示的是需要”倒数”的数量。每次调用CountDown方法时,state就会减1,直到减为0就代表这个”门闩”被放开。

在ReentrantLock中表示的是锁的占有情况。最开始是0,表示没有任何线程占有锁,如果state变成1,就代表这个锁已经被某一个线程所持有了。因为ReentrantLock是可重入的,同一个线程可以再次拥有这把锁就叫重入。如果这个锁被同一个线程多次获取,那么state就会逐渐的往上加,state的值表示重入的次数。在释放的时候也是逐步递减。

compareAndSetState:

1
2
3
4
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

利用了Unsafe里面的CAS操作,利用CPU指令的原子性保证了这个操作的原子性。

setState:

1
2
3
protected final void setState(int newState) {
state = newState;
}

对于基本类型的变量进行直接赋值时,如果加了volatile就可以保证它的线程安全。

FIFO队列

先进先出队列,主要的作用是存储等待的线程。当多个线程去竞争同一把锁的时候,就需要用排队机制把那些没拿到锁的线程串在一起;而当前面的线程释放锁之后,这个管理器就会挑选一个合适的线程来尝试抢刚刚释放的那把锁。

队列内部是双向链表的形式:

在队列中,分别用head和tail来表示头节点和尾节点,两者在初始化的时候指向一个空节点。头节点可以理解为”当前持有锁的线程”,而在头节点之后的线程被阻塞了,它们会等待被唤醒,唤醒也是由AQS负责操作的。

获取/释放方法

  • 获取方法

    获取操作通常会依赖state变量的值,获取方法在不同类中代表不同的含义,但往往和state值相关,也经常会让线程进入阻塞状态。

    如ReentrantLock中的lock方法,执行时如果发现state不等于0且当前线程不是持有锁的线程,那么就代表这个锁已经被其他线程所持有了,这个时候,当然获取不到锁,于是就让该线程进入阻塞状态。

    Semaphore中的acquire,作用是获取许可证。如果state是正数,那么代表还有剩余的许可证,数量足够就可以获取成功;但如果state是0,则代表已经没有更多的空余许可证了,会进入阻塞状态。

    CountDownLatch获取方法就是await方法,作用是”等待,直到倒数结束”。执行await的时候会判断state的值,如果state不等于0,线程就进入阻塞状态,直到其他线程执行倒数方法把state减为0,此时就代表这个门闩放开了,所以之前阻塞的线程就会被唤醒。

  • 释放方法

    释放方法通常是不会阻塞线程的。

    比如在Semaphore信号量,释放就是release方法,release()方法的作用是去释放一个许可证,会让state加1;而在CountDownLatch里面,释放就是countDown方法,作用是倒数一个数,让state减1。

13.3 AQS在CountDownLatch中应用原理

AQS用法

  1. 新建一个自己的线程协作工具类,在内部写一个Sync类,该类继承AbstractQueueSynchronizer,即AQS。
  2. 在Sync类中,根据是否是独占来重写对应的方法。独占,则重写tryAcquire和tryRelease等方法;非独占,则重写tryAcquireShared和tryReleaseShared等方法。
  3. 在自己的线程协作工具类中,实现获取/释放的相关方法,并在里面调用AQS对应的方法,独占调用acquire或release等方法;非独占调用acquireShared或releaseShared或acquireSharedInterruptibly。

AQS在CountDownLatch的应用

在CountDownLatch里面有一个子类Sync,这个类正是继承自AQS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
//省略其他代码...
}

构造函数:

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

CountDown构造函数将传入的count最终传递到AQS内部的state变量,给state赋值,state就代表还需要倒数的次数。

getCount:

1
2
3
public long getCount() {
return sync.getCount();
}

最终获取到的就是Sync中state的值。

countDown:

1
2
3
4
5
6
7
8
9
10
11
public void countDown() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

Sync中的tryReleaseShared(),doReleaseShared()对之前阻塞的线程进行唤醒。

await:

该方法时CountDownLatch的”获取”方法,调用await会把线程阻塞,直到倒数为0才能继续执行。

1
2
3
4
5
6
7
8
9
10
11
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

Sync中的tryAcquireShared(),doAcquireSharedInterruptibly()会让线程进入阻塞状态。

总结

当线程调用CountDownLatch的await方法时,便会尝试获取”共享锁”,不过一开始通常获取不到锁,于是线程被阻塞。”共享锁”可以获取到的条件是”锁计数器”的值为0,而”锁计数器”的初始值为count,当每次调用CountDownLatch对象的countDown方法时,也可以把”锁计数器”-1。直到”锁计数器”为0,于是之前等待的线程就会继续运行了,并且此时如果再有线程想调用await方法时也会立刻放行,不会再去做任何阻塞操作了。

请作者喝瓶肥宅快乐水