JAVA为我们提供了一些效果非常不错的并发工具类,这里主要介绍一下如下几个工具类的使用,并不会去深究实现原理(其实原理都是通过自旋CAS,CAS对应的处理器原子操作指令是CMPXCHG)。
Semaphore
CountDownLatch
CyclicBarrier
Executors
设定信号量,只有获得信号量的线程才能够往后执行业务逻辑,没有获得信号量的线程只能阻塞等待唤醒重新尝试获得信号量,可用于服务限流。
/**
* Semaphore作用:设定信号量,只有获得信号量的线程才能够往后执行业务逻辑,
* 没有获得信号量的线程只能阻塞等待唤醒重新尝试获得信号量,可用于服务限流
* @author 爱吃鱼的乌贼
*/
public class SemaphoreTest {
public static void main(String[] args) {
//设定信号量,只允许2个线程同时处理
Semaphore semaphore = new Semaphore(2);
for(int i=0;i<10;i++) {
new Thread(new Runnable() {
public void run() {
try {
//获取信号量
semaphore.acquire();
System.out.println("Thread:"+Thread.currentThread().getId()+"获得信号量"+new Date());
Thread.sleep(5000);
System.out.println("Thread:"+Thread.currentThread().getId()+"释放信号量"+new Date());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
能够使一个线程等待其他线程完成各自的工作后再执行
/**
* CountDownLatch 能够使一个线程等待其他线程完成各自的工作后再执行
* @author 爱吃鱼的乌贼
*/
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Task1(countDownLatch).start();
new Task2(countDownLatch).start();
countDownLatch.await();
System.out.println("起锅烧油");
}
public static class Task1 extends Thread{
CountDownLatch countDownLatch;
public Task1(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("正在洗菜中....");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("菜已洗好!");
countDownLatch.countDown();
}
}
public static class Task2 extends Thread{
CountDownLatch countDownLatch;
public Task2(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("正在煮饭中....");
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("饭已煮好!");
countDownLatch.countDown();
}
}
}
栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
/**
* CyclicBarrier 栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,
* 直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
* @author 爱吃鱼的乌贼
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier =new CyclicBarrier(10);
for(int i=0;i<10;i++) {
new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(new Random().nextInt(1000)+1000);
System.out.println("Thread:"+Thread.currentThread().getId()+"已经准备好");
cyclicBarrier.await();
System.out.println("Thread:"+Thread.currentThread().getId()+"开始出发");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
}
主要用来创建线程池,代理了线程池的创建,使得你的创建入口参数变得简单
/**
* Executors:主要用来创建线程池,代理了线程池的创建,使得你的创建入口参数变得简单
* @author 爱吃鱼的乌贼
*/
public class ExecutorsTest {
public static void main(String[] args) {
// 1、newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
// 这里会发现结果有重复的线程ID
System.out.println("Thread:" + Thread.currentThread().getId() + ";newCachedThreadPool");
}
});
}
// 2、newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; i++) {
newFixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 这里会发现两个两个执行完后才继续执行,并且线程ID不变
System.out.println("Thread:" + Thread.currentThread().getId() + ";newFixedThreadPool");
}
});
}
// 3、newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(2);
// 延迟3秒钟后执行任务
newScheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("运行时间: " + new Date());
}
}, 3, TimeUnit.SECONDS);
// 延迟1秒钟后每隔3秒执行一次任务
newScheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("运行时间: " + new Date());
}
}, 1, 3, TimeUnit.SECONDS);
// 4、newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
newSingleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
// 这里会发现结果有重复的线程ID
System.out.println("Thread:" + Thread.currentThread().getId() + ";newSingleThreadExecutor");
}
});
}
}
}