前言

JUC是java.util.concurrent工具包的简称,是处理线程的工具包,用来解决多线程高并发的问题。

Lock接口

synchronized

synchronized是一种同步锁。

具体修饰的对象有三种方法:

  • 修饰代码块
  • 修饰方法
  • 修饰静态方法

示例:

  1. 创建资源类,在资源类中定义属性和操作方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class SyncTicket {

/** 票数 */
private int number = 30;

/**
* 卖票
*/
public synchronized void sale(){
if (number > 0) {
System.out.println(Thread.currentThread().getName() + ": 成功卖出一张票, 剩余票: " + (--number));
}
}
}
  1. 创建多个线程,调用资源类和操作方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SaleSyncTicket {
public static void main(String[] args) {
SyncTicket syncTicket = new SyncTicket();

// 传统创建线程方法
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 40; i++) {
//调用卖票方法
ticket.sale();
}
}
},"AA").start();

// lambda创建线程方法
new Thread(() -> {
for (int i = 0; i < 40; i++) {
syncTicket.sale();
}
}, "BB").start();
}
}

Lock接口

为锁和等待条件提供一个框架的接口和类,不同于内置同步和监视器,Lock是类,可以通过类实现同步访问,多个接口实现类:可重入锁等。

Lock的编程步骤同synchronized:

  • 创建资源类,在资源类中定义属性和操作方法
  • 创建多个线程,调用资源类和操作方法

定义可重入锁:

1
2
3
4
5
6
// 构造函数,true:公平锁,false:非公平锁
private final ReentrantLock lock = new ReentrantLock(true);
// 上锁
lock.lock();
// 解锁
lock.unlock();

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class ReentrantTicket {

private final ReentrantLock lock = new ReentrantLock(true);

private int number = 30;

public void sale() {
lock.lock();
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + ": 成功卖出一张票, 剩余票: " + (--number));
}
} finally {
lock.unlock();
}
}

}

public class SaleReentrantTicket {
public static void main(String[] args) {
ReentrantTicket ticket = new ReentrantTicket();

new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "AA").start();

new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "BB").start();

new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "CC").start();
}
}

Lock方法

1
2
3
4
5
6
7
8
9
10
public interface Lock {
// 获取锁
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
Condition newCondition();
}
  • lock()方法用来获取锁,配合finally来释放锁
1
2
3
4
5
6
7
8
9
lock.lock();
try {
// 处理任务
} catch (Exception e) {
// 异常处理
} finally {
// 释放锁
lock.unlock();
}
  • Lock.Condition可以实现等待/通知模式

关键字synchronized通过wait()/notify()可以实现等待通知模式,Lock锁的newCondition()方法返回Condition对象可以实现等待/通知模式。

notify()通知时,JVM会随机唤醒某个等待的线程,使用Condition类可以选择性的通知,Condition常用的两个方法:

  • await()使当前线程等待,同时会释放锁,当其他线程调用signal()时,线程会重新获取锁并继续执行。
  • signal()用于唤醒一个等待的线程。

ReentrantLock(可重入锁)是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法。

ReentrantReadWriteLock里面提供了很多丰富的方法,获取读锁:readLock(),写锁:writeLock()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading.
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing.
*/
Lock writeLock();
}

假设有一个线程占用了读锁,则此时其他线程如果要申请写锁,则申请的写锁的线程会一直等待读锁释放,反之同理。读写锁适用于读多写少的场景。

两者差异

synchronized与lock的异同:

  • synchronized是java关键字,内置,而Lock不是内置,是一个类,可以实现同步访问且比synchronized中的方法更加丰富

  • synchronized不会手动释放锁,而lock需手动释放锁(不解锁会出现死锁,需要在 finally 块中释放锁)

  • lock等待锁的线程会相应中断,而synchronized不会相应,只会一直等待

  • 通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到

  • Lock 可以提高多个线程进行读操作的效率(当多个线程竞争的时候)

线程间通信

操作线程的时候,等待线程使用wait()。
通知另外的线程操作用notify()、notifyAll()。
假设有两个线程,该线程在执行过程中,判断值(不是该值等待,让其他线程抢),操作值,通知另外一个线程的调度。

通过使用两个线程对0这个值操作,一个线程加1,一个线程减1,交替实现多次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//第一步 创建资源类,定义属性和操作方法
class Share {
//初始值
private int number = 0;

//+1的方法
public synchronized void incr() throws InterruptedException {
//第二步 判断 干活 通知
if (number != 0) { //判断number值是否是0,如果不是0,等待
this.wait(); //在哪里睡,就在哪里醒
}
//如果number值是0,就+1操作
number++;
System.out.println(Thread.currentThread().getName() + " :: " + number);
//通知其他线程
this.notifyAll();
}

//-1的方法
public synchronized void decr() throws InterruptedException {
//判断
if (number != 1) {
this.wait();
}
//干活
number--;
System.out.println(Thread.currentThread().getName() + " :: " + number);
//通知其他线程
this.notifyAll();
}
}

public class ThreadDemo1 {
//第三步 创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
Share share = new Share();
//创建线程
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
share.incr(); //+1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AA").start();

new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
share.decr(); //-1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BB").start();

new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
share.incr(); //+1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "CC").start();

new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
share.decr(); //-1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "DD").start();
}
}

out:

1
2
3
4
5
6
BB :: 0
CC :: 1
BB :: 0
CC :: 1
BB :: 0
DD :: -1

主要是虚拟唤醒导致:如果一个线程执行完毕后,通知其他线程,该线程又进入等待睡眠,可能会因为某些原因被唤醒后,if结构的语句就不会判断了,一直往下执行,所以需要将if换成while结构,每次都判断。因为wait在哪里睡眠就在哪里被唤醒,结果被某个异常唤醒了后回不去了,if结构不会在判断了,需要更改为while。

1
2
3
while(number != 0) { //判断number值是否是0,如果不是0,等待
this.wait(); //在哪里睡,就在哪里醒
}

线程间定制化通信

所谓定制化通信,需要让线程进行一定的顺序操作

案列:启动三个线程,按照如下要求:
AA打印5此,BB打印10次,CC打印15次,一共进行10轮

具体思路:
每个线程添加一个标志位,是该标志位则执行操作,并且修改为下一个标志位,通知下一个标志位的线程

创建一个可重入锁private Lock lock = new ReentrantLock();
分别创建三个开锁通知private Condition c1 = lock.newCondition();

具体资源类中的A线程代码操作
上锁,(执行具体操作(判断、操作、通知),解锁)放于try、finally

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void print5(int loop) throws InterruptedException {
//上锁
lock.lock();
try {
//判断
while(flag != 1) {
//等待
c1.await();
}
//干活
for (int i = 1; i <=5; i++) {
System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop);
}
//通知
flag = 2; //修改标志位 2
c2.signal(); //通知BB线程
}finally {
//释放锁
lock.unlock();
}
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class ShareResource {

private int flag = 1;
private Lock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
private Condition conditionC = lock.newCondition();

public void notifyA(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 1) {
conditionA.await();
}
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "::" + loop);
}
flag = 2;
conditionB.signal();
} finally {
lock.unlock();
}
}

public void notifyB(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 2) {
conditionB.await();
}
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + "::" + loop);
}
flag = 3;
conditionC.signal();
} finally {
lock.unlock();
}
}

public void notifyC(int loop) throws InterruptedException {
lock.lock();
try {
while (flag != 3) {
conditionC.await();
}
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName() + "::" + loop);
}
flag = 1;
conditionA.signalAll();
} finally {
lock.unlock();
}
}

}

public class ThreadDemo3 {

public static void main(String[] args) {
ShareResource shareResource = new ShareResource();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.notifyA(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AA").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.notifyB(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BB").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
shareResource.notifyC(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "CC").start();
}
}

out:

1
2
3
4
5
6
7
8
9
10
11
12
13
AA::0
...
BB::0
...
CC::0
...
AA::1
...
BB::1
...
CC::1
...
...

集合的线程安全

List

ArrayList线程不安全示例:

方法签名:

1
public boolean add(E e)

该方法由于没有synchronized声名以及加锁,存在并发安全问题。

并发向集合添加元素

1
2
3
4
5
6
7
8
// 线程不安全
List<String> list = new ArrayList<>();
for (int i = 0; i < 200; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}

out:

1
2
3
4
5
6
7
8
Exception in thread "167" java.util.ConcurrentModificationException
at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1042)
at java.base/java.util.ArrayList$Itr.next(ArrayList.java:996)
at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:472)
at java.base/java.lang.String.valueOf(String.java:2951)
at java.base/java.io.PrintStream.println(PrintStream.java:897)
at cn.ysliu.sync.ThreadDemo4.lambda$main$0(ThreadDemo4.java:25)
at java.base/java.lang.Thread.run(Thread.java:834)

Vector

使用synchronized同步方法。

方法签名:

1
public synchronized boolean add(E e);
1
2
3
4
5
6
7
8
// 解决方案1
List<String> list = new Vector<>();
for (int i = 0; i < 200; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}

Collections

Collections工具类创建线程安全的集合,使用synchronized同步代码块。

方法实现:

1
2
3
4
5
public static <T> List<T> synchronizedList(List<T> list) {
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}
1
2
3
4
5
6
7
8
9
10
11
static class SynchronizedList<E>
extends SynchronizedCollection<E>
implements List<E> {
private static final long serialVersionUID = -7754090372962971524L;

// ...

public void add(int index, E element) {
synchronized (mutex) {list.add(index, element);}
}
}
1
2
3
4
5
6
7
8
// 解决方案2
List<String> list = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < 200; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}

CopyOnWriteArrayList

写时复制技术,volatile和互斥锁来实现,写时加锁,读则不限制。

在修改集合元素时,加锁,复制原来的集合并修改集合元素,把修改完的复制集合重新覆盖原有集合,适用于读多写少的场景,集合数据量小,实时性要求不是很高的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean add(E e) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
es = Arrays.copyOf(es, len + 1);
es[len] = e;
setArray(es);
return true;
}
}

public E get(int index) {
return elementAt(getArray(), index);
}
1
2
3
4
5
6
7
8
// 解决方案3, 写时复制,可以同时读,只能同时一个人写, 一般使用该方案
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 200; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}

Set

HashSet线程不安全示例演示:

1
public boolean add(E e);

该方法由于没有synchronized声名以及加锁,存在并发安全问题。

1
2
3
4
5
6
7
8
// 线程不安全
Set<String> set = new HashSet<>();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(set);
}, String.valueOf(i)).start();
}
1
2
3
4
5
6
7
8
Exception in thread "85" java.util.ConcurrentModificationException
at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1493)
at java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1516)
at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:472)
at java.base/java.lang.String.valueOf(String.java:2951)
at java.base/java.io.PrintStream.println(PrintStream.java:897)
at cn.ysliu.sync.ThreadDemo4.lambda$main$0(ThreadDemo4.java:41)
at java.base/java.lang.Thread.run(Thread.java:834)

CopyOnWriteArraySet

写时复制的Set集合,volatile和互斥锁来实现,写时加锁,读则不限制。

1
2
3
4
5
6
7
8
// 解决方案
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(set);
}, String.valueOf(i)).start();
}

Map

HashMap线程不安全示例:

1
2
3
4
5
6
7
8
9
// 线程不安全
Map<String, String> map = new HashMap<>();
for (int i = 0; i < 100; i++) {
String key = String.valueOf(i);
new Thread(() -> {
map.put(key, UUID.randomUUID().toString().substring(0, 8));
System.out.println(map);
}, String.valueOf(i)).start();
}
1
2
3
4
5
6
7
8
9
Exception in thread "93" java.util.ConcurrentModificationException
at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1493)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1526)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1524)
at java.base/java.util.AbstractMap.toString(AbstractMap.java:551)
at java.base/java.lang.String.valueOf(String.java:2951)
at java.base/java.io.PrintStream.println(PrintStream.java:897)
at cn.ysliu.sync.ThreadDemo4.lambda$main$0(ThreadDemo4.java:56)
at java.base/java.lang.Thread.run(Thread.java:834)

ConcurrentHashMap

ConcurrentHashMap时HashMap的实现类。

1
2
// 解决方案,使用C	oncurrentHashMap替代HashMap
Map<String, String> map = new ConcurrentHashMap<>();

多线程锁

在某一时刻内,只能有唯一一个线程去访问这些synchronized方法。

对象锁

1
synchronized method{}

对象锁也叫实例锁,对应synchronized关键字,当多个线程访问多个实例时,它们互不干扰,每个对象都拥有自己的锁,如果是单例模式下,那么就是变成和类锁一样的功能。

对象锁防止在同一个时刻多个线程访问同一个对象的synchronized块。如果不是同一个对象就没有这样子的限制。

类锁

1
static sychronized method{}

类锁对应的关键字是static synchronized,是一个全局锁,无论多少个对象都共享同一个锁(也可以锁定在该类的class上或者是classloader对象上),同样是保障同一个时刻多个线程同时访问同一个synchronized块,当一个线程在访问时,其他的线程等待。

类锁和对象锁使用的不是同一把的锁。

具体八种情况:

  • 标准访问,先打印短信还是邮件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    class Phone {

    public synchronized void sendSMS(){
    System.out.println("---sendSMS---");
    }

    public synchronized void sendMail(){
    System.out.println("---sendMail---");
    }
    }

    public class Look8 {

    public static void main(String[] args) throws InterruptedException {
    Phone phone = new Phone();
    new Thread(() -> {
    phone.sendSMS();
    }, "AA").start();

    Thread.sleep(100);

    new Thread(() -> {
    phone.sendMail();
    }, "BB").start();
    }
    }
    1
    2
    3
    结果:
    ------sendSMS
    ------sendEmail
  • 停4秒在短信方法内,先打印短信还是邮件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    class Phone {

    public synchronized void sendSMS() throws InterruptedException {
    // 等待4秒
    TimeUnit.SECONDS.sleep(4);
    System.out.println("---sendSMS---");
    }

    public synchronized void sendMail(){
    System.out.println("---sendMail---");
    }
    }

    public class Look8 {

    public static void main(String[] args) throws InterruptedException {
    Phone phone = new Phone();
    new Thread(() -> {
    try {
    phone.sendSMS();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }, "AA").start();

    Thread.sleep(100);

    new Thread(() -> {
    phone.sendMail();
    }, "BB").start();
    }
    }
    1
    2
    3
    结果:
    ------sendSMS
    ------sendEmail
  • 新增普通的hello方法,是先打短信还是hello

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    class Phone {

    public synchronized void sendSMS() throws InterruptedException {
    // 等待4秒
    TimeUnit.SECONDS.sleep(4);
    System.out.println("---sendSMS---");
    }

    public synchronized void sendMail(){
    System.out.println("---sendMail---");
    }

    public void getHello(){
    System.out.println("---Hello---");
    }
    }

    public class Look8 {

    public static void main(String[] args) throws InterruptedException {
    Phone phone = new Phone();
    new Thread(() -> {
    try {
    phone.sendSMS();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }, "AA").start();

    Thread.sleep(100);

    new Thread(() -> {
    phone.getHello();
    }, "BB").start();
    }
    }

    1
    2
    3
    结果:
    ---Hello---
    ---sendSMS---
  • 现在有两部手机,先打印短信还是邮件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    class Phone {

    public synchronized void sendSMS() throws InterruptedException {
    // 等待4秒
    TimeUnit.SECONDS.sleep(4);
    System.out.println("---sendSMS---");
    }

    public synchronized void sendMail(){
    System.out.println("---sendMail---");
    }
    }

    public class Look8 {

    public static void main(String[] args) throws InterruptedException {
    Phone phone = new Phone();
    Phone phone2 = new Phone();
    new Thread(() -> {
    try {
    phone.sendSMS();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }, "AA").start();

    Thread.sleep(100);

    new Thread(() -> {
    phone2.sendMail();
    }, "BB").start();
    }
    }
    1
    2
    3
    结果:
    ---sendMail---
    ---sendSMS---
  • 两个静态同步方法,1部手机,先打印短信还是邮件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    class Phone {

    public static synchronized void sendSMS() throws InterruptedException {
    // 等待4秒
    TimeUnit.SECONDS.sleep(4);
    System.out.println("---sendSMS---");
    }

    public static synchronized void sendMail(){
    System.out.println("---sendMail---");
    }
    }

    public class Look8 {

    public static void main(String[] args) throws InterruptedException {
    Phone phone = new Phone();
    new Thread(() -> {
    try {
    phone.sendSMS();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }, "AA").start();

    Thread.sleep(100);

    new Thread(() -> {
    phone.sendMail();
    }, "BB").start();
    }
    }
    1
    2
    3
    结果:
    ---sendSMS---
    ---sendMail---
  • 两个静态同步方法,2部手机,先打印短信还是邮件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    class Phone {

    public static synchronized void sendSMS() throws InterruptedException {
    // 等待4秒
    TimeUnit.SECONDS.sleep(4);
    System.out.println("---sendSMS---");
    }

    public static synchronized void sendMail(){
    System.out.println("---sendMail---");
    }
    }

    public class Look8 {

    public static void main(String[] args) throws InterruptedException {
    Phone phone = new Phone();
    Phone phone2 = new Phone();
    new Thread(() -> {
    try {
    phone.sendSMS();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }, "AA").start();

    Thread.sleep(100);

    new Thread(() -> {
    phone2.sendMail();
    }, "BB").start();
    }
    }
    1
    2
    3
    结果:
    ---sendSMS---
    ---sendMail---
  • 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    class Phone {

    public static synchronized void sendSMS() throws InterruptedException {
    // 等待4秒
    TimeUnit.SECONDS.sleep(4);
    System.out.println("---sendSMS---");
    }

    public synchronized void sendMail(){
    System.out.println("---sendMail---");
    }

    }

    public class Look8 {

    public static void main(String[] args) throws InterruptedException {
    Phone phone = new Phone();
    new Thread(() -> {
    try {
    phone.sendSMS();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }, "AA").start();

    Thread.sleep(100);

    new Thread(() -> {
    phone.sendMail();
    }, "BB").start();
    }
    }
    1
    2
    3
    结果:
    ---sendMail---
    ---sendSMS---
  • 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    class Phone {

    public static synchronized void sendSMS() throws InterruptedException {
    // 等待4秒
    TimeUnit.SECONDS.sleep(4);
    System.out.println("---sendSMS---");
    }

    public synchronized void sendMail(){
    System.out.println("---sendMail---");
    }

    }

    public class Look8 {

    public static void main(String[] args) throws InterruptedException {
    Phone phone = new Phone();
    Phone phone2 = new Phone();
    new Thread(() -> {
    try {
    phone.sendSMS();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }, "AA").start();

    Thread.sleep(100);

    new Thread(() -> {
    phone2.sendMail();
    }, "BB").start();
    }
    }
    1
    2
    3
    结果:
    ---sendMail---
    ---sendSMS---

公平锁和非公平锁

公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。

  • 优点:所有的线程都能得到资源,不会饿死在队列中。
  • 缺点:吞吐量会下降很多,队列里面除了第一个线程,其他的线程都会阻塞,cpu唤醒阻塞线程的开销会很大。

非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。

  • 优点:可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量。
  • 缺点:你们可能也发现了,这样可能导致队列中间的线程一直获取不到锁或者长时间获取不到锁,导致饿死。
1
2
3
4
5
// 公平锁
private final ReentrantLock lock = new ReentrantLock(true);
// 非公平锁
private final ReentrantLock lock = new ReentrantLock(false);
private final ReentrantLock lock = new ReentrantLock();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

公平锁实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 锁是否被占用,0:否
int c = getState();
if (c == 0) {
// 等待队列是否为空或自己的位置是否位于队列头
if (!hasQueuedPredecessors() &&
// CSA设置锁状态是否成功
compareAndSetState(0, acquires)) {
// 设置该线程为独占线程所有者
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

/**
* 等待队列是否为空或自己的位置是否位于队列头
*/
public final boolean hasQueuedPredecessors() {
Node h, s;
if ((h = head) != null) {
if ((s = h.next) == null || s.waitStatus > 0) {
s = null; // traverse in case of concurrent cancellation
for (Node p = tail; p != h && p != null; p = p.prev) {
if (p.waitStatus <= 0)
s = p;
}
}
if (s != null && s.thread != Thread.currentThread())
return true;
}
return false;
}

非公平锁实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 锁是否没被占用
if (c == 0) {
// 没被占用就直接尝试获取锁,不会判断是否位于队首或队列为空
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

可重入锁

什么是可重入,就是说某个线程获得某个锁,可以再次获取锁而不会出现死锁。synchronizd(隐式)和Lock(显示)都是可重入锁。

synchronized不需要手动释放锁,但Lock需要手动释放锁,并且加锁和释放锁得次数必须一致,虽然不影响当前线程,但会导致其他线程获取锁失败。

synchronized:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SynchronizedLock {

public static void main(String[] args) {
Object o = new Object();
new Thread(() -> {
synchronized (o) {
System.out.println("外层");
synchronized (o) {
System.out.println("中层");
synchronized (o) {
System.out.println("内层");
}
}
}
}, "AA").start();
}
}

Lock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RenentrantLockDemo {

public static void main(String[] args) {
Lock lock = new ReentrantLock();
new Thread(() -> {
try {
lock.lock();
System.out.println("外层");
try {
lock.lock();
System.out.println("中层");
try {
lock.lock();
System.out.println("内层");
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
System.out.println("aaa");
} finally {
lock.unlock();
}
}, "AA").start();
}
}

死锁

两个或两个以上进程在执行过程中,因为争夺资源而造成一种相互等待得现象,如果没有外力干涉,他们无法再执行下去。

死锁造成得原因:

  • 系统资源不足
  • 进程推进顺序不正确
  • 资源分配不当

当A线程持有锁a,尝试获取锁b;当B线程持有锁b,尝试获取锁a;谁都不放手,便造成死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Deadlock {

public static void main(String[] args) {
Object a = new Object();
Object b = new Object();

new Thread(() -> {
synchronized (a) {
System.out.println(Thread.currentThread().getName() + "持有锁a,尝试获取锁b");
synchronized (b) {
System.out.println(Thread.currentThread().getName() + "获取到锁b");
}
}
}, "A").start();

new Thread(() -> {
synchronized (b) {
System.out.println(Thread.currentThread().getName() + "持有锁b,尝试获取锁a");
synchronized (a) {
System.out.println(Thread.currentThread().getName() + "获取到锁a");
}
}
}, "A").start();
}

}

如何验证死锁:

(1) jps

1
2
3
4
5
6
7
$ jps -l

7220 cn.ysliu.reentrantlock.Deadlock
5320
9112 jdk.jcmd/sun.tools.jps.Jps
14348 org.jetbrains.jps.cmdline.Launcher

(2) jstack 进程号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ jstack 7220

Found one Java-level deadlock:
=============================
"A":
waiting to lock monitor 0x0000027de0fd3700 (object 0x0000000088797b60, a java.lang.Object),
which is held by "A"
"A":
waiting to lock monitor 0x0000027de04c4700 (object 0x0000000088797b50, a java.lang.Object),
which is held by "A"

Java stack information for the threads listed above:
===================================================
"A":
at cn.ysliu.reentrantlock.Deadlock.lambda$main$0(Deadlock.java:13)
- waiting to lock <0x0000000088797b60> (a java.lang.Object)
- locked <0x0000000088797b50> (a java.lang.Object)
at cn.ysliu.reentrantlock.Deadlock$$Lambda$14/0x0000000100066840.run(Unknown Source)
at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)
"A":
at cn.ysliu.reentrantlock.Deadlock.lambda$main$1(Deadlock.java:22)
- waiting to lock <0x0000000088797b50> (a java.lang.Object)
- locked <0x0000000088797b60> (a java.lang.Object)
at cn.ysliu.reentrantlock.Deadlock$$Lambda$15/0x0000000100066c40.run(Unknown Source)
at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)

Found 1 deadlock.

Callable

创建线程的方式:

  • 继承Thread
  • 实现Runnable:当线程终止时,无法获取线程的返回结果。
  • 实现Callable:jdk1.5以后,支持线程终止时,获取返回结果。
  • 线程池

Runnable和Callable对比:

区别 Callable Runnable
方法签名 V call() throws Exception; public abstract void run();
是否有返回值
是否能抛出异常
创建线程方式 call run

Runnable创建线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 1.实现Runnable
class MyThread implements Runnable {

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "Exec!");
}
}

public class CallableDemo {

public static void main(String[] args) {
// 2.使用Runnable创建线程
new Thread(new MyThread()).start();
}

}

Callable创建线程:

Thread不能直接通过Callable直接创建线程,只能找一个既和Runnable有关系,又和Callable有关系的类。

image-20210822095516366

FutureTask实现接口Runnable,FutureTask构造方法支持传递Callable,通过FutureTask作为中间桥梁创建线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 1.实现Callable接口
class MyThread2 implements Callable {

@Override
public Integer call() throws Exception {
return 200;
}
}

public class CallableDemo {

public static void main(String[] args) {

// Callable创建线程, 错误的创建方式、
// new Thread(new MyThread2()).start();

// Callable创建线程, 正确的创建方式
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2());
new Thread(futureTask, "BB").start();
// 获取Future执行结果
System.out.println(futureTask.get());

// lambda 表达式创建FutureTask
FutureTask<Integer> futureTask1 = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + " Exec!");
return 2048;
});
new Thread(futureTask1, "CC").start();
// 获取Future执行结果
System.out.println(futureTask1.get());
}

}
1
2
3
4
// 获取Future执行结果
public V get() throws InterruptedException, ExecutionException
// 线程是否执行结束
public boolean isDone()

JUC强大的辅助类

减少计数 CountDownLatch

通过构造函数设置线程数,通过await()阻塞当前线程,当所有线程执行结束(count减为0时),唤醒当前线程

1
2
3
4
5
6
7
8
// 初始化一个线程数
public CountDownLatch(int count)
// 使初始化的线程数减一,当计数器减为0时,释放所有等待的线程
public void countDown()
// 使当前线程阻塞,直到初始化的线程数为0,唤醒当前线程
public void await() throws InterruptedException
// 使当前线程阻塞,并设置一个超时时间,直到初始化的线程数为0,唤醒当前线程
public boolean await(long timeout, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CountDownLatchDemo {

public static void main(String[] args) throws InterruptedException {
// 通过构造函数设置线程数,通过await()阻塞当前线程,当所有线程执行结束(count减为0时),唤醒当前线程
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "号同学离开了教室!");
// 执行完一个线程任务就将count数手动减一
countDownLatch.countDown();
}, String.valueOf(i)).start();
}

// 等待countDownLatch数归零
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "班长把教室门锁了!");
}
}

循环栅栏CyclicBarrier

该类是一个同步辅助类,允许一组线程相互等待,直到达到某个公共屏障点,在设置的一组固定大小的程序中,这些线程必须相互等待,CyclicBarrier在释放等待线程后可以重用。

1
public CyclicBarrier(int parties, Runnable barrierAction)

常用该构造方法创建一个CyclicBarrier,他将在给定数量的参与者(线程)处于等待状态时启动,并在启动CyclicBarrier时执行给定的屏障操作,该操作由最后一个进入barrier的线程操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CyclicBarrierDemo {

// 设置固定值
private static final int NUMBER = 7;

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {
System.out.println("*******集齐七颗龙珠召唤神龙******");
});

for (int i = 0; i < NUMBER; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "星龙珠被集齐!");
// 直到等待的线程数达到
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "星龙珠被使用!");
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}

}

CountDownLatch是一次性的,CyclicBarrier是可循环利用的。

CountDownLatch参与线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。

Cyclic参与者的线程职责是一样的。

信号灯Semaphore

一个计数信号量,从概念上讲,信号量维护了一个许可集,使用acquire()获取一个许可对象,在许可可用前会阻塞每一个将要获取许可的对象,只有获取,才能往后执行。release()释放一个许可,供正在等待许可的线程使用。

Semaphore有两种模式,公平模式和非公平模式。

1
2
3
4
5
6
7
8
9
// 非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

// 设置非公平模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
  • 公平模式

    按照调用acquire()的顺序来获取许可,遵循FIFO。

  • 非公平模式

    抢占式,哪个正在等待的线程抢到了许可证就可以往后执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 三个停车位,6辆车,获取到凭证的车可以停车
*/
public class SemaphoreDemo {

private static Semaphore semaohore = new Semaphore(3);

public static void main(String[] args) {
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
// 获取凭证,获取不到凭证将阻塞
semaohore.acquire();
System.out.println(Thread.currentThread().getName() + "抢占了车位!");
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "--------离开了车位!");
// 释放凭证
semaohore.release();
}
}, String.valueOf(i)).start();
}
}
}

读写锁ReentrantReadWriteLock

  • ReadWriteLock同Lock一样,是一个接口,提供readLock和writeLock两种锁的操作机制,一个是只读的锁,一个是写锁,ReentrantReadWriteLock是其实现。
  • 读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的(排他的)。每次只能有一个写线程,但是可以同时拥有多个线程并发读取数据。
  • 所有读写锁的实现必须保证写操作对读操作的内存影响。换句话说,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容。
  • 理论上,读写锁比互斥锁允许对于共享数据更大程度的并发。与互斥锁相比,读写锁是否能够提高性能取决于读写数据的频率,读取和写入操作的持续时间、以及读线程与写线程之间的竞争。

使用场景:读多写少可以使用读写锁。

互斥原则:

  • 读-读能共存(即可以用多个线程同时读)
  • 读-写不能共存(即读的时候不能有其他线程去修改,或者修改的时候不能有其他线程去读)
  • 写-写不能共存(即修改的时候不能再有其他线程去修改)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class MyCache {

// 创建集合map
private volatile Map<String, Object> map = new HashMap<>();
// 创建读写锁
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

// 写数据
public void put(String key, Object value) {
// 使用写锁上锁
readWriteLock.writeLock().lock();
try {
// 正在写数据
System.out.println(Thread.currentThread().getName() + "> 正在写数据操作, key:" + key);
TimeUnit.MICROSECONDS.sleep(300);
map.put(key, value);
// 写完了
System.out.println(Thread.currentThread().getName() + "> 数据写完了, key:" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 使用写锁解锁
readWriteLock.writeLock().unlock();
}
}

// 读数据
public Object get(String key) {
Object value = null;
// 使用读锁上锁
readWriteLock.readLock().lock();
try {
// 正在读数据
System.out.println(Thread.currentThread().getName() + "< 正在读数据操作, key:" + key);
TimeUnit.MICROSECONDS.sleep(300);
value = map.get(key);
// 读完了
System.out.println(Thread.currentThread().getName() + "< 数据读完了, key:" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 使用读锁解锁
readWriteLock.readLock().unlock();
}
return value;
}


}

public class ReadWriteLockDemo {

public static void main(String[] args) {
MyCache myCache = new MyCache();

// 创建线程放数据
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> myCache.put(String.valueOf(finalI), String.valueOf(finalI)), "Writer").start();
}

// 创建线程读数据
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> myCache.get(String.valueOf(finalI)), "Writer").start();
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
输出:
Writer> 正在写数据操作, key:2
Writer> 数据写完了, key:2
Writer> 正在写数据操作, key:0
Writer> 数据写完了, key:0
Writer> 正在写数据操作, key:1
Writer> 数据写完了, key:1
Writer> 正在写数据操作, key:3
Writer> 数据写完了, key:3
Writer> 正在写数据操作, key:4
Writer> 数据写完了, key:4
Writer< 正在读数据操作, key:1
Writer< 正在读数据操作, key:0
Writer< 正在读数据操作, key:3
Writer< 正在读数据操作, key:4
Writer< 正在读数据操作, key:2
Writer< 数据读完了, key:1
Writer< 数据读完了, key:0
Writer< 数据读完了, key:4
Writer< 数据读完了, key:3
Writer< 数据读完了, key:2

写锁锁降级:

1
步骤:获取写锁 > 获取读锁 > 释放写锁 > 释放读锁

个人理解,由于写锁不支持其他线程的读操作,读锁允许其他线程读操作。

锁降级解决长时间写锁独占,这对于某些要求高响应的应用是不允许的。所以在完成写操作完成后,确保读操作能获取到锁,允许响应其他线程的读操作,锁降级的前提是所有线程都希望对数据变化敏感,而不是一直被写线程独占锁,以至于所有写操作完成其他读线程才能读,只要写完一个就能读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class WriteLockDownDemo {

public static void main(String[] args) {
// 可重入读写锁对象
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 读锁
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
// 写锁
ReentrantReadWriteLock.ReadLock writeLock = readWriteLock.readLock();

// 锁降级(写降级)
// 1.获取写锁
writeLock.lock();
System.out.println("正在写");
// 2.获取读锁
readLock.lock();
System.out.println("正在读");
// 3.释放写锁
writeLock.lock();
System.out.println("写完了");
// 4.释放读锁
writeLock.lock();
System.out.println("读完了");
}

}

阻塞队列BlockingQueue

阻塞队列是共享队列(多线程操作),一个线程放数据,另外一个线程取数据

  • 当队列是空的,从队列中获取元素的操作将会被阻塞,直到其他线程往队列中插入新的元素
  • 当队列是慢满的,多队列中添加元素的操作将会被阻塞,直到其他线程移除队列中的元素

种类

  • ArrayBlockingQueue

    基于数组结构组成的有界阻塞队列,在生产者放入数据和消费者获数据,都是共用同一个锁对象,无法并行

  • LinkedBlockingQueue

    由链表结构组成的有界(大小默认为Integer.MAX_VALUE)阻塞队列,之所以能够搞笑的处理并发数据,因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能

  • DelayQueue

    使用优先级队列实现的延迟无阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取到数据的操作(消费者)才会被阻塞。

  • PriorityBlockingQueue

    支持优先级排序的无界阻塞队列,不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。

  • SynchronousQueue

    一种无缓冲的等待队列,相对于有缓冲的的BlockingQueue来说少了缓冲区,不储存元素的阻塞队列,即单个元素的阻塞队列。

    声明SynchronousQueue有两种不同的方式,他们之间有着不太一样的行为。

    公平模式:

    采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而使整体公平的策略

    非公平模式(默认):

    采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者。如果生产者和消费者的处理速度有差距,很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远永远都得不到处理

  • LinkedTransferQueue

    由链表结构组成的无界阻塞队列

    预占模式,消费者线程取元素时,如果队列不为空则直接取走数据,若队列为空,生成一个节点(null)入队,消费者被等待在这个节点上,生产者线程入队时发现有一个元素为空的节点,生产者线程就不入队了,直接将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回

  • LinkedBlockingDeueu

    由链表结构组成的双向阻塞队列

    阻塞有两种情况:

    • 插入元素时:如果当前队列已满将会进入阻塞状态,一直等到队列有空位置时再插入该元素,该操作可以通过设置超时参数,超时后返回false表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出InterruptedException异常
    • 都取元素时:如果当前队列为空会阻塞,直到队列不为空返回元素,同样可以通过设置超时参数

方法

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
  • 抛出异常:

    当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:”Queue full”

    当阻塞队列为空时,再往队列里remove移除元素会抛出NoSuchElementException

  • 特殊值:

    插入方法,成功true,失败false

    移除方法,成功返回队列的元素,队列里没有就返回null

  • 一直阻塞:

    当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据或响应中断推出

    当阻塞队列为空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用

  • 超时退出:

    当阻塞队列满时,队列会阻塞生产者线程一定时间,超出时限后生产者线程会退出

线程池

连接池:连接池是创建和管理一个连接的缓冲池的计数,这写连接准备好被任何需要他们的线程使用。

线程池:一种线程的使用模式。线程过多会带来调度开销,进而影响缓存局部性能和整体性能。而线程池维护着多个线程,等待着监督管理者分配可执行的任务。为了避免在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

特点:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务不需要等待线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以统一分配,调优和监控。

整体架构

image-20210825230956538

Executors是创建线程池的工具类。

种类与创建

线程工具类 (Exectours) 创建(不推荐)

1)FixThreadPool和SingleThread Pool

允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

2)CachedThreadPool和ScheduledThreadPool

允许的创建线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

  • 一池一线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 1.一池一线程
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    try {
    for (int i = 0; i < 10; i++) {
    executorService.execute(() -> {
    System.out.println(Thread.currentThread().getName() + ": 办理业务!");
    });
    }
    } catch (Exception e){
    e.printStackTrace();
    }finally {
    executorService.shutdown();
    }

    out:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    pool-1-thread-1: 办理业务!
    pool-1-thread-1: 办理业务!
    pool-1-thread-1: 办理业务!
    pool-1-thread-1: 办理业务!
    pool-1-thread-1: 办理业务!
    pool-1-thread-1: 办理业务!
    pool-1-thread-1: 办理业务!
    pool-1-thread-1: 办理业务!
    pool-1-thread-1: 办理业务!
    pool-1-thread-1: 办理业务!
  • 一池多线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 2.一池多线程
    ExecutorService executorService1 = Executors.newFixedThreadPool(5);

    try {
    for (int i = 0; i < 10; i++) {
    executorService1.execute(() -> {
    System.out.println(Thread.currentThread().getName() + ": 办理业务!");
    });
    }
    } catch (Exception e){
    e.printStackTrace();
    }finally {
    executorService1.shutdown();
    }

    out:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    pool-2-thread-1: 办理业务!
    pool-2-thread-1: 办理业务!
    pool-2-thread-3: 办理业务!
    pool-2-thread-3: 办理业务!
    pool-2-thread-4: 办理业务!
    pool-2-thread-4: 办理业务!
    pool-2-thread-5: 办理业务!
    pool-2-thread-3: 办理业务!
    pool-2-thread-1: 办理业务!
    pool-2-thread-2: 办理业务!
  • 一池可扩容线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 3.一池可扩容线程
    ExecutorService executorService2 = Executors.newCachedThreadPool();

    try {
    for (int i = 0; i < 10; i++) {
    executorService2.execute(() -> {
    System.out.println(Thread.currentThread().getName() + ": 办理业务!");
    });
    }
    } catch (Exception e){
    e.printStackTrace();
    }finally {
    executorService2.shutdown();
    }

    out:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    pool-3-thread-4: 办理业务!
    pool-3-thread-5: 办理业务!
    pool-3-thread-3: 办理业务!
    pool-3-thread-9: 办理业务!
    pool-3-thread-1: 办理业务!
    pool-3-thread-6: 办理业务!
    pool-3-thread-2: 办理业务!
    pool-3-thread-8: 办理业务!
    pool-3-thread-10: 办理业务!
    pool-3-thread-7: 办理业务!

自定义线程池创建(推荐)

创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadPoolDemo2 {

public static void main(String[] args) {
// 自定义线程池创建
ExecutorService threadPool = new ThreadPoolExecutor(
// 常驻线程数量(核心)
2,
// 最大线程数量
5,
// 线程存活时间
2,
// 线程存活时间单位
TimeUnit.SECONDS,
// 阻塞队列
new ArrayBlockingQueue<>(3),
// 线程工厂,用于创建线程
Executors.defaultThreadFactory(),
// 拒绝策略(线程满了的处理策略)
new ThreadPoolExecutor.AbortPolicy()
);

try{
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + ": 处理业务!");
});
}
}catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}

}

out:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 超出拒绝策略的限制抛出异常
java.util.concurrent.RejectedExecutionException: Task cn.ysliu.threadpool.ThreadPoolDemo2$$Lambda$14/0x0000000100066840@96532d6 rejected from java.util.concurrent.ThreadPoolExecutor@3796751b[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at cn.ysliu.threadpool.ThreadPoolDemo2.main(ThreadPoolDemo2.java:32)
pool-1-thread-5: 处理业务!
pool-1-thread-2: 处理业务!
pool-1-thread-5: 处理业务!
pool-1-thread-5: 处理业务!
pool-1-thread-5: 处理业务!
pool-1-thread-4: 处理业务!
pool-1-thread-3: 处理业务!
pool-1-thread-1: 处理业务!

底层原理

线程的创建离不开new ThreadPoolExecutor

参数介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ThreadPoolExecutor(
// 常驻线程数量(核心)
int corePoolSize,
// 最大线程数量
int maximumPoolSize,
// 线程存活时间
long keepAliveTime,
// 线程存活时间单位
TimeUnit unit,
// 阻塞队列
BlockingQueue<Runnable> workQueue,
// 线程工厂,用于创建线程
ThreadFactory threadFactory,
// 拒绝策略(线程满了的处理方法)
RejectedExecutionHandler handler)

工作流程

  • 在执行创建对象的时候不创建线程。
  • 执行execute()时创建线程。
  • 先到常驻线程,满了之后再到阻塞队列等待,阻塞队列满了之后往外扩容线程,扩容线程不能大于最大线程数。大于最大线程数和阻塞队列之后,会执行拒绝策略。

线程池工作原理

拒绝策略:

  • AbortPolicy抛异常

    直接抛出RejectedExecutionException异常阻止系统正常运行

  • CallRunsPolicy谁调用找谁。

    “调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务退回到调用者,从而降低新任务的流量。

  • DiscardOldestPolicy抛弃最久执行当前

    抛弃队列中等待最久的任务,然后把当前任务加入到队列中尝试再次提交当前任务。

  • DiscardPolicy不理不问

    该策略默默地抛弃无法处理的任务,不予任何处理也不抛出异常,如果允许丢失,这是最好的策略。

Fork与Join分支

将一个大的任务拆分成多个子任务并进行处理,最后将子任务结果合并并合成最后的计算结果。

  • ForkJoinTask:我们要使用Fork/Join框架,首先需要创建一个ForkJoin任务。该类提供了在任务中执行fork和join的机制。通常情况下我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供的两个子类:

    RecursiveAction:用于返回没有结果的任务。

    RecursiveTask:用于有返回结果的任务。

  • ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行

  • RecursiveTask:继承后可以实现递归(自己调用自己)调用任务

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class MyTask extends RecursiveTask<Integer> {

private static final Integer DIFF = 10;
private int start;
private int end;

public MyTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if (end - start <= DIFF) {
return IntStream.rangeClosed(start, end).sum();
} else {
int middle = (start + end) / 2;
MyTask myTaskA = new MyTask(start, middle);
// 调用方法拆分
myTaskA.fork();
MyTask myTaskB = new MyTask(middle + 1, end);
// 调用方法拆分
myTaskB.fork();
// 合并结果
return myTaskA.join() + myTaskB.join();
}
}
}

public class ForkJoin {

public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(new MyTask(0, 100));
Integer result = forkJoinTask.get();
System.out.println("计算结果:" + result);
}
}

out:

1
计算结果:5050

异步回调

CompletableFeature在Java里被用于异步编程,异步通常意味着非阻塞,可以使我们的任务单独运行在与主线程分离的其他线程中,并通过回调可以在主线程中得到异步任务的执行状态,是否完成和是否有异常等信息。

image-20210831221946574

CompletableFuture实现了Future,CompletionStage接口,实现Future接口就可以兼容现在有的线程池框架,而CompletionStage接口才是异步编程的抽象接口,里面有多种异步方法,从而打造出强大的CompletableFuture类。

异步调用没有返回值方法

1
2
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

异步调用有返回值方法

1
2
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CompletableFutureDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 异步调用,无返回值
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() ->
System.out.println(Thread.currentThread().getName() + ": 无返回值"));
voidCompletableFuture.get();

// 异步调用, 有返回值
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
int result = 100;
System.out.println(Thread.currentThread().getName() + "-返回结果: " + result);
return result;
});
Integer r = completableFuture.whenComplete((result, exception) -> {
System.out.println("whenComplete-异步调用结果: " + result);
System.out.println("whenComplete-异步调用异常: " + exception);
}).get();
System.out.println("异步调用结果: " + r);
}

}

out:

1
2
3
4
5
ForkJoinPool.commonPool-worker-3: 无返回值
ForkJoinPool.commonPool-worker-3-返回结果: 100
whenComplete-异步调用结果: 100
whenComplete-异步调用异常: null
异步调用结果: 100