您当前的位置:首页 > 电脑百科 > 程序开发 > 算法

Java内置条件队列应用,实现经典的生产者消费者算法

时间:2020-11-10 10:36:53  来源:  作者:

背景

“生产者和消费者模型” 是多线程通信的典型案例,本章节将利用前一节的锁和条件队列的知识,来实现一个完整的有界缓冲区,并创建多个线程访问该有界缓冲区,模拟生产者提供数据、消费者处理数据的过程,正文如下。

生产者消费者模型

生产者和消费者模型中,因为多个线程共享同一个缓冲区,所以就涉及到两个重要的通信约束:

  1. 缓冲区满的时候,生产者不能再添加数据,应该阻塞等待,直到缓冲区有空位;
  2. 缓冲区空的时候,消费者不能再获取数据,应该阻塞等待,直到有新的数据加入缓冲区。

要保证上述约束条件,可以用 sleep 空循环,也可以使用锁和条件队列。利用锁和条件队列实现的思路是,生产者和消费者有各自要等待的条件,一旦条件不满足,就阻塞在该条件队列上,直到另一个线程唤醒自己。

实现过程

缓冲区的 “满” 和 “空” 是两个条件,如果用内置锁,对缓冲区的操作由同一把锁保护,只能共用一个条件队列;如果使用显式锁,则可以定义两个条件队列。

这里我们就用内置锁和内置条件队列来实现一个通信模型中的共享缓冲区类。设计类图结构:

Java内置条件队列应用,实现经典的生产者消费者算法

 

抽象有界限缓存

首先,创建一抽象有界缓存类 ABoundedBuffer,提供插入和删除的基本实现。

/**
 * @title       :ABoundedBuffer
 * @description :有界缓存抽象类
 * @update      :2019-12-20 上午9:29:33
 * @author      :172.17.5.73
 * @version     :1.0.0
 * @since       :2019-12-20
 */
public abstract class ABoundedBuffer<V> {
	private final V[] buf;
	private int tail;
	private int head;
	private int count;
	
	protected ABoundedBuffer(int capacity){
		this.buf = (V[]) new Object[capacity];
	}
	
	protected synchronized final void doPut(V v){
		buf[tail] = v;
		if(++tail==buf.length){
			tail = 0;
		}
		
		++count;
	}

	protected synchronized final V doTake(){
		V v = buf[head];
		buf[head] = null;
		if(++head==buf.length){
			head = 0;
		}
		--count;
		return v;
	}
	
	public synchronized final boolean isFull(){
		return count == buf.length;
	}
	
	public synchronized final boolean isEmpty(){
		return count==0;
	}
}

定义实现类

其次,利用内置条件队列,编写子类实现可阻塞的插入和删除操作。

插入操作,依赖的条件是缓存非满,当条件不满足时,调用 wait 方法挂起线程,一旦插入成功,说明缓存非空,则调用 notifyAll 方法唤醒等待非空的线程。

删除操作,依赖的条件是非空,当条件不满足时,同样挂起等待,一旦删除成功,说明缓存非满,唤起等待该条件的线程。

完整的源码为:

import JAVA.util.Date;

/**
 * 
 * @title       :InnerConditionQueue
 * @description :使用内置条件队列,实现简单的有界缓存
 *               通过对象的 wait 和 notify 来实现挂起
 *               锁对象是 this,调用 wait/notify 的对象是同一个对象。
 *               三元关系(锁、wait/notify、条件谓词)
 *               缺陷:
 *               线程从 wait 中被唤醒时,并不代码条件谓词为真,此时还是需要再判断条件。所以必须在循环中调用wait
 *               每次醒来时都判断谓词的真假。
 *               谓词:对客体的描述或说明(是什么、怎么样、做什么),描述客体的本质、关系、特性等的词项。
 * @update      :2019-12-20 下午4:18:06
 * @author      :172.17.5.73
 * @version     :1.0.0
 * @since       :2019-12-20
 */
public class InnerConditionQueue<V> extends ABoundedBuffer<V> {

	protected InnerConditionQueue(int capacity) {
		super(capacity);
	}

	public synchronized void put(V v) throws InterruptedException{
		while(isFull()){
			System.out.println(new Date()+" buffer is Full thread wait:"+Thread.currentThread().getName());
			wait();
		}
		
		doPut(v);
		notifyAll();
	}
	
	public synchronized V take() throws InterruptedException{
		while(isEmpty()){
			System.out.println(new Date()+" buffer is empty thread wait:"+Thread.currentThread().getName());
			wait();
		}
		
		V v = doTake();
		//每当在等待一个条件时,一定要确保在条件谓词变为真时,通过某种方式发出通知
		notifyAll();
		System.out.println(new Date()+" "+Thread.currentThread().getName()+" take:"+v);
		return v;
	}
}

测试类

最后,编写测试代码,创建一个大小为 2 的缓冲区对象,同时启动三个线程执行插入操作,主线程执行四次消费操作。测试代码如下:

import java.util.Date;

public class Main {
	public static void main(String[] args) {
		final InnerConditionQueue<String> bu = new InnerConditionQueue<String>(2);
		
		Thread t1 = new Thread(new Runnable(){
			@Override
			public void run() {
				try {
					bu.put("hello1");
				} catch (InterruptedException execption) {
					System.out.println("intercetp1:"+Thread.currentThread().getName());
				}
			}
		});
		Thread t2 = new Thread(new Runnable(){
			@Override
			public void run() {
				try {
					bu.put("hello2");
				} catch (InterruptedException execption) {
					System.out.println("intercetp2:"+Thread.currentThread().getName());
				}
			}
		});
		Thread t3 =  new Thread(new Runnable(){
			@Override
			public void run() {
				try {
					bu.put("hello3");
					Thread.sleep(50000);
					bu.put("last one...");
				} catch (InterruptedException execption) {
					System.out.println("intercetp3:"+Thread.currentThread().getName());
				}
			}
		});
		
		t1.start();
		t2.start();
		t3.start();
		
		try {
			Thread.sleep(5000);
			bu.take();
			bu.take();
			bu.take();
			bu.take();
		} catch (InterruptedException execption) {
			execption.printStackTrace();
		}
		
		System.out.println(new Date()+" main over...");
	}
}

测试结果

执行结果:t3 的第一个 put 操作会因为缓存已满而阻塞,5 秒后主线程删除两个操作后,重新被唤醒。主线程的第四个 bu.take() 操作会因为缓存为空而阻塞,直到 t3 在 50 秒后重新插入"last one" 后被唤醒,操作结束。

Tue Dec 20 10:23:53 CST 2019 buffer is Full thread wait:Thread-2
Tue Dec 20 10:23:58 CST 2019 main take:hello1
Tue Dec 20 10:23:58 CST 2019 main take:hello2
Tue Dec 20 10:23:58 CST 2019 buffer is empty thread wait:main
Tue Dec 20 10:23:58 CST 2019 main take:hello3
Tue Dec 20 10:23:58 CST 2019 buffer is empty thread wait:main
Tue Dec 20 10:24:48 CST 2019 main take:last one...
Tue Dec 20 10:24:48 CST 2019 main over...

启示录

我们的例子中,“非空” 和 “非满” 这两种条件关联着同一个条件队列,当一个线程由于其他线程调用了notifyAll 而被唤醒时,并不意味着它等待的条件已经为真了,这也是内置条件队列的局限所在。

所以代码中的加固措施是,使用循环判断条件是否发生,如果发生,则调用 wait 阻塞自己,等待其他线程唤醒:

while(isFull()){
	System.out.println(new Date()+" buffer is Full thread wait:"+Thread.currentThread().getName());
	wait();
}

同样的功能,Java 并发包中的 ArrayBlockingQueue 是使用 ReentrantLock 和 ObjectCondition 实现的可阻塞队列,为什么 JDK 使用显式锁和显式条件队列呢?

使用内置锁的局限性在于一把锁只有一个条件队列,而这里涉及到两种等待条件,所以使用 ReentrantLock 更合适,它可以关联多个条件队列,这样就可以巧妙地处理多条件的阻塞和唤醒了!



Tags:算法   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
前言Kafka 中有很多延时操作,比如对于耗时的网络请求(比如 Produce 是等待 ISR 副本复制成功)会被封装成 DelayOperation 进行延迟处理操作,防止阻塞 Kafka请求处理线程。Kafka...【详细内容】
2021-12-27  Tags: 算法  点击:(1)  评论:(0)  加入收藏
分稀疏重建和稠密重建两类:稀疏重建:使用RGB相机SLAMOrb-slam,Orb-slam2,orb-slam3:工程地址在: http://webdiis.unizar.es/~raulmur/orbslam/ DSO(Direct Sparse Odometry)因为...【详细内容】
2021-12-23  Tags: 算法  点击:(7)  评论:(0)  加入收藏
一、什么是冒泡排序1.1、文字描述冒泡排序是一种简单的排序算法。它重复地走访要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来。走访数列的工作是重复地...【详细内容】
2021-12-15  Tags: 算法  点击:(16)  评论:(0)  加入收藏
前面文章在谈论分布式唯一ID生成的时候,有提到雪花算法,这一次,我们详细点讲解,只讲它。SnowFlake算法据国家大气研究中心的查尔斯&middot;奈特称,一般的雪花大约由10^19个水分子...【详细内容】
2021-11-17  Tags: 算法  点击:(24)  评论:(0)  加入收藏
基于算法的业务或者说AI的应用在这几年发展得很快。但是,在实际应用的场景中,我们经常会遇到一些非常奇怪的偏差现象。例如,Facebook将黑人标记为灵长类动物、城市图像识别系统...【详细内容】
2021-11-08  Tags: 算法  点击:(32)  评论:(0)  加入收藏
随着注册制的加速推进,新股越来越多,截止到今天A股上市公司的总数高达4500余家,A股一直就是重融资,轻投资的市场,而上市公司发行可转债这种再融资的(圈钱方式)是最能让普通投资者接...【详细内容】
2021-11-05  Tags: 算法  点击:(97)  评论:(0)  加入收藏
导读:在大数据时代,对复杂数据结构中的各数据项进行有效的排序和查找的能力非常重要,因为很多现代算法都需要用到它。在为数据恰当选择排序和查找策略时,需要根据数据的规模和类型进行判断。尽管不同策略最终得到的结果完...【详细内容】
2021-11-04  Tags: 算法  点击:(37)  评论:(0)  加入收藏
这是我在网上找的资源的一个总结,会先给出一个我看了觉得还行的关于算法的讲解,再配上实现的代码: Original author: Bill_Hoo Original Address: http://blog.sina.com.cn/s/bl...【详细内容】
2021-11-04  Tags: 算法  点击:(36)  评论:(0)  加入收藏
每个人都有过这样的经历:打开手机准备回消息或打电话,一看到微信图标右上方的小红点,于是忍不住先打开微信;看完微信,不知不觉又被另一个App牵引,直到关闭手机屏幕才发现自己早已...【详细内容】
2021-11-03  Tags: 算法  点击:(30)  评论:(0)  加入收藏
文丨互联网怪盗团在互联网行业,尤其是在投资人心目中,往往存在一种“算法迷信”或曰“技术迷信”:某公司的广告变现做得好,一定是因为有算法;某公司的云计算业务开展的好,也是因为...【详细内容】
2021-11-03  Tags: 算法  点击:(25)  评论:(0)  加入收藏
▌简易百科推荐
前言Kafka 中有很多延时操作,比如对于耗时的网络请求(比如 Produce 是等待 ISR 副本复制成功)会被封装成 DelayOperation 进行延迟处理操作,防止阻塞 Kafka请求处理线程。Kafka...【详细内容】
2021-12-27  Java技术那些事    Tags:时间轮   点击:(1)  评论:(0)  加入收藏
博雯 发自 凹非寺量子位 报道 | 公众号 QbitAI在炼丹过程中,为了减少训练所需资源,MLer有时会将大型复杂的大模型“蒸馏”为较小的模型,同时还要保证与压缩前相当的结果。这就...【详细内容】
2021-12-24  量子位    Tags:蒸馏法   点击:(9)  评论:(0)  加入收藏
分稀疏重建和稠密重建两类:稀疏重建:使用RGB相机SLAMOrb-slam,Orb-slam2,orb-slam3:工程地址在: http://webdiis.unizar.es/~raulmur/orbslam/ DSO(Direct Sparse Odometry)因为...【详细内容】
2021-12-23  老师明明可以靠颜值    Tags:算法   点击:(7)  评论:(0)  加入收藏
1. 基本概念希尔排序又叫递减增量排序算法,它是在直接插入排序算法的基础上进行改进而来的,综合来说它的效率肯定是要高于直接插入排序算法的;希尔排序是一种不稳定的排序算法...【详细内容】
2021-12-22  青石野草    Tags:希尔排序   点击:(6)  评论:(0)  加入收藏
ROP是一种技巧,我们对execve函数进行拼凑来进行system /bin/sh。栈迁移的特征是溢出0x10个字符,在本次getshell中,还碰到了如何利用printf函数来进行canary的泄露。ROP+栈迁移...【详细内容】
2021-12-15  星云博创    Tags:栈迁移   点击:(19)  评论:(0)  加入收藏
一、什么是冒泡排序1.1、文字描述冒泡排序是一种简单的排序算法。它重复地走访要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来。走访数列的工作是重复地...【详细内容】
2021-12-15    晓掌柜丶韶华  Tags:排序算法   点击:(16)  评论:(0)  加入收藏
在了解golang的map之前,我们需要了解哈希这个概念。哈希表,又称散列表(Hash table),是根据键(key)而直接访问在内存储存位置的数据结构。也就是说,它通过计算出一个键值的函数,将...【详细内容】
2021-12-07  一棵梧桐木    Tags:哈希表   点击:(13)  评论:(0)  加入收藏
前面文章在谈论分布式唯一ID生成的时候,有提到雪花算法,这一次,我们详细点讲解,只讲它。SnowFlake算法据国家大气研究中心的查尔斯&middot;奈特称,一般的雪花大约由10^19个水分子...【详细内容】
2021-11-17  小心程序猿QAQ    Tags:雪花算法   点击:(24)  评论:(0)  加入收藏
导读:在大数据时代,对复杂数据结构中的各数据项进行有效的排序和查找的能力非常重要,因为很多现代算法都需要用到它。在为数据恰当选择排序和查找策略时,需要根据数据的规模和类型进行判断。尽管不同策略最终得到的结果完...【详细内容】
2021-11-04  华章科技    Tags:排序算法   点击:(37)  评论:(0)  加入收藏
这是我在网上找的资源的一个总结,会先给出一个我看了觉得还行的关于算法的讲解,再配上实现的代码: Original author: Bill_Hoo Original Address: http://blog.sina.com.cn/s/bl...【详细内容】
2021-11-04  有AI野心的电工和码农    Tags: KMP算法   点击:(36)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条