本文介绍PacketQueue,相对于FrameQueue来说比较简单,可以类比Android中的MessageQueue。
/*
 * One node of the singly linked list used by PacketQueue.
 * The same node type is used both for queued packets and for the
 * queue's recycle list of spare nodes.
 */
typedef struct MyAVPacketList {
/* Packet held by this node; packet_queue_put_private copies it in by value. */
AVPacket pkt;
/* Next node in the list, or NULL when this is the last node. */
struct MyAVPacketList *next;
/* Value of the owning queue's serial at the moment this packet was queued. */
int serial;
} MyAVPacketList;
/*
 * FIFO queue of packets implemented as a singly linked list and protected
 * by a mutex / condition variable pair.
 */
typedef struct PacketQueue {
/* Head (next node to dequeue) and tail (most recently queued node); both NULL when empty. */
MyAVPacketList *first_pkt, *last_pkt;
/* Number of nodes currently queued. */
int nb_packets;
/* Sum over queued nodes of pkt.size + sizeof(MyAVPacketList). */
int size;
/* Sum over queued nodes of max(pkt.duration, MIN_PKT_DURATION). */
int64_t duration;
/* Non-zero makes put and get fail with -1; set by init/abort, cleared by start. */
int abort_request;
/* Incremented each time flush_pkt is queued; copied into every queued node. */
int serial;
/* Guards all fields of the queue. */
SDL_mutex *mutex;
/* Signalled when a packet is queued or the queue is aborted. */
SDL_cond *cond;
/* Head of the list of spare nodes that put reuses instead of allocating. */
MyAVPacketList *recycle_pkt;
/* Number of times a node was taken from recycle_pkt by put. */
int recycle_count;
/* Number of times put went to av_malloc for a new node. */
int alloc_count;
/* When non-zero, packet_queue_get_or_buffering switches buffering on if the queue runs empty. */
int is_buffer_indicator;
/* NOTE(review): the three fields below are not used by the queue functions shown here; purpose to be confirmed. */
SDL_Profiler videoBufferProfiler;
SDL_Profiler audioBufferProfiler;
void *ffp;
} PacketQueue;
packet_queue_init:初始化;
packet_queue_start:启动队列,设置abort_request为0,先放一个flush_pkt;
packet_queue_put:加锁后调用packet_queue_put_private存入一个节点,存入失败时释放该pkt(flush_pkt除外);
packet_queue_put_nullpacket:存入一个空节点;
packet_queue_put_private:存入一个节点,然后通过条件变量唤醒阻塞在packet_queue_get中的线程(调用方需已持有锁);
packet_queue_get:获取一个节点;
packet_queue_get_or_buffering:队列为空时先触发缓冲,再阻塞等待并获取一个节点;
packet_queue_abort:中止,设置abort_request=1后通过条件变量唤醒阻塞在packet_queue_get中的线程;
packet_queue_flush:清除队列内所有的节点;
packet_queue_destroy:销毁;
初始化
/*
 * Initialise a packet queue.
 *
 * Zeroes the whole structure, marks the queue as aborted (it stays unusable
 * until packet_queue_start clears the flag) and creates the mutex and the
 * condition variable.
 *
 * Returns 0 on success and -1 when either SDL object could not be created.
 * On failure nothing is torn down here: any object that was created stays
 * in the queue and is released by packet_queue_destroy.
 */
static int packet_queue_init(PacketQueue *q) {
    memset(q, 0, sizeof(*q));

    /* Set first so that a half-initialised queue already rejects puts. */
    q->abort_request = 1;

    q->mutex = SDL_CreateMutex();
    if (!q->mutex) {
        return -1;
    }

    q->cond = SDL_CreateCond();
    if (!q->cond) {
        return -1;
    }

    return 0;
}
/*
 * Start the queue: clear abort_request so that put/get work, and queue
 * flush_pkt as a marker. Both steps happen under the queue mutex.
 */
static void packet_queue_start(PacketQueue *q) {
SDL_LockMutex(q->mutex);
q->abort_request = 0;
/* Queuing flush_pkt makes packet_queue_put_private increment q->serial. */
packet_queue_put_private(q, &flush_pkt);
SDL_UnlockMutex(q->mutex);
}
put操作
/*
 * Queue an empty packet (data == NULL, size == 0) for the given stream.
 * Per the original author's note this is used on EOF and on error to mark
 * the end of the stream.
 *
 * Returns whatever packet_queue_put returns.
 */
static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index) {
    AVPacket empty_pkt;

    av_init_packet(&empty_pkt);
    empty_pkt.data = NULL;
    empty_pkt.size = 0;
    empty_pkt.stream_index = stream_index;

    return packet_queue_put(q, &empty_pkt);
}
/*
 * Locking wrapper around packet_queue_put_private.
 *
 * Takes the queue mutex, appends pkt, and releases the mutex. If the append
 * fails the packet is unreferenced here, except for flush_pkt, which is
 * never unreferenced.
 *
 * Returns the result of packet_queue_put_private (0 on success, negative on
 * failure).
 */
static int packet_queue_put(PacketQueue *q, AVPacket *pkt) {
    int status;

    SDL_LockMutex(q->mutex);
    status = packet_queue_put_private(q, pkt);
    SDL_UnlockMutex(q->mutex);

    if (status < 0 && pkt != &flush_pkt) {
        av_packet_unref(pkt);
    }

    return status;
}
/*
 * Append a packet to the tail of the queue and signal the condition
 * variable. The function does no locking itself; the callers in this file
 * hold q->mutex around the call.
 *
 * The packet is copied into the node by value. Queuing flush_pkt increments
 * the queue serial first, so the flush node and every node after it carry
 * the new serial.
 *
 * Returns 0 on success, -1 if the queue is aborted or no node could be
 * allocated.
 */
static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt) {
    MyAVPacketList *node;

    if (q->abort_request) {
        return -1;
    }

    /* Reuse a spare node when one is available, otherwise allocate one. */
    node = q->recycle_pkt;
    if (node) {
        q->recycle_pkt = node->next;
        q->recycle_count++;
    } else {
        node = av_malloc(sizeof(*node));
        if (!node) {
            return -1;
        }
        /* Counted only after the allocation succeeded. */
        q->alloc_count++;
    }

    node->pkt = *pkt;
    node->next = NULL;

    /* flush_pkt starts a new serial. */
    if (pkt == &flush_pkt) {
        q->serial++;
    }
    node->serial = q->serial;

    /* Link the node in at the tail; an empty queue has no tail yet. */
    if (!q->last_pkt) {
        q->first_pkt = node;
    } else {
        q->last_pkt->next = node;
    }
    q->last_pkt = node;

    q->nb_packets++;
    q->size += node->pkt.size + sizeof(*node);
    q->duration += FFMAX(node->pkt.duration, MIN_PKT_DURATION);

    /* XXX: should duplicate packet data in DV case */
    SDL_CondSignal(q->cond);
    return 0;
}
get操作
/*
 * Remove the packet at the head of the queue.
 *
 * q      - queue to read from
 * pkt    - receives a by-value copy of the dequeued packet
 * block  - non-zero: wait on the condition variable while the queue is empty
 * serial - if not NULL, receives the serial stored with the packet
 *
 * Returns 1 when a packet was dequeued, 0 when the queue is empty and
 * block is zero, and -1 when the queue has been aborted.
 */
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial) {
    MyAVPacketList *node;
    int ret;

    SDL_LockMutex(q->mutex);

    /* Sleep only while there is nothing to return and waiting is allowed. */
    while (!q->abort_request && !q->first_pkt && block) {
        SDL_CondWait(q->cond, q->mutex);
    }

    node = q->first_pkt;
    if (q->abort_request) {
        ret = -1;
    } else if (!node) {
        ret = 0;
    } else {
        /* Unlink the head; the queue is empty if it was the only node. */
        q->first_pkt = node->next;
        if (!q->first_pkt) {
            q->last_pkt = NULL;
        }

        q->nb_packets--;
        q->size -= node->pkt.size + sizeof(*node);
        q->duration -= FFMAX(node->pkt.duration, MIN_PKT_DURATION);

        *pkt = node->pkt;
        if (serial) {
            *serial = node->serial;
        }

        /* The packet has been handed out; push the node onto the recycle list. */
        node->next = q->recycle_pkt;
        q->recycle_pkt = node;

        ret = 1;
    }

    SDL_UnlockMutex(q->mutex);
    return ret;
}
/*
 * Fetch the next packet, switching the player into buffering when the
 * queue runs dry.
 *
 * With ffp->packet_buffering disabled this is a plain blocking
 * packet_queue_get. Otherwise a non-blocking get is tried first; if the
 * queue is empty, buffering is turned on (only when q->is_buffer_indicator
 * is set and *finished is zero) and the call then blocks until a packet
 * arrives. Packets whose serial equals *finished are unreferenced and
 * skipped.
 *
 * Returns -1 when the queue is aborted. In buffering mode it returns 1 once
 * a packet is delivered; in non-buffering mode it returns the result of
 * packet_queue_get.
 */
static int packet_queue_get_or_buffering(FFPlayer *ffp, PacketQueue *q, AVPacket *pkt, int *serial,
                                         int *finished) {
    if (!ffp->packet_buffering) {
        return packet_queue_get(q, pkt, 1, serial);
    }

    for (;;) {
        int got = packet_queue_get(q, pkt, 0, serial);

        if (got < 0) {
            return -1;
        }

        if (got == 0) {
            /* Queue is empty: request buffering, then wait for data. */
            if (q->is_buffer_indicator && !*finished) {
                ffp_toggle_buffering(ffp, 1);
            }
            got = packet_queue_get(q, pkt, 1, serial);
            if (got < 0) {
                return -1;
            }
        }

        if (*finished != *serial) {
            return 1;
        }

        /* Serial matches *finished: drop this packet and fetch another. */
        av_packet_unref(pkt);
    }
}
重置、销毁操作
/*
 * Abort the queue: set abort_request under the mutex and signal the
 * condition variable so that a thread blocked in packet_queue_get wakes up
 * and returns -1. Later put/get calls fail until packet_queue_start clears
 * the flag.
 * Per the original author's note, stream_close calls this first.
 */
static void packet_queue_abort(PacketQueue *q) {
SDL_LockMutex(q->mutex);
q->abort_request = 1;
SDL_CondSignal(q->cond);
SDL_UnlockMutex(q->mutex);
}
/*
 * Drop every queued packet.
 *
 * Each packet is unreferenced and its node is pushed onto the recycle list
 * for reuse; the counters are then reset to describe an empty queue. The
 * serial and abort_request are left untouched.
 * Per the original author's note this is used on seek and on destroy.
 */
static void packet_queue_flush(PacketQueue *q) {
    MyAVPacketList *node;

    SDL_LockMutex(q->mutex);

    node = q->first_pkt;
    while (node) {
        MyAVPacketList *following = node->next;

        av_packet_unref(&node->pkt);

        /* Keep the node for later reuse instead of freeing it. */
        node->next = q->recycle_pkt;
        q->recycle_pkt = node;

        node = following;
    }

    q->first_pkt = NULL;
    q->last_pkt = NULL;
    q->nb_packets = 0;
    q->size = 0;
    q->duration = 0;

    SDL_UnlockMutex(q->mutex);
}
/*
 * Tear the queue down completely.
 *
 * Flushes all queued packets (which moves their nodes to the recycle list),
 * frees every node on the recycle list, and finally destroys the mutex and
 * the condition variable. The queue must not be used afterwards.
 * Per the original author's note this is called from stream_close.
 */
static void packet_queue_destroy(PacketQueue *q) {
    MyAVPacketList *node;

    packet_queue_flush(q);

    SDL_LockMutex(q->mutex);
    for (node = q->recycle_pkt; node; node = q->recycle_pkt) {
        q->recycle_pkt = node->next;
        av_freep(&node);
    }
    SDL_UnlockMutex(q->mutex);

    SDL_DestroyMutex(q->mutex);
    SDL_DestroyCond(q->cond);
}