您当前的位置:首页 > 电脑百科 > 程序开发 > 容器

Kafka的Message格式

时间:2023-06-28 14:41:35  来源:  作者:尚硅谷教育

消息引擎的核心职责就是将生产者生产的消息传输到消费者,设计消息格式是各大消息引擎框架的关键问题,因为消息格式决定了消息引擎的性能和效率。本文带大家探究消息引擎kafka当前所用的message格式是什么。

一、Kafka message format

kafka从0.11.0版本开始所使用的消息格式版本为v2,参考了 Protocol Buffer而引入了变长整型(Varints)和 ZigZag 编码。Varints是使用一个或多个字节来序列化整数的一种方法,数值越小,其所占用的字节数就越少。ZigZag编码以一种锯齿形(zig-zags)的方式来回穿梭于正负整数之间, 以使得带符号整数映射为无符号整数,这样可以使得绝对值较小的负数仍然享有较小的Varints编码值,比如-1编码为1,1编码为2,-2编码为3。

kafka v0和v1版本的消息格式,如果消息本身没有key,那么key length字段为-1,int类型的需要4个字节来保存,而如果采用Varints来编码则只需要一个字节。根据Varints的规则可以推导出0-63之间的数字占1个字节,64-8191之间的数字占2个字节,8192-1048575之间的数字占3个字节。而kafka broker的配置message.max.bytes的默认大小为1000012(Varints编码占3个字节),如果消息格式中与长度有关的字段采用Varints的编码的话, 绝大多数情况下都会节省空间,而v2版本的消息格式也正是这样做的。不过需要注意的是Varints并非一直会省空间,一个int32最长会占用5个字节(大于默认的4字节), 一个int64最长会占用10字节(大于默认的8字节)。

因为Kafka的message经历过几次的版本迭代更改,本文以v2版本为例讲述。

二、Record Batch

在Kafka中,数据是按照topic和partition的方式进行组织和存储的。每个partition的数据被分成一个或多个segment文件,并且每个segment文件包含若干个Record Batch。因此,Record Batch也是Kafka中重要的数据结构之一。

在Kafka中,Record Batch指的是一组相关的消息集合,它们具有相同的key、value类型和所属的topic和partition。每个Record Batch包含若干条消息(Record),并且这些消息被顺序地写入到磁盘中,以提高读取效率。

具体而言,Record Batch由以下几部分构成:

Record Batch Header:包含了当前Batch的元数据,如Magic Code、Batch Size、First Offset等信息。

Record Header:每个Record都附带有一个Header,用于描述该Record的元数据信息,例如时间戳、压缩类型、CRC校验值等。

Record Body:记录具体的消息内容,包括Key、Value等字段。

需要注意的是,Kafka的Record Batch通常具有比较大的体积(默认大小为16KB),因此可以将多个相关的消息打包在一起进行传输和处理,从而提高了消息的传输效率和吞吐量。另外,Kafka还支持对Record Batch进行压缩和批量操作,以进一步提高数据的传输效率和性能。

总的来说,Record Batch是Kafka中定义的一个重要数据结构,用于管理和组织消息,提高消息的读写效率和传输性能。

baseoffset: int64 标识当前的batch的起始偏移量

batchLength: int32 该batch的长度

partitionLeaderEpoch: int32 确保数据可靠性

magic: int8 魔法数字,当前为2,也即当前的message版本为v2版本

crc: int32 crc校验

attributes: int16 消息属性

bit 0~2: 是否压缩和压缩的格式

0: no compression

1: gzip

2: snAppy

3: lz4

4: zstd

bit 3: timestampType

bit 4: isTransactional (0 means not transactional)

bit 5: isControlBatch (0 means not a control batch)

bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)

bit 7~15: unused

lastOffsetDelta: int32 RecordBatch中最后一个Record的offset与first offset的差值

baseTimestamp: int64 第一条时间戳

maxTimestamp: int64 最大的时间戳,保证消息组装时的正确性

producerId: int64 支持幂等性

producerEpoch: int16 支持幂等性

baseSequence: int32 支持幂等性,消息序号

records: [Record] Record个数

用以下图表示 V2 版本消息批次的样子:

三、Record

在Kafka中,Record Batch和Record是两种不同的数据结构,但它们之间存在着紧密的关系。

Record是指Kafka中的一条消息,通常由Key、Value、Timestamp等字段组成。而Record Batch是指将多个相关的Record打包在一起进行传输和处理的数据结构,每个Record Batch通常包含若干条记录,并且这些记录具有相同的key、value类型和所属的topic和partition。

具体来说,每个Record Batch中的Record都被依次存储在一个连续的二进制数据块中,每个Record包含自己的Header和Body部分。而Record Batch则包含了当前Batch的元数据信息和所有记录的元数据信息,如Batch Size、First Offset、Last Offset、CRC校验值等。

消息格式如下所示:

#消息长度

length: varint

#消息属性

attributes: int8

# 时间戳增量

bit 0~7: unusedtimestampDelta: varlong

#偏移量增量

offsetDelta: varint

#key长度

keyLength: varint

#key值

key: byte[]

#value长度

valueLen: varint

#value值

value: byte[]

#header信息

Headers => [Header]

Record信息通过如下方式封装

public static int writeTo(DataOutputStream out,

int offsetDelta,

long timestampDelta,

ByteBuffer key,

ByteBuffer value,

Header[] headers) throws IOException {

// 消息总数

int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);

ByteUtils.writeVarint(sizeInBytes, out);

// 属性

byte attributes = 0; // there are no used record attributes at the moment

out.write(attributes);

// 时间增量

ByteUtils.writeVarlong(timestampDelta, out);

// 位移增量

ByteUtils.writeVarint(offsetDelta, out);

// key

if (key == null) {

ByteUtils.writeVarint(-1, out);

} else {

int keySize = key.remAIning();

// key size

ByteUtils.writeVarint(keySize, out);

// key

Utils.writeTo(out, key, keySize);

}

// Value

if (value == null) {

ByteUtils.writeVarint(-1, out);

} else {

int valueSize = value.remaining();

// value size

ByteUtils.writeVarint(valueSize, out);

// value

Utils.writeTo(out, value, valueSize);

}

// header

ByteUtils.writeVarint(headers.length, out);

for (Header header : headers) {

// header key

String headerKey = header.key();

byte[] utf8Bytes = Utils.utf8(headerKey);

// header key 长度

ByteUtils.writeVarint(utf8Bytes.length, out);

// header key 值

out.write(utf8Bytes);

// header value

byte[] headerValue = header.value();

if (headerValue == null) {

ByteUtils.writeVarint(-1, out);

} else {

// header value 长度

ByteUtils.writeVarint(headerValue.length, out);

// header value 值

out.write(headerValue);

}

}

return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;

}

根据以上代码逻辑,用以下图表示 V2 版本消息格式的样子:

四、总结

message(又称record)总是分批写入的。一批消息的技术术语是一个record batch:

 

  • 一个record batch包含一个或多个record。
  • 在退化的情况下,我们可以有一个包含单个record的record batch。
  • record batch和record有它们自己的headers。


Tags:Kafka   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
Spring实现Kafka重试Topic,真的太香了
概述Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重...【详细内容】
2024-01-26  Search: Kafka  点击:(86)  评论:(0)  加入收藏
如何使用Python、Apache Kafka和云平台构建健壮的实时数据管道
译者 | 李睿审校 | 重楼在当今竞争激烈的市场环境中,为了生存和发展,企业必须能够实时收集、处理和响应数据。无论是检测欺诈、个性化用户体验还是监控系统,现在都需要接近即时...【详细内容】
2024-01-26  Search: Kafka  点击:(47)  评论:(0)  加入收藏
深入浅出Kafka:高可用、顺序消费及幂等性
在我们旅行于数据海洋的途中,如果把 Kafka 比作是一艘承载无数信息航行的快船,前文《Kafka实战漫谈:大数据领域的不败王者》已经讲述了如何搭建起这艘快船,让它在起风的早晨开始...【详细内容】
2023-12-18  Search: Kafka  点击:(176)  评论:(0)  加入收藏
7k Star,一款开源的 Kafka 管理平台,功能齐全、页面美观!
Apache Kafka UI 是一个免费的开源 Web UI,用于监控和管理 Apache Kafka 集群,可方便地查看 Kafka Brokers、Topics、消息、Consumer 等情况,支持多集群管理、性能监控、访问控...【详细内容】
2023-12-15  Search: Kafka  点击:(133)  评论:(0)  加入收藏
利用Apache Kafka、Flink和Druid构建实时数据架构
译者 | 陈峻审校 | 重楼如今,对于使用批处理工作流程的数据团队而言,要满足业务的实时要求并非易事。从数据的交付、处理到分析,整个批处理工作流往往需要大量的等待,其中包括:等...【详细内容】
2023-12-11  Search: Kafka  点击:(232)  评论:(0)  加入收藏
运维兄弟!Kafka怎么又"超时"了?
现象凌晨,当运维刚躺下,就被业务研发的电话叫醒,"哥们!kafka服务又异常了?影响到业务了,快看看",业务研发给出的异常日志如下:基本分析 集群检查:立即确认kafka集群以及涉及到topic健...【详细内容】
2023-12-07  Search: Kafka  点击:(138)  评论:(0)  加入收藏
图解Kafka适用场景,全网最全!
消息系统消息系统被用于各种场景,如解耦数据生产者,缓存未处理的消息。Kafka 可作为传统的消息系统的替代者,与传统消息系统相比,kafka有更好的吞吐量、更好的可用性,这有利于处...【详细内容】
2023-11-29  Search: Kafka  点击:(183)  评论:(0)  加入收藏
Kafka有哪些应用场景?你能说上来几个?
下面我们来总结一下Kafka的一些应用场景:1、日志处理与分析(最常用的场景)下图显示了典型的 ELK(Elastic-Logstash-Kibana)堆栈。Kafka 有效地从每个实例收集日志流。ElasticSe...【详细内容】
2023-11-28  Search: Kafka  点击:(165)  评论:(0)  加入收藏
Kafka:解锁大数据时代的搜索与分析
在当今大数据时代,数据湖作为一种新兴的数据存储和分析解决方案,正受到越来越多企业的青睐。而作为一种高性能、可扩展的事件流平台,Kafka在数据湖领域发挥着重要的作用。本文...【详细内容】
2023-11-24  Search: Kafka  点击:(287)  评论:(0)  加入收藏
解密Kafka主题的分区策略:提升实时数据处理的关键
Kafka几乎是当今时代背景下数据管道的首选,无论你是做后端开发、还是大数据开发,对它可能都不陌生。开源软件Kafka的应用越来越广泛。面对Kafka的普及和学习热潮,哪吒想分享一...【详细内容】
2023-11-21  Search: Kafka  点击:(181)  评论:(0)  加入收藏
▌简易百科推荐
Docker 和传统虚拟机有什么区别?
我有一个程序员朋友,他每年情人节都要送女朋友一台服务器。他说:“谁不想在过节当天收到一台 4核8g 的服务器呢?”“万一对方不要,我还能留着自己用。” 给他一次过节的机会,他能...【详细内容】
2024-03-26  小白debug  微信公众号  Tags:Docker   点击:(12)  评论:(0)  加入收藏
掌握Docker网络驱动程序:优化容器通信
Docker为在容器内包装、交付和运行应用程序提供了一个强大的平台,从而彻底改变了容器化。网络是容器化的重要组成部分,Docker提供了各种网络驱动程序来支持容器之间的通信以...【详细内容】
2024-03-22    51CTO  Tags:Docker   点击:(11)  评论:(0)  加入收藏
Containerd容器管理
Nginx 指定容器名称 使用 ctr container create 命令创建容器后,容器并没有处于运行状态,其只是一个静态的容器。容器基本操作容器基本操作主要是 ctr image 命令,查看命令帮...【详细内容】
2024-03-20  云原生运维圈  微信公众号  Tags:容器   点击:(13)  评论:(0)  加入收藏
如何基于Docker镜像逆向生成Dockerfile
引言你是否曾经遇到过一个想要使用的 Docker 镜像,但却无法修改以适应你的特定需求?或者你可能发现了一个喜欢的 Docker 镜像,但想要了解它是如何构建的?在这两种情况下,将 Docke...【详细内容】
2024-03-07  云原生运维圈  微信公众号  Tags:Docker   点击:(23)  评论:(0)  加入收藏
Kubernetes是什么?主要特点是什么?
Kubernetes是什么?Kubernetes,也称为K8s,是一个开源的容器编排系统,由Google首次开发和维护。它允许容器化的应用程序在集群中自动部署、扩展和管理。Kubernetes提供了一种容器...【详细内容】
2024-02-01    简易百科  Tags:Kubernetes   点击:(159)  评论:(0)  加入收藏
我们一起聊聊容器资源自愈
在企业实际在使用容器这类资源的时候,除了技术本身,要考虑的其他问题也会很多。企业管理的容器有千千万万,出于效率考虑,对于有特殊需求的容器如何进行批量创建和管理呢,这就需要...【详细内容】
2024-01-30  匠心独运维妙维效  微信公众号  Tags:容器   点击:(47)  评论:(0)  加入收藏
Docker与Docker Compose入门:释放你应用部署的威力
今天给大家介绍一项强大而有趣的技能,那就是使用 Docker 和 Docker Compose 来释放你的应用部署的威力!无论你是一名开发人员还是系统管理员,掌握这个技能都将为你的工作带来巨...【详细内容】
2024-01-17  waynblog  微信公众号  Tags:Docker   点击:(66)  评论:(0)  加入收藏
Docker镜像与容器的交互及在容器内部执行代码的原理与实践
Docker作为一种流行的容器技术,已经成为现代应用程序开发和部署的重要工具。在Docker中,镜像是构建和运行容器的基础,而容器则是基于镜像创建的可执行实例。Docker镜像与容器的...【详细内容】
2024-01-10  编程技术汇  今日头条  Tags:Docker   点击:(77)  评论:(0)  加入收藏
如何在 Ubuntu 上安装 Docker
使用 Docker 意味着开启一个新的计算领域,但如果你刚刚开始使用 Docker,安装可能看起来是一项艰巨的任务。在 Ubuntu 上安装 Docker 有两种推荐的方法: 从 Ubuntu 的仓库安装 D...【详细内容】
2024-01-04    Linux中国  Tags:Docker   点击:(124)  评论:(0)  加入收藏
从Kubernetes的探针到DevOps
今天在群里又看有人问如何设置 Kubernetes 的探针,感觉要补充的话太多了,结合我们在一些 DevOps 项目中痛苦的体验,今天一劳永逸的全部说完,此外,也为大家展现一下为什么 DevOps...【详细内容】
2023-12-27  云云众生s  微信公众号  Tags:Kubernetes   点击:(116)  评论:(0)  加入收藏
站内最新
站内热门
站内头条