译者 | 李睿
审校 | 重楼
在当今竞争激烈的市场环境中,为了生存和发展,企业必须能够实时收集、处理和响应数据。无论是检测欺诈、个性化用户体验还是监控系统,现在都需要接近即时的数据。
然而,构建和运行任务关键型实时数据管道具有挑战性。基础设施必须具有容错性、无限可扩展性,并与各种数据源和应用程序集成。这就是ApacheKafka、Python/ target=_blank class=infotextkey>Python和云平台的用武之地。
这个综合指南中将介绍:
这里将包括大量的代码片段、配置示例和文档链接,以便获得这些非常有用的技术的实践经验。
Apache Kafka是一个分布式、分区、复制的提交日志,用于可靠且大规模地存储数据流。Apache Kafka的核心是提供以下功能:
Kafka架构由以下主要组件组成:
消息被发布到名为“主题”的类别中。每个主题都充当消息提要或消息队列。常见的场景是每个消息类型或数据流的一个主题。Kafka主题中的每条消息都有一个唯一的标识符,称为偏移量,它代表了在主题中的位置。一个主题可以分为多个分区,这些分区是可以存储在不同代理上的主题片段。分区允许Kafka通过在多个消费者之间分配负载来扩展和并行化数据处理。
生产者是向Kafka主题发布消息的应用程序。它们连接到Kafka集群,序列化数据(例如JSON或Avro),分配一个密钥,并将其发送到适当的主题。
例如,一个Web应用程序可以产生点击流事件,或者一个移动应用程序可以产生使用统计。
消费者从Kafka主题中读取消息并进行处理。处理可能涉及解析数据、验证、聚合、过滤、存储到数据库等。
消费者连接到Kafka集群,并订阅一个或多个主题来获取消息提要,然后根据用例需求进行处理。
这是一个Kafka服务器,它接收来自生产者的消息,分配偏移量,将消息提交到存储中,并将数据提供给消费者。Kafka集群由多个代理组成,以实现可扩展性和容错性。
ZooKeeper处理代理之间的协调和共识,例如控制器选举和主题配置。它维护Kafka操作所需的集群状态和配置信息。
这涵盖了Kafka的基础知识。要深入了解,可以参考一些Kafka文档。
以下了解如何通过在云中运行Kafka来简化管理。
虽然Kafka具有高度可扩展性和可靠性,但它的运行涉及部署、基础设施管理、监控、安全、故障处理、升级等方面的大量工作。
值得庆幸的是,Kafka现在是所有主要云计算提供商提供的完全托管服务:
服务 |
描述 |
定价 |
AWS MSK |
在AWS上完全托管、高可用的Apache Kafka集群。处理基础设施,扩展,安全,故障处理等。 |
基于代理的数量 |
google Cloud Pub/Sub |
基于Kafka的无服务器实时消息服务。自动扩展,至少一次交付保证。 |
基于使用指标 |
Confluent Cloud |
完全管理的事件流平台,由Apache Kafka提供支持。提供免费层。 |
基于功能的分层定价 |
Azure Event Hubs |
Apache Kafka的高吞吐量事件摄取服务。与Azure数据服务的集成。 |
基于吞吐量单位 |
托管服务抽象了Kafka操作的复杂性,可以让用户专注数据管道。
接下来,将使用Python、Kafka和云平台构建一个实时管道。也可以参考以下的指南作为另一个示例。
Kafka的基本实时管道有两个主要组件:向Kafka发布消息的生产者和订阅主题并处理消息的消费者。
其架构遵循以下流程:
为了进行简化,将使用Confluent Kafka Python客户端库。
生产者应用程序从数据源收集数据并将其发布到Kafka主题。作为一个例子,假设有一个Python服务从一个Web应用程序收集用户点击流事件。
在Web应用程序中,当用户的行为像是页面浏览或产品评级时,可以捕获这些事件并将它们发送给Kafka。
可以抽象出Web应用程序如何收集数据的实现细节。
Python
from confluent_kafka import Producer
import json
# User event data
event = {
"timestamp": "2022-01-01T12:22:25",
"userid": "user123",
"page": "/product123",
"action": "view"
}
# Convert to JSON
event_json = json.dumps(event)
# Kafka producer configuration
conf = {
'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
'client.id': 'clickstream-producer'
}
# Create producer instance
producer = Producer(conf)
# Publish event
producer.produce(topic='clickstream', value=event_json)
# Flush and close producer
producer.flush()
producer.close()
这将事件发布到云托管Kafka集群上的clickstream主题。
Confluent_Kafka Python客户端在将消息发送到Kafka之前使用内部缓冲区来批处理消息。与单独发送每条消息相比,这提高了效率。
在默认情况下,消息会在缓冲区中累积,直到:
(1)已达到缓冲区大小限制(默认为32MB)。
(2)调用flush()方法。
当调用flush()时,缓冲区中的任何消息都会立即发送到Kafka代理。
如果不调用flush(),而是依赖于缓冲区大小限制,那么在下一次自动刷新之前,如果发生故障,就有丢失事件的风险。调用flush()能够更好地控制最小化潜在的消息丢失。
但是,在每次生产后调用flush()会带来额外的开销。找到合适的缓冲配置取决于特定的可靠性需求和吞吐量需求。
可以在事件发生时不断添加事件来构建实时流。这为下游数据消费者提供了连续的事件提要。
接下来,有一个消费者应用程序来从Kafka摄取事件并处理它们。
例如,可能想要解析事件,筛选特定的子类型,并验证模式。
Python
from confluent_kafka import Consumer
import json
# Kafka consumer configuration
conf = {'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
'group.id': 'clickstream-processor',
'auto.offset.reset': 'earliest'}
# Create consumer instance
consumer = Consumer(conf)
# Subscribe to 'clickstream' topic
consumer.subscribe(['clickstream'])
# Poll Kafka for messages infinitely
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
# Parse JSON from message value
event = json.loads(msg.value())
# Process event based on business logic
if event['action'] == 'view':
print('User viewed product page')
elif event['action'] == 'rating':
# Validate rating, insert to DB etc
pass
print(event) # Print event
# Close consumer
consumer.close()
这个轮询clickstream主题以获取新消息,使用它们,并根据事件类型采取行动——打印、更新数据库等。
对于一个简单的管道来说,这很有效。但如果每秒事件数增加100倍呢?消费者将无法跟上其增长。这就是像PySpark这样的工具可以帮助扩展处理的地方。
PySpark为Apache Spark提供了一个Python API,Apache Spark是一个为大规模数据处理优化的分布式计算框架。
使用PySpark,可以利用Spark的内存计算和并行执行来更快地使用Kafka流。
首先,将Kafka数据加载到DataFrame中,DataFrame可以使用Spark SQL或Python进行操作。
Python
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder
.AppName('clickstream-consumer')
.getOrCreate()
# Read stream from Kafka 'clickstream'
df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("subscribe", "clickstream")
.load()
# Parse JSON from value
df = df.selectExpr("CAST(value AS STRING)")
df = df.select(from_json(col("value"), schema).alias("data"))
Next, we can express whatever processing logic we need using DataFrame transformations:
from pyspark.sql.functions import *
# Filter for 'page view' events
views = df.filter(col("data.action") == "view")
# Count views per page URL
counts = views.groupBy(col("data.page"))
.count()
.orderBy("count")
# Print the stream
query = counts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awAItTermination()
它利用Spark的分布式运行时,在数据流上实时应用过滤、聚合和排序等操作。
还可以使用多个消费者组并行化消费,并将输出接收器写入数据库、云存储等。
这允许在Kafka的数据上构建可扩展的流处理。
现在已经介绍了端到端管道,以下了解应用它的一些实际用例。
以下探索一些实际用例,在这些用例中,这些技术可以帮助大规模地处理大量实时数据。
许多现代网络和移动应用程序跟踪用户的行为,例如页面浏览量、按钮点击、交易等,以收集使用情况分析。
(1)问题
(2)解决方案
物联网传感器产生大量的实时遥测数据,例如温度、压力、位置等。
(1)问题
(2)解决方案
像Zendesk这样的聊天平台捕获了大量的客户支持对话。
(1)问题
(2)解决方案
这个用例演示了如何将这些技术应用于涉及大量快速移动数据的实际业务问题。
综上所述, Python、Kafka和云平台为构建健壮的、可扩展的实时数据管道提供了一个很好的组合。
原文标题:Building Robust Real-Time Data Pipelines With Python, Apache Kafka, and the Cloud,作者:Dmitrii Mitiaev