什么是JUC

线程是 Java 中浓墨重彩的一笔。

JUC 就是 java.util .concurrent 工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。

image-20200504091645858

这里列举两个之前学习 Java基础 用到过的两个接口:

image-20200504091815405

image-20200504091917934

基础概念

线程和进程

  • 进程:一个程序的一次动态执行过程,.exe
  • 线程:进程可细分的最小单位,一个进程往往可以包含多个线程

Java 默认有几个线程?

​ 2 个 !

  • main线程

  • GC线程

对于Java有哪几种开启线程的方式?

  • Thread
  • Runnable
  • Callable

Java真的可以开启线程嘛?

​ 并不行,需要使用 public native void start0() 方法来调用本地方法使用其他语言开启线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public synchronized void start() {

if (threadStatus != 0)
throw new IllegalThreadStateException();

group.add(this);

boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}

private native void start0();

并发和并行

  • 并发:多个线程操作同一个资源(交替执行

    本质:充分利用 CPU 的资源

  • 并行:多个线程 同时 操作一个资源(同时执行

    查看 CPU 核心数(CPU核心数代表最多可以并行操作多少线程)

image-20200504094108209
1
2
3
4
5
public static void main(String[] args) {
//获取cpu核心数
//cpu密集型、io密集型
System.out.println(Runtime.getRuntime().availableProcessors()); //12
}

线程有几个状态?

​ 六个

1
2
3
4
5
6
7
8
public enum State {
NEW, //新生
RUNNABLE, //运行
BLOCKED, //阻塞
WAITING, //等待,死死的等
TIMED_WAITING, //超时等待
TERMINATED; //终止
}

Wait 和 Sleep 的区别:

wait sleep
Object Thread
会释放锁 不会释放(睡着了都怎么释放)
必须在同步代码块中使用 可以在任何地方睡
不需要捕获异常 必须捕获异常

Lock锁(重点)

synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class SaleTicket {

public static void main(String[] args) {
Ticket ticket = new Ticket();

// Runnable @FunctionalInterface 函数式接口,jdk1.8之后可以使用 lambda 表达式
// ()->{}

//线程A
new Thread(()->{
for (int i = 0; i < 50; i++) {
ticket.sale();
}
},"A").start();
//线程B
new Thread(()->{
for (int i = 0; i < 50; i++) {
ticket.sale();
}
},"B").start();

}

}

/**
* 资源类
* OOP思想
* 只写属性、方法
*/
class Ticket{
//属性
private int number = 30;
//方法
public synchronized void sale() {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "张票,剩余:" + number);
}
}
}

加入 synchronized 关键字之前:

image-20200504102033618

加入 synchronized 关键字之后:

image-20200504102234831

Lock接口

  • 使用方法

    image-20200504102537490

  • 实现类

image-20200504102513162

  • 公平锁和非公平锁

    • 公平锁:十分公平,可以先来后到
    • 非公平锁:十分不公平,可以插队(默认)
  • 实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    class Ticket{
    //属性
    private int number = 30;

    Lock lock = new ReentrantLock();

    //方法
    public void sale() {

    //加锁
    lock.lock();

    try {
    //业务代码
    if (number > 0) {
    System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "张票,剩余:" + number);
    }
    } catch (Exception e) {
    e.printStackTrace();
    }finally {
    //解锁
    lock.unlock();
    }
    }
    }

    运行结果与使用synchronized 一致

  • synchronizedLock锁 的区别

synchronized Lock
是一个内置的关键字 是一个Java类
无法判断锁的状态 可以判断是否获取到了锁
会自动释放锁 必需要手动释放(不释放会出现死锁
Thread1(获得锁、阻塞)、Thread2(等待,傻傻的等) 不一定会等下去
可重入锁、不可中断的、非公平的 可重入锁、可以中断的、非公平的(可以设置)
适合锁少量的同步代码 适合锁大量的同步代码

锁是什么?如何判断锁的是谁?

普通方法上锁,锁住的是调用方法的对象实例

静态方法上锁,锁住的是整个类

生产者和消费者问题

问题描述:

本质是线程之间的通信问题

两个线程交替执行,A、B 同时操作同一个变量

A:num + 1

B:num - 1

要保证 num 在 0、1 之间交替

大有门道

Synchronized版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class test {

public static void main(String[] args) {
Data data = new Data();

new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();

new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}

}

/**
* 是一个资源类
* 只提供要操作的属性和方法,具有较低耦合性
* 六字口诀:等待、业务、通知
*/
class Data{

private int number = 0;

public synchronized void increment() throws InterruptedException {
//线程数大于二时,用 if 会出大问题
while (number != 0) {
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
//通知其他线程,+1 完毕
this.notifyAll();
}

public synchronized void decrement() throws InterruptedException {
//线程数大于二时,用 if 会出大问题
while (number == 0) {
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
//通知其他线程,-1 完毕
this.notifyAll();

}
}

image-20200505090908390

问题:

如果存在 A、B、C、D 四个线程,线程还是安全的嘛

答:不安全,存在虚假唤醒问题

解决方法: if —> while

​ 用 if 判断时,当被其他线程唤醒就会直接接着执行后面的指令

​ 用while判断时,当被其他线程唤醒后还会再次进行判断

image-20200505091658081

JUC版

只是对 synchronized版本 的覆盖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public class test {

public static void main(String[] args) {
Data data = new Data();

new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();

new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}

}

/**
* 是一个资源类
* 只提供要操作的属性和方法,具有较低耦合性
* 六子口诀:等待、业务、通知
*/
class Data {

private int number = 0;

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public void increment() throws InterruptedException {
lock.lock();
try {
//业务代码
while (number != 0) {
//等待
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
//通知其他线程,+1 完毕
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void decrement() throws InterruptedException {
lock.lock();
try {
//业务代码
while (number == 0) {
//等待
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
//通知其他线程,+1 完毕
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

}

JUC进阶版

改进:==Condition:精准的通知和唤醒线程==

​ A - B - C 顺序调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class Test {

public static void main(String[] args) {
Data data = new Data();


new Thread(()->{
for (int i = 0; i < 10 ; i++) {
data.printA();
}
},"A" ).start();
new Thread(()->{
for (int i = 0; i < 10 ; i++) {
data.printB();
}
},"B" ).start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.printC();
}
},"C" ).start();

}

}

class Data{

Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
private int num = 1;

public void printA() {
lock.lock();
try {
while (num != 1) {
condition1.await();
}
num = 2;
System.out.println("AAAAAA");
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
while (num != 2) {
condition2.await();
}
num = 3;
System.out.println("BBBBBB");
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
while (num != 3) {
condition3.await();
}
num = 1;
System.out.println("CCCCCC");
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

集合类不安全

普通集合类在单线程下安全的不得了,多线程就8行了。

List不安全

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
ArrayList<Object> list = new ArrayList<>();

for (int i = 1; i <= 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(list);
}, String.valueOf(i)).start();
}
}

存在线程安全问题,爆出错误:java.util.ConcurrentModificationException 并发修改异常

image-20200505114208518

解决方法:

  1. List<Object> list = new Vector<>()

  2. ``List list = Collections.synchronizedList(new ArrayList<>())`

    image-20200505113916028

  3. List<Object> list = CopyOnWriteArrayList<>()

  4. CopyOnWrite(COW),写入时复制,计算机程序设计领域的一种优化策略

    Set不安全

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public static void main(String[] args) {
    Set<String> set = new HashSet<>();

    for (int i = 1; i <= 10; i++) {
    new Thread(() -> {
    set.add(UUID.randomUUID().toString().substring(0, 5));
    System.out.println(set);
    }, String.valueOf(i)).start();
    }
    }

    image-20200505115426546

    依然存在安全问题!

    解决方法:

    1. Set<String> set = Collections.synchronizedSet(new HashSet<>());
    2. Set<String> set = new CopyOnWriteArraySet<>();

    image-20200505115640044

    HashSet底层:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //就是一个HashMap!
    //使用HashMap的key来存放数据(key是不能重复的)

    //不变的值
    private static final Object PRESENT = new Object();

    public HashSet() {
    map = new HashMap<>();
    }

    public boolean add(E e) {
    return map.put(e, PRESENT)==null;
    }

    Map不安全

    先简单回顾一下HashMap

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //初始容量
    static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16

    //最大容量
    static final int MAXIMUM_CAPACITY = 1 << 30;

    //加载因子,减少哈希碰撞
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    //链表转化成红黑树的门槛
    static final int TREEIFY_THRESHOLD = 8;

    测试:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public static void main(String[] args) {
    //一般不这样用
    //默认等价于: new HashMap<>(16.0.75);
    Map<String, String> map = new HashMap<>();

    for (int i = 1; i <= 30; i++) {
    new Thread(()->{
    map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
    System.out.println(map);
    },String.valueOf(i)).start();
    }
    }

    结果:image-20200505152249722

    解决方法:

    1. Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
    2. Map<String, String> map = new ConcurrentHashMap<>();

    ==抽空看研究ConcurrentHashMap源码==

    Callable

    简介

    image-20200505153025107

    Runnable与Callable比较

    Runnable Callable
    没有返回值 可以有返回值
    不能抛出异常 可以抛出异常
    run() call()

    代码测试

    • 传统方法

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      public class CallableTest {
      public static void main(String[] args) {
      new Thread(new MyThread()).start();
      }
      }

      class MyThread implements Runnable{
      @Override
      public void run() {

      }
      }
    • 使用 Callable接口

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      public class CallableTest {

      public static void main(String[] args) throws ExecutionException, InterruptedException {
      // new Thread(new Runnable()).start();
      //new Thread(new FutureTask<>()).start();
      //new Thread(new FutureTask<>( Callable )).start();

      MyThread thread = new MyThread();
      FutureTask<String> futureTask = new FutureTask<>(thread);
      new Thread(futureTask,"A").start();
      new Thread(futureTask,"B").start(); //启动两个线程,call()执行了一次(缓存)

      //get方法可能会产生阻塞!把他放在最后一行或者使用异步通信来处理
      String str = futureTask.get();
      System.out.println(str);

      }

      }

      class MyThread implements Callable<String> {

      @Override
      public String call() {
      System.out.println("call 成功被调用");
      return "1024";
      }
      }

    常用辅助类(必会)

    CountDownLatch

    1
    public class CountDownLatch extends Object

    A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

    A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon – the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

    A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown(). A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

    • 不等计数器归零:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public static void main(String[] args) throws InterruptedException {
    //总数是6
    CountDownLatch countDownLatch = new CountDownLatch(6);
    //-1

    for (int i = 1; i <=6 ; i++) {
    new Thread(()->{
    System.out.println(Thread.currentThread().getName() + "out");
    countDownLatch.countDown(); //数量 -1
    },String.valueOf(i)).start();
    }

    //countDownLatch.await(); //等待计数器归零,然后再向下执行
    System.out.println("over");
    }

    image-20200505161657950

    • 等待计数器归零

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      public static void main(String[] args) throws InterruptedException {
      //总数是6
      CountDownLatch countDownLatch = new CountDownLatch(6);
      //-1

      for (int i = 1; i <=6 ; i++) {
      new Thread(()->{
      System.out.println(Thread.currentThread().getName() + "out");
      countDownLatch.countDown(); //数量 -1
      },String.valueOf(i)).start();
      }

      countDownLatch.await(); //等待计数器归零,然后再向下执行
      System.out.println("over");
      }

    image-20200505161819615

    • 用法

      countDownLatch.countDown();

      countDownLatch.await();

      每次有线程调用 countDownLatch.countDown(),计数器数量 -1 ,计数器变为 0 ,countDownLatch.await()就会被唤醒,继续执行下面的操作。

    CyclicBarrier

    1
    public class CyclicBarrier extends Object

    A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

    A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.

    可以简单理解为 加法计数器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    /**
    * 集齐七颗龙珠召唤神龙
    */
    public static void main(String[] args) {

    CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
    System.out.println("召唤神龙成功!");
    });

    for (int i = 1; i <= 7; i++) {
    final int temp = i;
    new Thread(()->{
    System.out.println(Thread.currentThread().getName()+"收集了"+temp+"颗龙珠");

    try {
    cyclicBarrier.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (BrokenBarrierException e) {
    e.printStackTrace();
    }
    }).start();
    }
    }

    Semaphore

    • java.util.concurrent

    java.lang.Object

    • java.util.concurrent.Semaphore

    • All Implemented Interfaces:Serializable

      1
      public class Semaphore extends Object implements Serializable

      A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public static void main(String[] args) {
    // 线程数量:停车位
    Semaphore semaphore = new Semaphore(3);

    for (int i = 1; i <=6 ; i++) {
    new Thread(()->{
    //acquire()
    try {
    semaphore.acquire();
    System.out.println(Thread.currentThread().getName()+"抢到车位!");
    TimeUnit.SECONDS.sleep(2); //停两秒钟
    System.out.println(Thread.currentThread().getName()+"离开车位!");

    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    //release()
    semaphore.release();
    }

    },String.valueOf(i)).start();
    }
    }
    • 用法

      semaphore.acquire():获得,假设如果已经满了,等待,等待被释放为止!

      semaphore.release():释放,会将当前的信号量释放 +1,然后唤醒等待的线程

    • 作用

      多个资源互斥的使用!并发限流,控制最大的线程数!

    ReadWriteLock(读写锁)

    简介

    java.util.concurrent.locks

    • All Known Implementing Classes:

    ReentrantReadWriteLock


    1
    public interface ReadWriteLock

    A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive.

    读操作可以被多线程执行,写只能被一个线程去写

    独占锁(写锁):一次只能被一个线程占有

    共享锁(读锁):多个线程可以同时占有

    示例

    • 不加锁

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      public class ReadWriteLockDemo {

      public static void main(String[] args) {
      MyCache myCache = new MyCache();

      //write
      for (int i = 1; i <= 5; i++) {
      final int temp = i;
      new Thread(() -> {
      myCache.put(String.valueOf(temp), UUID.randomUUID().toString().substring(0,5));
      }, String.valueOf(i)).start();
      }

      //read
      for (int i = 1; i <= 5; i++) {
      final int temp = i;
      new Thread(() -> {
      myCache.get(String.valueOf(temp));
      }, String.valueOf(i)).start();
      }
      }

      }

      /**
      * 自定义缓存
      */
      class MyCache{

      private volatile Map<String, Object> map = new HashMap<>();

      //存,写
      public void put(String key,Object value) {
      System.out.println(Thread.currentThread().getName() + "写入!" + key);
      map.put(key, value);
      System.out.println(Thread.currentThread().getName() + "写入完毕!");
      }

      //取,读
      public void get(String key) {
      System.out.println(Thread.currentThread().getName() + "读取!" + key);
      map.get(key);
      System.out.println(Thread.currentThread().getName() + "读取完毕!");
      }
      }

      输出结果

      ​ 写入操作时存在插队现象

      image-20200506093655430

    • 加锁

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      public class ReadWriteLockDemo {

      public static void main(String[] args) {
      MyCacheWithLock myCache = new MyCacheWithLock();

      //write
      for (int i = 1; i <= 5; i++) {
      final int temp = i;
      new Thread(() -> {
      myCache.put(String.valueOf(temp), UUID.randomUUID().toString().substring(0,5));
      }, String.valueOf(i)).start();
      }

      //read
      for (int i = 1; i <= 5; i++) {
      final int temp = i;
      new Thread(() -> {
      myCache.get(String.valueOf(temp));
      }, String.valueOf(i)).start();
      }
      }

      }


      /**
      * 自定义缓存
      */
      class MyCacheWithLock{

      private volatile Map<String, Object> map = new HashMap<>();
      //读写锁,更加细粒度的控制
      private ReadWriteLock lock = new ReentrantReadWriteLock();

      //存,写,只希望同时只有一个线程在写入
      public void put(String key,Object value) {
      lock.writeLock().lock();

      try {
      System.out.println(Thread.currentThread().getName() + "写入!" + key);
      map.put(key, value);
      System.out.println(Thread.currentThread().getName() + "写入完毕!");
      } catch (Exception e) {
      e.printStackTrace();
      } finally {
      lock.writeLock().unlock();
      }
      }

      //取,读,希望所有人可以读
      public void get(String key) {
      lock.readLock().lock();

      try {
      System.out.println(Thread.currentThread().getName() + "读取!" + key);
      map.get(key);
      System.out.println(Thread.currentThread().getName() + "读取完毕!");
      } catch (Exception e) {
      e.printStackTrace();
      } finally {
      lock.readLock().unlock();
      }
      }
      }

      结果:

      image-20200506094037329

    阻塞队列

    简介

    image-20200506094836508

    image-20200506101956987

    不是新的东西

    • 多线程并发处理
    • 线程池

    四组API

    方式 抛出异常 有返回值,不抛异常 阻塞等待 超时等待
    添加 add() offer() —> false put() offer(e,timeout,TimeUnit)
    移除 remove() poll() —> null take() poll(e,timeout,TimeUnit)
    查看队列首部 element() peek() - -

    同步队列

    SynchronousQueue,和其他 BlockingQueue 不同,必须将 put() 的元素先 take() 出来,才能 put() 下一个元素!

    线程池(重点)

    池化技术

    程序运行的本质:占用系统资源!

    我们需要引入池化技术来优化资源的使用

    池化技术:事先准备好一些资源,有人要用,就过来拿,用完之后还回来!

    线程池的好处

    1. 降低资源的消耗
    2. 提高响应速度
    3. 方便管理

    总结:线程复用、可以控制最大并发数、管理线程

    线程池核心

    三大方法、七大参数、四种拒绝策略

    三大方法

    image-20200507094427242

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
        public static void main(String[] args) {
    //单个线程
    ExecutorService threadPool = Executors.newSingleThreadExecutor();
    // 创建一个固定大小的线程池
    // Executors.newFixedThreadPool(5);
    // //可伸缩的
    // Executors.newCachedThreadPool();
    try {
    for (int i = 0; i < 10; i++) {
    threadPool.execute(()->{
    System.out.println(Thread.currentThread().getName());
    });
    }
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    //程序结束前需要关闭线程池
    threadPool.shutdown();
    }


    }
    七大参数

    源码分析:

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }
    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

    本质:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public ThreadPoolExecutor(int corePoolSize,			//核心线程池大小
    int maximumPoolSize, //最大核心线程池大小
    long keepAliveTime, //超时没人调用就会被释放
    TimeUnit unit, //超时单位
    BlockingQueue<Runnable> workQueue, //阻塞队列
    ThreadFactory threadFactory, //创建线程的
    RejectedExecutionHandler handler//拒绝策略) {
    if (corePoolSize < 0 ||
    maximumPoolSize <= 0 ||
    maximumPoolSize < corePoolSize ||
    keepAliveTime < 0)
    throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
    null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
    }
    四种拒绝策略

    image-20200507101002241

    手动创建线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    /**
    * public ThreadPoolExecutor(int corePoolSize,
    * int maximumPoolSize,
    * long keepAliveTime,
    * TimeUnit unit,
    * BlockingQueue<Runnable> workQueue,
    * ThreadFactory threadFactory,
    * RejectedExecutionHandler handler)
    */
    public static void main(String[] args) {

    //自定义线程池
    ExecutorService threadPool = new ThreadPoolExecutor(
    2,
    5,
    3,
    TimeUnit.SECONDS,
    new LinkedBlockingDeque<>(3),
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.AbortPolicy()); //银行满了还有人进来,不处理这个人的,抛出异常

    try {
    for (int i = 1; i <= 8; i++) {
    threadPool.execute(()->{
    System.out.println(Thread.currentThread().getName());
    });
    }
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    //程序结束前需要关闭线程池
    threadPool.shutdown();
    }
    }

    最大线程(maximumPoolSize)该如何定义?(调优)

    1. CPU密集型

      12核 —> 12条线程

      1
      2
      //获取CPU的核心数
      System.out.println(Runtime.getRuntime().availableProcessors()); //12
    2. IO密集型

      判断程序中十分耗 IO 的线程数 x,设置最大线程数 > x

    函数式接口(必须掌握)

    Java程序员需要掌握的旧技术:泛型、枚举、反射

    Java程序员需要掌握的新技术:lambda表达式、链式编程、函数式接口、Stream流式计算

    函数式接口:只有一个方法的接口

    ​ 简化编程模型,在新版本的框架底层被大量使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @FunctionalInterface
    public interface Runnable {
    /**
    * When an object implementing interface <code>Runnable</code> is used
    * to create a thread, starting the thread causes the object's
    * <code>run</code> method to be called in that separately executing
    * thread.
    * <p>
    * The general contract of the method <code>run</code> is that it may
    * take any action whatsoever.
    *
    * @see java.lang.Thread#run()
    */
    public abstract void run();
    }

    四大原生的函数式接口:

    • Consumer
    • Function
    • Predicate
    • Supplier

    函数式接口

    image-20200507110148623

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /**
    * Function 函数式接口,有一个输入参数,有一个输出
    * 只要是 函数式接口,就可以用 lambda表达式简化
    */
    public class Demo01 {

    public static void main(String[] args) {
    //工具类,输出输入的值
    // Function<String, String> function = new Function<String, String>() {
    // @Override
    // public String apply(String s) {
    // return s;
    // }
    // };
    Function<String,String> function = (str) -> { return str; };

    System.out.println(function.apply("123"));

    }
    }

    断定型接口

    image-20200507112345415

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
    * Predicate 断定型接口,有一个输入参数,返回值只能是布尔值
    */
    public class Demo02 {
    public static void main(String[] args) {
    //判断字符串是否为空
    // Predicate<String> predicate = new Predicate<String>() {
    // @Override
    // public boolean test(String str) {
    // return str.isEmpty();
    // }
    // };
    Predicate<String> predicate = (str) -> { return str.isEmpty();};
    }
    }

    消费型接口

    image-20200507112539535

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    /**
    * Consumer 消费性接口,只有输入,没有返回值
    */
    public class Demo03 {
    public static void main(String[] args) {

    // Consumer<String> consumer = new Consumer<String>() {
    // @Override
    // public void accept(String str) {
    // System.out.println(str);
    // }
    // };

    Consumer<String> consumer = (str) -> { System.out.println(str); };
    }
    }

    供给型接口

    image-20200507112937140

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    /**
    * Supplier 供给型接口:没有参数,有返回值
    */
    public class Demo04 {

    public static void main(String[] args) {

    // Supplier supplier = new Supplier<Integer>() {
    // @Override
    // public Integer get() {
    // return 1024;
    // }
    // };

    Supplier<Integer> supplier = ()->{ return 1024;};
    }
    }

    Stream流式计算

    大数据 : 存储 + 计算

    ​ 存储:集合、MySQL

    ​ 计算:交给流来操作!

    image-20200507113911453

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class User {

    private int id;
    private String name;
    private int age;

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    /**
    * 题目要求:一分钟内完成此题,只能用一行代码实现
    * 现在有五个用户,筛选:
    * 1.ID 必须是偶数
    * 2.年龄必须大于23岁
    * 3.用户名转为大写字母
    * 4.用户名字母倒着排序
    * 5.只输出一个用户
    */
    public class Test {
    public static void main(String[] args) {
    User u1 = new User(1, "a", 21);
    User u2 = new User(2, "b", 22);
    User u3 = new User(3, "c", 23);
    User u4 = new User(4, "d", 24);
    User u5 = new User(5, "e", 25);
    //集合就是存储
    List<User> list = Arrays.asList(u1, u2, u3, u4, u5);

    //计算交给Stream
    //lambda表达式、链式编程、函数式接口、Stream流式计算
    list.stream()
    .filter(u->{return u.getId()%2 == 0;})
    .filter(u->{return u.getAge()>23;})
    .map(u->{return u.getName().toUpperCase();})
    .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
    .limit(1)
    .forEach(System.out::println);
    }
    }

    ForkJoin

    简介

    ForkJoin(分支合并)出现在 JDK 1.7,可以并行执行任务,提高效率。

    在大数据量的情况下使用!

    image-20200508092713288

    特点

    工作窃取

    ​ 多个线程执行任务时,提前执行完任务的线程可以将没执行完任务的现场的任务拿过来执行,提高效率(维护了一个双端队列)

    实例

    image-20200508100716554

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    /** 
    * ForkJoin
    * 1.ForkJoinPool
    * 2.forkJoinPool.execute(ForkJoinTask forkJoinTask)
    * 3.计算类继承ForkJoinTask
    */
    public class ForkJoinDemo extends RecursiveTask<Long> {

    private Long start;
    private Long end;

    //临界值
    private Long thershold = 10000L;

    public ForkJoinDemo(Long start, Long end) {
    this.start = start;
    this.end = end;
    }

    @Override
    protected Long compute() {
    if (end - start < thershold) {
    Long sum = 0L;
    for (Long i = start; i <= end; i++) {
    sum += i;
    }
    return sum;
    } else {
    Long middle = (start + end) / 2;
    ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
    ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
    //拆分
    task1.fork();
    task2.fork();
    //合并并返回
    return task1.join() + task2.join();
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    /**
    * 求和计算任务
    * 程序员的369等
    * 3:头铁直接求和
    * 6:ForkJoin
    * 9:Stream并行流
    */
    public class Test {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
    test1();
    test2();
    test3();
    }

    public static void test1() {
    long start = System.currentTimeMillis();

    Long sum = 0L;
    for (Long i = 1L; i <= 10_000_1000; i++) {
    sum += i;
    }

    long end = System.currentTimeMillis();
    System.out.println("3:"+"sum = "+ sum +" time= "+ (end - start));
    }

    //ForkJoin
    public static void test2() throws ExecutionException, InterruptedException {
    long start = System.currentTimeMillis();

    ForkJoinPool forkJoinPool = new ForkJoinPool();
    ForkJoinDemo task = new ForkJoinDemo(0L, 10_000_1000L);
    ForkJoinTask<Long> submit = forkJoinPool.submit(task);

    Long sum = submit.get();

    long end = System.currentTimeMillis();
    System.out.println("6:"+"sum = "+ sum +" time= "+ (end - start));

    }

    //Stream并行流
    public static void test3() {
    long start = System.currentTimeMillis();

    long sum = LongStream.rangeClosed(0L, 10_000_1000L).parallel().reduce(0, Long::sum);

    long end = System.currentTimeMillis();
    System.out.println("9:"+"sum = "+ sum +" time= "+ (end - start));

    }


    }

    执行结果:

    image-20200508103137218

    异步回调

    ……

    JMM

    Volatile的理解

    Volatile 是 Java 虚拟机提供轻量级的同步机制

    1. 保证可见性
    2. 不保证原子性
    3. 禁止指令重排

    什么是JMM

    JMM(Java Memory Model,Java内存模型),不像JVM,JMM是不存在的东西,是一种概念、约定。

    关于JMM的一些同步约定:

    1. 线程解锁前,必须把共享变量立刻刷回主存
    2. 线程加锁前,必须读取主存中的最新值到工作内存中
    3. 加锁和解锁是同一把锁

    8种操作:

    image-20200509092515374

    • lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态。
    • unlock(解锁):作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
    • read(读取):作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
    • load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
    • use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
    • assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
    • store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
    • write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中。

    存在的问题

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public class Demo {

    private static int num = 0;

    public static void main(String[] args) {

    new Thread(() -> {
    while (num == 0) {

    }
    }).start();

    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    num = 1;
    System.out.println(num);

    }

    }

    image-20200509094216939

    线程A 在执行时 线程B 修改了变量值并且存到了主存中,但是 线程A 却不知道主存中的值发生了变化,仍在使用原来旧的值。

    Volatile

    1.保证可见性

    上面问题的解决方法:

    private volatile static int num = 0;

    2.不保证原子性

    原子性:不可分割

    线程A 在执行任务的时候,不能被打扰,也不能被分割,要么同时成功,要么同时失败

    如果不加 lock 和 synchronized ,怎么保证原子性?

    使用原子类(底层用的是CAS)来解决原子性问题!

    image-20200509095742877

    3.避免指令重排

    指令重排:你写的程序,计算机并不是按照你写的那样去执行的。

    源代码——》编译器优化的重排——》指令并行也可能会重排——》内存系统也会重排——》执行

    避免指令重排:内存屏障(在单例模式中使用频繁)

    深入理解单例模式

    饿汉式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    /**
    * 饿汉式单例
    * 在还没有用的时候就把对象都创建出来了,可能会造成浪费空间
    */
    public class Hungry {

    private byte[] data = new byte[1024 * 1024];

    //构造器私有
    private Hungry() {

    }

    private final static Hungry HUNGRY = new Hungry();

    public static Hungry getInstance() {
    return HUNGRY;
    }
    }

    懒汉式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    /**
    * 懒汉式单例
    * 单线程下ok
    * 多线程下有问题 ——》 加锁
    */
    public class LazyMan {

    private LazyMan() {
    System.out.println(Thread.currentThread().getName() + "ok");
    }

    //原子性操作
    private volatile static LazyMan lazyMan;

    //双重检测锁模式
    public static LazyMan getInstance() {
    if (lazyMan == null) {
    synchronized (LazyMan.class) {
    if (lazyMan == null) {
    /**
    * 不是原子性操作
    * 1. 分配内存空间
    * 2. 执行构造方法,初始化对象
    * 3. 把对象指向这个空间
    */
    lazyMan = new LazyMan();
    }
    }
    }
    return lazyMan;
    }

    //测试多线程并发
    public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
    new Thread(() ->{
    LazyMan.getInstance();
    }).start();
    }
    }

    }

    深入理解CAS

    简介

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class CASDemo {

    public static void main(String[] args) {
    AtomicInteger atomicInteger = new AtomicInteger(2020);

    /**
    * CAS(CompareAndSet):比较并交换
    * public final boolean compareAndSet(int expect, int update)
    * except:期望
    * update:更新
    * 如果期望的值达到了,就更新
    * CAS 是 CPU 的并发原语
    */
    atomicInteger.compareAndSet(2020,2021);
    atomicInteger.get();
    atomicInteger.getAndIncrement();
    }
    }

    unsafe类

    image-20200509110744442

    image-20200509111600981

    缺点

    1. 自旋锁循环会耗时
    2. 一次性只能保证一个共享变量的原子性
    3. ABA 问题

    ABA问题

    ABA问题:狸猫换太子

    image-20200509112905245

    解决方法:引入 原子引用(乐观锁)

    各种锁的理解

    1.公平锁、非公平锁

    • 公平锁:非常公平,不能插队(必须先来后到)
    • 非公平锁:非常霸道,可以插队(T1:3h,T2:3s),默认都是非公平锁
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    /**
    * Creates an instance of {@code ReentrantLock}.
    * This is equivalent to using {@code ReentrantLock(false)}.
    */
    public ReentrantLock() {
    sync = new NonfairSync();
    }

    /**
    * Creates an instance of {@code ReentrantLock} with the
    * given fairness policy.
    * @param fair {@code true} if this lock should use a fair ordering policy
    */
    public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    }

    2.可重入锁

    可重入锁(递归锁):拿到了外面的锁之后就可以自动获得里面的锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    //synchronized版
    public class Demo01 {

    public static void main(String[] args) {
    Phone phone = new Phone();

    new Thread(() -> {
    phone.sms();
    }, "A").start();

    new Thread(() -> {
    phone.sms();
    }, "B").start();
    }
    }

    class Phone {
    public synchronized void sms() {
    System.out.println(Thread.currentThread().getName()+" sms");
    call(); //这也有锁
    }

    public synchronized void call() {
    System.out.println(Thread.currentThread().getName()+" call");
    }
    }

    image-20200509120043095

    3.自旋锁

    SpinLock

    image-20200509120442162

    4.死锁

    参考

    狂神说Java:https://www.bilibili.com/video/BV1B7411L7tE

    Java官方文档:https://docs.oracle.com/javase/8/docs/api/

    EnjoyMoving:https://zhuanlan.zhihu.com/p/29881777

    Author: wxshhh
    Link: http://wxshhh.github.io/2023/03/01/%E3%80%90JUC%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%E3%80%91%E4%B8%80/
    Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.
    avatar
    wxshhh
    中国科学技术大学软件工程在读
    Follow Me
    Announcement
    This is my Blog
    Recent Post