在 JDK1.5 后,推出了几个并发的工具类,位于 JUC(JAVA.util.concurrent)包下。
CountDownLatch 类是使一个线程等待其他线程各自执行完毕后再执行。
类似于现实中某个活动需要等到全部人齐了才可以开始。
实现原理:
从ReentrantLock的实现看AQS的原理及应用
package test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test1 {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2); // 计数器初始化为 2,要等两个线程执行完毕
System.out.println("主线程开始执行");
ExecutorService es1 = Executors.newSingleThreadExecutor();
es1.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("子线程:" + Thread.currentThread().getName() + "执行");
}catch (InterruptedException e){
e.printStackTrace();
}
latch.countDown(); // 使计数器减一
}
});
ExecutorService es2 = Executors.newSingleThreadExecutor();
es2.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("子线程:" + Thread.currentThread().getName() + "执行");
latch.countDown();
}
});
System.out.println("等待两个线程执行完毕");
try {
latch.await(); // 主线程挂起,等待两个线程执行完
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("两个子线程都执行完毕,继续执行主线程");
}
}
主线程开始执行
等待两个线程执行完毕
子线程:pool-2-thread-1执行
子线程:pool-1-thread-1执行
两个子线程都执行完毕,继续执行主线程
与 CountDownLatch 功能一样,不过它可以重复循环,而 CountDownLatch 只能执行一次。
实现原理:
//同步操作锁
private final ReentrantLock lock = new ReentrantLock();
//线程拦截器
private final Condition trip = lock.newCondition();
//每次拦截的线程数
private final int parties;
//换代前执行的任务
private final Runnable barrierCommand;
//表示栅栏的当前代
private Generation generation = new Generation();
//计数器
private int count;
//静态内部类Generation
private static class Generation {
boolean broken = false;
package test;
import java.util.concurrent.CyclicBarrier;
public class Test2 {
static class TaskThread extends Thread{
CyclicBarrier barrier;
public TaskThread(CyclicBarrier barrier){
this.barrier = barrier;
}
@Override
public void run(){
try{
Thread.sleep(100);
System.out.println(getName() + "到达栅栏 A");
barrier.await(); // 等待所有线程都执行到这,才执行主线程
System.out.println(getName() + "冲破栅栏 A"); // 主线程完成后继续执行
Thread.sleep(2000);
System.out.println(getName() + "到达栅栏 B");
barrier.await();
System.out.println(getName() + "冲破栅栏 B");
}catch (Exception e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int threadNum = 5;
CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "完成任务");
}
});
for (int i = 0; i < threadNum; i++) {
new TaskThread(barrier).start();
}
}
}
该类用于控制信号量的个数,可以控制同时访问资源的线程个数,并提供了同步机制。例如,实现一个文件允许的并发访问数。
Semaphore 的主要方法:
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3); // 创建 Semaphore 信号量,初始化许可大小为3
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
}catch (InterruptedException e){
e.printStackTrace();
}
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
sp.acquire(); // 请求获取许可,如果有可获取许可,则继续往下指向,许可数减一。
} catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"进入,当前已有" + (3 - sp.availablePermits()) + "个并发") ;
try{
Thread.sleep((long)(Math.random() * 10000));
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() + "即将离开");
sp.release(); // 释放许可证,许可数+1
}
};
service.execute(runnable);
}
}
}
这个类用于交换数据,只能用于两个线程。当一个线程运行到 exchange() 方法时会阻塞,另一个线程运行到 exchange() 时,两者交换数据,然后执行后面的程序。
package test;
import java.util.concurrent.Exchanger;
public class Test3 {
static class Producer extends Thread{ // 生产者线程
private Exchanger<Integer> exchanger; // 交换标志
private static int data = 0;
Producer(String name, Exchanger<Integer> exchanger){
super("Producer-" + name);
this.exchanger = exchanger;
}
@Override
public void run(){
for (int i = 1; i < 5; i++) {
try {
Thread.sleep(1000);
data = i;
System.out.println(getName() + "交换前:" + data);
data = exchanger.exchange(data); // 将此 data 与 消费者的 data 进行交换
System.out.println(getName() + "交换后:" + data);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
static class Consumer extends Thread{ // 消费者线程
private Exchanger<Integer> exchanger; // 交换标志
private static int data = 0;
Consumer(String name, Exchanger<Integer> exchanger){
super("Consumer-" + name);
this.exchanger = exchanger;
}
@Override
public void run(){
while(true){
data = 0;
System.out.println(getName() + "交换前:" + data);
try{
data = exchanger.exchange(data); // 将此 data 与生产者的 data 进行交换,因为先执行到这,会阻塞知道生产者执行到交换
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(getName() + "交换后:" + data);
}
}
}
public static void main(String[] args) throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<>();
new Producer("", exchanger).start();
new Consumer("", exchanger).start();
Thread.sleep(7000);
}
}