// Throughput-oriented consumer: pull larger batches per poll() to help
// drain a backlog (pair with more consumer instances for real scale-out).
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-group");
// Raise fetch sizes so each poll() returns more data.
props.put("max.poll.records", 500); // upper bound on records per poll()
props.put("max.partition.fetch.bytes", 1048576); // upper bound on bytes per partition fetch
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic"));
while (true) {
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
    batch.forEach(record -> {
        // handle the message
    });
}
// Re-subscribe with a rebalance listener so the consumer can flush state
// before losing partitions and initialize state after gaining them.
consumer.subscribe(Collections.singletonList("topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Clean up / flush state before partitions are reassigned away.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Initialize state for the newly assigned partitions.
    }
});
// Micro-batching loop: accumulate processed records and persist them in
// chunks of up to 100 to cut per-record database round-trips.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    List<SomeRecord> pending = new ArrayList<>();
    for (ConsumerRecord<String, String> record : records) {
        pending.add(processRecord(record));
        if (pending.size() >= 100) {
            // Flush a full batch.
            saveBatchToDatabase(pending);
            pending.clear();
        }
    }
    // Flush whatever remains from this poll.
    if (!pending.isEmpty()) {
        saveBatchToDatabase(pending);
    }
}
// De-duplicate against an external "already processed" store keyed by the
// record key, so redelivered messages are skipped.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        String messageId = record.key();
        if (isMessageProcessed(messageId)) {
            continue; // already handled — skip the duplicate
        }
        // handle the message
        processRecord(record);
        // remember it so a redelivery is ignored next time
        markMessageAsProcessed(messageId);
    }
}
// Exactly-once "consume-process" via Kafka transactions.
// NOTE(review): transactions are a *producer*-side feature — KafkaConsumer has no
// beginTransaction()/commitTransaction()/abortTransaction(), and "transactional.id"
// is a producer config, so the original snippet could not compile. The correct
// pattern is a transactional KafkaProducer that commits the consumer's offsets
// atomically inside the transaction.
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "consumer-group");
// Only see records from committed transactions.
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("topic"));

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
// Stable transactional id identifying this producer instance across restarts.
producerProps.put("transactional.id", "kafka-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
producer.beginTransaction();
try {
    for (ConsumerRecord<String, String> record : records) {
        // process the message
        processRecord(record);
    }
    // Collect the next offset to consume for each partition in this batch.
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partRecords = records.records(partition);
        long lastOffset = partRecords.get(partRecords.size() - 1).offset();
        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
    }
    // Commit consumed offsets atomically with the transaction.
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
} catch (Exception e) {
    // Roll back: the offsets are not committed, so the batch is reprocessed.
    producer.abortTransaction();
}
// In-memory de-duplication by message key.
// NOTE(review): the original called "contAIns", which is not a Set method and
// would not compile — fixed to contains(). Also note the set grows without
// bound; a long-running consumer should use a bounded/expiring cache or an
// external store instead.
Set<String> processedMessages = new HashSet<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        String messageId = record.key();
        if (!processedMessages.contains(messageId)) {
            // handle the message
            processRecord(record);
            // mark it as processed so a redelivery is skipped
            processedMessages.add(messageId);
        }
    }
}
针对数据积压和数据重复问题的解决方案需要根据具体的业务需求和系统情况进行调整和优化。此外,监控和度量系统也是非常重要的,可以帮助及时发现和解决数据积压和重复问题。