多线程-JUC中的同步工具

前言

相信JUC和AQS都是大家如雷贯耳、耳熟能详的词儿了。

JUC是大名鼎鼎的java.util.concurrent包的缩写,一看名字就很牛,java提供的并发工具集。而AQS(Abstract Queued Synchronizer)则是JUC包中许多同步工具实现的基础。

我们先来看看JUC包中有哪些东西:

  • atomic原子类:著名的基于CAS的原子操作类。
  • 并发容器:包括ConcurrentMap、ConcurrentSkipListSet、CopyOnWriteArraySet、ConcurrentLinkedQueue、Dequeue等等。而创建线程池的一个重要参数:workQueue,一般就出自这里。
  • 线程池:
    • 异步机制常用的Future、Callable
    • 线程池ThreadPoolExecutor以及四种预定义的线程池SingleThreadExecutor、CachedThreaadPool、FixedThreadPool、SecheduledThreadPool
  • 同步工具
    • AQS
    • Lock:ReentrantLock、ReadWriteLock、LockSupport
    • 其他工具:CountDownLatch、CyclicBarrier、Phaser、Semaphore、Exchanger

几乎囊括了所有和多线程相关的内容。

本文我们先来总结一下JUC中的同步工具。

LockSupport

LockSupport中提供的方法不多,且都是以静态方法提供:

1
2
3
4
5
6
7
8
public static void unpark(Thread thread);
public static void park(Object blocker);
public static void parkNanos(Object blocker, long nanos);
public static void parkUntil(Object blocker, long deadline);
public static Object getBlocker(Thread t);
public static void park();
public static void parkNanos(long nanos);
public static void parkUntil(long deadline);

说白了就是将Unsafe类中的 park()unpark() 方法包装了一下,提供了挂起和唤醒线程的方法。

为啥要包装呢?

因为Unsafe类并不是普通应用代码可以直接调用的,看下Unsafe的源码:

1
2
3
4
5
6
7
8
9
10
11
private Unsafe() {}

private static final Unsafe theUnsafe = new Unsafe();

@CallerSensitive
public static Unsafe getUnsafe() {
Class<?> caller = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(caller.getClassLoader()))
throw new SecurityException("Unsafe");
return theUnsafe;
}

可以看到:

  1. Unsafe是单例的,外界无法通过构造方法直接构造Unsafe对象,需要调用 getUnsafe() 方法获取。
  2. getUnsafe() 方法中是有条件的,调用者必须是 Bootstrap类加载器 加载的类(比如AQS),否则会报Unsafe异常,显然我们写的普通代码是无法调用的。

因此,一般情况下,我们写的代码是无法直接调用Unsafe类中的方法的。

当然,也不是完全没有办法,比如我们可以通过反射,获取 theUnsafe 方法:

1
2
3
final Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);

但这毕竟不是官方推荐的方法,因此,LockSupport便将 park()unpark() 方法包装了一下,提供给普通应用程序使用。

Lock

ReentrantLock

通过类名就能看出来,这是一把可重入锁。

常用方法

1
2
3
4
5
6
7
8
9
10
/* 构造方法 */
public ReentrantLock(boolean fair);
public ReentrantLock();
/* 加锁 */
public void lock();
public void lockInterruptibly();
public boolean tryLock();
public boolean tryLock(long timeout, TimeUnit unit);
/* 解锁 */
public void unlock();

ReentrantReadWriteLock

其他同步工具

CountDownLatch

从类名来看,这是个向下计数的门栓。初始化时,传入计数初始值,调用await()的线程挂起等待,其他线程每调用一次countDown(),计数值减一,当计数值减到0,调用await()的方法被唤醒继续执行。

常用方法

1
2
3
4
5
6
7
8
/* 构造方法 */
public CountDownLatch(int count);
/* 等待 */
public void await();
public boolean await(long timeout, TimeUnit unit);
/* 计数相关 */
public void countDown();
public long getCount();

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class T06_TestCountDownLatch {
public static void main(String[] args) {
Thread[] threads = new Thread[10];
CountDownLatch latch = new CountDownLatch(threads.length);

for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
try {
Thread.sleep((new Random().nextInt(9) + 1) * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
System.out.println(Thread.currentThread().getName() + " count down.");
}, "T" + i);
}

for (Thread thread : threads) {
thread.start();
}

System.out.println("Waiting latch...");
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("End latch");
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
Waiting latch...
T5 count down.
T7 count down.
T4 count down.
T6 count down.
T1 count down.
T3 count down.
T9 count down.
T8 count down.
T2 count down.
T0 count down.
End latch

CyclicBarrier

CyclicBarrier是个计数拦截器,创建CyclicBarrier时,需要传入计数初始值。每个线程调用wait()方法,将线程挂起。直到调用wait()的线程数达到计数值,所有线程一起唤醒。

常用方法

1
2
3
4
5
6
/* 构造方法 */
public CyclicBarrier(int parties, Runnable barrierAction);
public CyclicBarrier(int parties);
/* 等待 */
public int await();
public int await(long timeout, TimeUnit unit);

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TestCyclicBarrier {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(10, () -> System.out.println("满人"));

for(int i = 0; i < 10; i++) {
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + " waiting...");
barrier.await();
System.out.println(Thread.currentThread().getName() + " finish.");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Thread-0 waiting...
Thread-4 waiting...
Thread-5 waiting...
Thread-3 waiting...
Thread-6 waiting...
Thread-2 waiting...
Thread-1 waiting...
Thread-8 waiting...
Thread-7 waiting...
Thread-9 waiting...
满人
Thread-6 finish.
Thread-0 finish.
Thread-7 finish.
Thread-5 finish.
Thread-8 finish.
Thread-3 finish.
Thread-1 finish.
Thread-2 finish.
Thread-4 finish.
Thread-9 finish.

Phaser

常用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* 构造方法 */
public Phaser();
public Phaser(int parties);
public Phaser(Phaser parent);
public Phaser(Phaser parent, int parties);

public int register();
public int bulkRegister(int parties);

public int arrive();
public int arriveAndDeregister();
public int arriveAndAwaitAdvance();

public int awaitAdvance(int phase);
public int awaitAdvanceInterruptibly(int phase);
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit);

Semaphore

信号量,和锁的区别是,锁只能允许一个线程进入同步代码段,但是信号量可以允许多个线程进入同步代码段。

常用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* 构造方法 */
public Semaphore(int permits);
public Semaphore(int permits, boolean fair);
/* 获取单个许可 */
public void acquire();
public void acquireUninterruptibly();
public boolean tryAcquire();
public boolean tryAcquire(long timeout, TimeUnit unit);
/* 释放单个许可 */
public void release();
/* 获取多个许可 */
public void acquire(int permits);
public void acquireUninterruptibly(int permits);
public boolean tryAcquire(int permits);
public boolean tryAcquire(int permits, long timeout, TimeUnit unit);
/* 释放多个许可 */
public void release(int permits);

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TestSemaphore {

/* 允许两个线程同时运行 */
static Semaphore s = new Semaphore(2, true);

static class MyRunnable implements Runnable {
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
s.acquire();
System.out.println(name + " running...");
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(name + " finish.");
s.release();
}
}
}

public static void main(String[] args) {
new Thread(new MyRunnable(), "T1").start();
new Thread(new MyRunnable(), "T2").start();
new Thread(new MyRunnable(), "T3").start();
new Thread(new MyRunnable(), "T4").start();
new Thread(new MyRunnable(), "T5").start();
new Thread(new MyRunnable(), "T6").start();
}
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
T1 running...
T2 running...
T2 finish.
T1 finish.
T3 running...
T4 running...
T3 finish.
T4 finish.
T5 running...
T6 running...
T5 finish.
T6 finish.

Exchanger

Exchanger就如其类名,用于线程之间交换数据。

常用方法

1
2
3
4
5
/* 构造方法 */
public Exchanger();
/* 交换数据 */
public V exchange(V x);
public V exchange(V x, long timeout, TimeUnit unit);

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class TestExchanger {

static Exchanger<String> exchanger = new Exchanger<>();

public static void main(String[] args) {
new Thread(()->{
String s = "T1";
System.out.println("t1 ready to exchange...");
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ": " + s);

}, "t1").start();


new Thread(()->{
String s = "T2";
System.out.println("t2 waiting 3 sec...");
try {
Thread.sleep(3000);
System.out.println("t2 ready to exchange...");
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ": " + s);

}, "t2").start();
}
}

输出:

1
2
3
4
5
t1 ready to exchange...
t2 waiting 3 sec...
t2 ready to exchange...
t2: T1
t1: T2

实现