消息引擎的核心职责就是将生产者生产的消息传输到消费者,设计消息格式是各大消息引擎框架的关键问题,因为消息格式决定了消息引擎的性能和效率。本文带大家探究消息引擎kafka当前所用的message格式是什么。
一、Kafka message format
kafka从0.11.0版本开始所使用的消息格式版本为v2,参考了 Protocol Buffer而引入了变长整型(Varints)和 ZigZag 编码。Varints是使用一个或多个字节来序列化整数的一种方法,数值越小,其所占用的字节数就越少。ZigZag编码以一种锯齿形(zig-zags)的方式来回穿梭于正负整数之间, 以使得带符号整数映射为无符号整数,这样可以使得绝对值较小的负数仍然享有较小的Varints编码值,比如-1编码为1,1编码为2,-2编码为3。
kafka v0和v1版本的消息格式,如果消息本身没有key,那么key length字段为-1,int类型的需要4个字节来保存,而如果采用Varints来编码则只需要一个字节。根据Varints的规则可以推导出0-63之间的数字占1个字节,64-8191之间的数字占2个字节,8192-1048575之间的数字占3个字节。而kafka broker的配置message.max.bytes的默认大小为1000012(Varints编码占3个字节),如果消息格式中与长度有关的字段采用Varints的编码的话, 绝大多数情况下都会节省空间,而v2版本的消息格式也正是这样做的。不过需要注意的是Varints并非一直会省空间,一个int32最长会占用5个字节(大于默认的4字节), 一个int64最长会占用10字节(大于默认的8字节)。
因为Kafka的message经历过几次的版本迭代更改,本文以v2版本为例讲述。
二、Record Batch
在Kafka中,数据是按照topic和partition的方式进行组织和存储的。每个partition的数据被分成一个或多个segment文件,并且每个segment文件包含若干个Record Batch。因此,Record Batch也是Kafka中重要的数据结构之一。
在Kafka中,Record Batch指的是一组相关的消息集合,它们具有相同的key、value类型和所属的topic和partition。每个Record Batch包含若干条消息(Record),并且这些消息被顺序地写入到磁盘中,以提高读取效率。
具体而言,Record Batch由以下几部分构成:
Record Batch Header:包含了当前Batch的元数据,如Magic Code、Batch Size、First Offset等信息。
Record Header:每个Record都附带有一个Header,用于描述该Record的元数据信息,例如时间戳、压缩类型、CRC校验值等。
Record Body:记录具体的消息内容,包括Key、Value等字段。
需要注意的是,Kafka的Record Batch通常具有比较大的体积(默认大小为16KB),因此可以将多个相关的消息打包在一起进行传输和处理,从而提高了消息的传输效率和吞吐量。另外,Kafka还支持对Record Batch进行压缩和批量操作,以进一步提高数据的传输效率和性能。
总的来说,Record Batch是Kafka中定义的一个重要数据结构,用于管理和组织消息,提高消息的读写效率和传输性能。
baseoffset: int64 标识当前的batch的起始偏移量
batchLength: int32 该batch的长度
partitionLeaderEpoch: int32 确保数据可靠性
magic: int8 魔法数字,当前为2,也即当前的message版本为v2版本
crc: int32 crc校验
attributes: int16 消息属性
bit 0~2: 是否压缩和压缩的格式
0: no compression
1: gzip
2: snAppy
3: lz4
4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
bit 7~15: unused
lastOffsetDelta: int32 RecordBatch中最后一个Record的offset与first offset的差值
baseTimestamp: int64 第一条时间戳
maxTimestamp: int64 最大的时间戳,保证消息组装时的正确性
producerId: int64 支持幂等性
producerEpoch: int16 支持幂等性
baseSequence: int32 支持幂等性,消息序号
records: [Record] Record个数
用以下图表示 V2 版本消息批次的样子:
三、Record
在Kafka中,Record Batch和Record是两种不同的数据结构,但它们之间存在着紧密的关系。
Record是指Kafka中的一条消息,通常由Key、Value、Timestamp等字段组成。而Record Batch是指将多个相关的Record打包在一起进行传输和处理的数据结构,每个Record Batch通常包含若干条记录,并且这些记录具有相同的key、value类型和所属的topic和partition。
具体来说,每个Record Batch中的Record都被依次存储在一个连续的二进制数据块中,每个Record包含自己的Header和Body部分。而Record Batch则包含了当前Batch的元数据信息和所有记录的元数据信息,如Batch Size、First Offset、Last Offset、CRC校验值等。
消息格式如下所示:
#消息长度
length: varint
#消息属性
attributes: int8
# 时间戳增量
bit 0~7: unusedtimestampDelta: varlong
#偏移量增量
offsetDelta: varint
#key长度
keyLength: varint
#key值
key: byte[]
#value长度
valueLen: varint
#value值
value: byte[]
#header信息
Headers => [Header]
Record信息通过如下方式封装
public static int writeTo(DataOutputStream out,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) throws IOException {
// 消息总数
int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
ByteUtils.writeVarint(sizeInBytes, out);
// 属性
byte attributes = 0; // there are no used record attributes at the moment
out.write(attributes);
// 时间增量
ByteUtils.writeVarlong(timestampDelta, out);
// 位移增量
ByteUtils.writeVarint(offsetDelta, out);
// key
if (key == null) {
ByteUtils.writeVarint(-1, out);
} else {
int keySize = key.remAIning();
// key size
ByteUtils.writeVarint(keySize, out);
// key
Utils.writeTo(out, key, keySize);
}
// Value
if (value == null) {
ByteUtils.writeVarint(-1, out);
} else {
int valueSize = value.remaining();
// value size
ByteUtils.writeVarint(valueSize, out);
// value
Utils.writeTo(out, value, valueSize);
}
// header
ByteUtils.writeVarint(headers.length, out);
for (Header header : headers) {
// header key
String headerKey = header.key();
byte[] utf8Bytes = Utils.utf8(headerKey);
// header key 长度
ByteUtils.writeVarint(utf8Bytes.length, out);
// header key 值
out.write(utf8Bytes);
// header value
byte[] headerValue = header.value();
if (headerValue == null) {
ByteUtils.writeVarint(-1, out);
} else {
// header value 长度
ByteUtils.writeVarint(headerValue.length, out);
// header value 值
out.write(headerValue);
}
}
return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
}
根据以上代码逻辑,用以下图表示 V2 版本消息格式的样子:
四、总结
message(又称record)总是分批写入的。一批消息的技术术语是一个record batch: