【JUC并发编程】
什么是JUC
线程是 Java 中浓墨重彩的一笔。
JUC 就是 java.util .concurrent 工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。
这里列举两个之前学习 Java基础 用到过的两个接口:
基础概念
线程和进程
- 进程:一个程序的一次动态执行过程,
.exe
- 线程:进程可细分的最小单位,一个进程往往可以包含多个线程
Java 默认有几个线程?
2 个 !
main线程
GC线程
对于Java有哪几种开启线程的方式?
- Thread
- Runnable
- Callable
Java真的可以开启线程嘛?
并不行,需要使用
public native void start0()
方法来调用本地方法使用其他语言开启线程
1 | public synchronized void start() { |
并发和并行
并发:多个线程操作同一个资源(交替执行)
本质:充分利用 CPU 的资源
并行:多个线程 同时 操作一个资源(同时执行)
查看 CPU 核心数(CPU核心数代表最多可以并行操作多少线程)
1 | public static void main(String[] args) { |
线程有几个状态?
六个
1
2
3
4
5
6
7
8 public enum State {
NEW, //新生
RUNNABLE, //运行
BLOCKED, //阻塞
WAITING, //等待,死死的等
TIMED_WAITING, //超时等待
TERMINATED; //终止
}Wait 和 Sleep 的区别:
wait sleep Object Thread 会释放锁 不会释放(睡着了都怎么释放) 必须在同步代码块中使用 可以在任何地方睡 不需要捕获异常 必须捕获异常
Lock锁(重点)
synchronized
1 | public class SaleTicket { |
加入 synchronized
关键字之前:
加入 synchronized
关键字之后:
Lock接口
使用方法
实现类
公平锁和非公平锁
- 公平锁:十分公平,可以先来后到
- 非公平锁:十分不公平,可以插队(默认)
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25class Ticket{
//属性
private int number = 30;
Lock lock = new ReentrantLock();
//方法
public void sale() {
//加锁
lock.lock();
try {
//业务代码
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "张票,剩余:" + number);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//解锁
lock.unlock();
}
}
}运行结果与使用
synchronized
一致synchronized 和 Lock锁 的区别
synchronized | Lock |
---|---|
是一个内置的关键字 | 是一个Java类 |
无法判断锁的状态 | 可以判断是否获取到了锁 |
会自动释放锁 | 必需要手动释放(不释放会出现死锁) |
Thread1(获得锁、阻塞)、Thread2(等待,傻傻的等) | 不一定会等下去 |
可重入锁、不可中断的、非公平的 | 可重入锁、可以中断的、非公平的(可以设置) |
适合锁少量的同步代码 | 适合锁大量的同步代码 |
锁是什么?如何判断锁的是谁?
普通方法上锁,锁住的是调用方法的对象实例
静态方法上锁,锁住的是整个类
生产者和消费者问题
问题描述:
本质是线程之间的通信问题
两个线程交替执行,A、B 同时操作同一个变量
A:num + 1
B:num - 1
要保证 num 在 0、1 之间交替
大有门道!
Synchronized版
1 | public class test { |
问题:
如果存在 A、B、C、D 四个线程,线程还是安全的嘛
答:不安全,存在虚假唤醒问题
解决方法: if —> while
用 if 判断时,当被其他线程唤醒就会直接接着执行后面的指令
用while判断时,当被其他线程唤醒后还会再次进行判断
JUC版
只是对 synchronized版本 的覆盖
1 | public class test { |
JUC进阶版
改进:==Condition:精准的通知和唤醒线程==
A - B - C 顺序调用
1 | public class Test { |
集合类不安全
普通集合类在单线程下安全的不得了,多线程就8行了。
List不安全
1 | public static void main(String[] args) { |
存在线程安全问题,爆出错误:java.util.ConcurrentModificationException
并发修改异常
解决方法:
List<Object> list = new Vector<>()
``List
List<Object> list = CopyOnWriteArrayList<>()
CopyOnWrite(COW),写入时复制,计算机程序设计领域的一种优化策略
Set不安全
1 | public static void main(String[] args) { |
依然存在安全问题!
解决方法:
Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
HashSet底层:
1 | //就是一个HashMap! |
Map不安全
先简单回顾一下HashMap
1 | //初始容量 |
测试:
1 | public static void main(String[] args) { |
结果:
解决方法:
Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
Map<String, String> map = new ConcurrentHashMap<>();
==抽空看研究ConcurrentHashMap源码==
Callable
简介
Runnable与Callable比较
Runnable | Callable |
---|---|
没有返回值 | 可以有返回值 |
不能抛出异常 | 可以抛出异常 |
run() |
call() |
代码测试
传统方法
1
2
3
4
5
6
7
8
9
10
11
12public class CallableTest {
public static void main(String[] args) {
new Thread(new MyThread()).start();
}
}
class MyThread implements Runnable{
public void run() {
}
}使用 Callable接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new Runnable()).start();
//new Thread(new FutureTask<>()).start();
//new Thread(new FutureTask<>( Callable )).start();
MyThread thread = new MyThread();
FutureTask<String> futureTask = new FutureTask<>(thread);
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start(); //启动两个线程,call()执行了一次(缓存)
//get方法可能会产生阻塞!把他放在最后一行或者使用异步通信来处理
String str = futureTask.get();
System.out.println(str);
}
}
class MyThread implements Callable<String> {
public String call() {
System.out.println("call 成功被调用");
return "1024";
}
}
常用辅助类(必会)
CountDownLatch
- java.util.concurrent
- java.lang.Object
- java.util.concurrent.CountDownLatch
1 public class CountDownLatch extends ObjectA synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
A
CountDownLatch
is initialized with a given count. Theawait
methods block until the current count reaches zero due to invocations of thecountDown()
method, after which all waiting threads are released and any subsequent invocations ofawait
return immediately. This is a one-shot phenomenon – the count cannot be reset. If you need a version that resets the count, consider using aCyclicBarrier
.A
CountDownLatch
is a versatile synchronization tool and can be used for a number of purposes. ACountDownLatch
initialized with a count of one serves as a simple on/off latch, or gate: all threads invokingawait
wait at the gate until it is opened by a thread invokingcountDown()
. ACountDownLatch
initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
- 不等计数器归零:
1 | public static void main(String[] args) throws InterruptedException { |
等待计数器归零
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public static void main(String[] args) throws InterruptedException {
//总数是6
CountDownLatch countDownLatch = new CountDownLatch(6);
//-1
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "out");
countDownLatch.countDown(); //数量 -1
},String.valueOf(i)).start();
}
countDownLatch.await(); //等待计数器归零,然后再向下执行
System.out.println("over");
}
用法
countDownLatch.countDown();
countDownLatch.await();
每次有线程调用
countDownLatch.countDown()
,计数器数量 -1 ,计数器变为 0 ,countDownLatch.await()
就会被唤醒,继续执行下面的操作。
CyclicBarrier
java.util.concurrent
java.util.concurrent.CyclicBarrier
1 public class CyclicBarrier extends ObjectA synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
A
CyclicBarrier
supports an optionalRunnable
command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.
可以简单理解为 加法计数器
1 | /** |
Semaphore
- java.util.concurrent
java.util.concurrent.Semaphore
All Implemented Interfaces:Serializable
1 public class Semaphore extends Object implements SerializableA counting semaphore. Conceptually, a semaphore maintains a set of permits. Each
acquire()
blocks if necessary until a permit is available, and then takes it. Eachrelease()
adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; theSemaphore
just keeps a count of the number available and acts accordingly.
1 | public static void main(String[] args) { |
用法
semaphore.acquire()
:获得,假设如果已经满了,等待,等待被释放为止!semaphore.release()
:释放,会将当前的信号量释放 +1,然后唤醒等待的线程作用
多个资源互斥的使用!并发限流,控制最大的线程数!
ReadWriteLock(读写锁)
简介
java.util.concurrent.locks
- All Known Implementing Classes:
1 public interface ReadWriteLockA
ReadWriteLock
maintains a pair of associatedlocks
, one for read-only operations and one for writing. Theread lock
may be held simultaneously by multiple reader threads, so long as there are no writers. Thewrite lock
is exclusive.读操作可以被多线程执行,写只能被一个线程去写
独占锁(写锁):一次只能被一个线程占有
共享锁(读锁):多个线程可以同时占有
示例
不加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//write
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(String.valueOf(temp), UUID.randomUUID().toString().substring(0,5));
}, String.valueOf(i)).start();
}
//read
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(String.valueOf(temp));
}, String.valueOf(i)).start();
}
}
}
/**
* 自定义缓存
*/
class MyCache{
private volatile Map<String, Object> map = new HashMap<>();
//存,写
public void put(String key,Object value) {
System.out.println(Thread.currentThread().getName() + "写入!" + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入完毕!");
}
//取,读
public void get(String key) {
System.out.println(Thread.currentThread().getName() + "读取!" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取完毕!");
}
}输出结果
写入操作时存在插队现象
加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheWithLock myCache = new MyCacheWithLock();
//write
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(String.valueOf(temp), UUID.randomUUID().toString().substring(0,5));
}, String.valueOf(i)).start();
}
//read
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(String.valueOf(temp));
}, String.valueOf(i)).start();
}
}
}
/**
* 自定义缓存
*/
class MyCacheWithLock{
private volatile Map<String, Object> map = new HashMap<>();
//读写锁,更加细粒度的控制
private ReadWriteLock lock = new ReentrantReadWriteLock();
//存,写,只希望同时只有一个线程在写入
public void put(String key,Object value) {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入!" + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入完毕!");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();
}
}
//取,读,希望所有人可以读
public void get(String key) {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取!" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取完毕!");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
}
}
}结果:
阻塞队列
简介
不是新的东西!
- 多线程并发处理
- 线程池
四组API
方式 | 抛出异常 | 有返回值,不抛异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add() | offer() —> false | put() | offer(e,timeout,TimeUnit) |
移除 | remove() | poll() —> null | take() | poll(e,timeout,TimeUnit) |
查看队列首部 | element() | peek() | - | - |
同步队列
SynchronousQueue
,和其他 BlockingQueue
不同,必须将 put()
的元素先 take()
出来,才能 put()
下一个元素!
线程池(重点)
池化技术
程序运行的本质:占用系统资源!
我们需要引入池化技术来优化资源的使用
池化技术:事先准备好一些资源,有人要用,就过来拿,用完之后还回来!
线程池的好处
- 降低资源的消耗
- 提高响应速度
- 方便管理
总结:线程复用、可以控制最大并发数、管理线程
线程池核心
三大方法、七大参数、四种拒绝策略
三大方法
1 | public static void main(String[] args) { |
七大参数
源码分析:
1 | public static ExecutorService newSingleThreadExecutor() { |
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
1 | public static ExecutorService newCachedThreadPool() { |
本质:
1 | public ThreadPoolExecutor(int corePoolSize, //核心线程池大小 |
四种拒绝策略
手动创建线程池
1 | /** |
最大线程(maximumPoolSize)该如何定义?(调优)
CPU密集型
12核 —> 12条线程
1
2 //获取CPU的核心数
System.out.println(Runtime.getRuntime().availableProcessors()); //12IO密集型
判断程序中十分耗 IO 的线程数 x,设置最大线程数 > x
函数式接口(必须掌握)
Java程序员需要掌握的旧技术:泛型、枚举、反射
Java程序员需要掌握的新技术:lambda表达式、链式编程、函数式接口、Stream流式计算
函数式接口:只有一个方法的接口
简化编程模型,在新版本的框架底层被大量使用
1 |
|
四大原生的函数式接口:
Consumer
Function
Predicate
Supplier
函数式接口
1 | /** |
断定型接口
1 | /** |
消费型接口
1 | /** |
供给型接口
1 | /** |
Stream流式计算
大数据 : 存储 + 计算
存储:集合、MySQL
计算:交给流来操作!
1 |
|
1 | /** |
ForkJoin
简介
ForkJoin(分支合并)出现在 JDK 1.7,可以并行执行任务,提高效率。
在大数据量的情况下使用!
特点
工作窃取:
多个线程执行任务时,提前执行完任务的线程可以将没执行完任务的现场的任务拿过来执行,提高效率(维护了一个双端队列)
实例
1 | /** |
1 | /** |
执行结果:
异步回调
……
JMM
Volatile的理解
Volatile 是 Java 虚拟机提供轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
什么是JMM
JMM(Java Memory Model,Java内存模型),不像JVM,JMM是不存在的东西,是一种概念、约定。
关于JMM的一些同步约定:
- 线程解锁前,必须把共享变量立刻刷回主存
- 线程加锁前,必须读取主存中的最新值到工作内存中
- 加锁和解锁是同一把锁
8种操作:
- lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态。
- unlock(解锁):作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
- read(读取):作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
- use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
- assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
- store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
- write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中。
存在的问题
1 | public class Demo { |
线程A 在执行时 线程B 修改了变量值并且存到了主存中,但是 线程A 却不知道主存中的值发生了变化,仍在使用原来旧的值。
Volatile
1.保证可见性
上面问题的解决方法:
private volatile static int num = 0;
2.不保证原子性
原子性:不可分割
线程A 在执行任务的时候,不能被打扰,也不能被分割,要么同时成功,要么同时失败
如果不加 lock 和 synchronized ,怎么保证原子性?
使用原子类(底层用的是CAS)来解决原子性问题!
3.避免指令重排
指令重排:你写的程序,计算机并不是按照你写的那样去执行的。
源代码——》编译器优化的重排——》指令并行也可能会重排——》内存系统也会重排——》执行
避免指令重排:内存屏障(在单例模式中使用频繁)
深入理解单例模式
饿汉式
1 | /** |
懒汉式
1 | /** |
深入理解CAS
简介
1 | public class CASDemo { |
unsafe类
缺点
- 自旋锁循环会耗时
- 一次性只能保证一个共享变量的原子性
- ABA 问题
ABA问题
ABA问题:狸猫换太子
解决方法:引入 原子引用(乐观锁)
各种锁的理解
1.公平锁、非公平锁
- 公平锁:非常公平,不能插队(必须先来后到)
- 非公平锁:非常霸道,可以插队(T1:3h,T2:3s),默认都是非公平锁
1 | /** |
2.可重入锁
可重入锁(递归锁):拿到了外面的锁之后就可以自动获得里面的锁
1 | //synchronized版 |
3.自旋锁
SpinLock
4.死锁
参考
狂神说Java:https://www.bilibili.com/video/BV1B7411L7tE
Java官方文档:https://docs.oracle.com/javase/8/docs/api/
EnjoyMoving:https://zhuanlan.zhihu.com/p/29881777