如果你用谷歌或者百度进行搜索就会发现,当你在这些搜索引擎的框内键入某些内容时,它们可以根据输入的内容智能展现输入提示建议。本文作者正是带着这样的想法实现了一个具备类似功能的系统。
本文将展现如何设计一个大规模的自动完成输入提示建议的系统,就像 google 搜索一样,整个设计是用 Docker Compose 实现的,可以在这里找到源代码:https://github.com/lopespm/autocomplete
最终的系统需要适应类似 Google 的搜索规模,即每天约 50 亿次搜索,也就是每秒钟约 5.8 万次查询。我们可以预期这些搜索中有 20% ,也就是每天有 10 亿次查询。
如果我们选择为这 10 亿条查询建立索引的话,平均每个查询有 15 个字符【2】,每个字符有 2 个字节(我们将只支持英语设置),这反映在托管这些查询所需的存储空间大约为 30 GB。
主要的两个 API 是:
两个主要的子系统是:
这个实现使用了现成的组件,如 Kafka(消息代理)、Hadoop(MapReduce 和分布式文件系统)、redis(分布式缓存)和 Nginx(负载平衡、网关、反向代理)等,但是也有用 Python 构建的定制服务,即 Trie 分发和构建服务,Trie 数据结构也是定制的。
这个实现中的后端服务被构建为可持续使用,不需要太多编排。例如,如果一个活动后端主机停止响应,则它对应的临时节点 znode 注册表最终会消失,而另一个备用后端节点将尝试通过 zookeeper 上的 临时节点 znode 声明该位置来取代它的位置。
分发服务器使用并提供给分发服务器的数据结构是 Trie ( 译注 :又称前缀树、字典树,是一种有序树,用于保存关联数据),其每个前缀节点都有一个热门短语列表。热门短语是使用 享元模式(flyweight pattern)进行引用的,这意味着短语的字符串文字仅存储一次。每个前缀节点都有一个热门短语列表,这是对字符串文本的引用列表。
正如我们之前看到的,我们将需要大约 30 GB 来索引 10 亿个查询,这大约是上述 Trie 存储 10 亿个查询所需的内存。由于我们希望将 Trie 保存在内存中,以便为给定的查询启用快速查找时间,因此,我们将 Trie 划分为多个 Trie,每个 Trie 在不同的机器上进行。这一做法减轻了任何给定机器上的内存负载。
为了提高可用性,托管这些 Trie 的服务也将具有多个副本。为了提高持久性,Trie 的序列化版本将在分布式文件系统(HDFS)中可用,并且可以通过 MapReduce 作业以一种可预测的、确定性的方式重新构建。
1、客户端通过 http://localhost:80/search?phrase="a user query" 将搜索到的短语提交到网关:
2、由于搜索服务器不在此实现的范围内,网关通过 http://assembler.collector-load-balancer:6000/collect-phrase?phrase="a user query" 直接将搜索短语发送到收集器的负载均衡器:
3、收集器的负载均衡器通过 http://assembler.collector:7000/collect-phrase?phrase="a user query" 将请求转发到其中一个收集器后端:
4、收集器后端向消息代理(Kafka)发送短语主题的消息。关键和价值在于短语本身【4】。
5、Kafka Connect HDFS Connector 汇编器。kafka-connect 将短语主题中的消息转储到 /phrases/1_sink/phrases/{30 minute window timestamp} 【5】文件夹【6】中。
6、 触发 MapReduce 作业【7】:通过加权每个短语的新近度和频率,它们将搜索的短语减少到一个单独的 HDFS 文件中【8】。
7、Trie Builder 服务正在监听 / phrases/assembler/last_built_target zonde 中的更改,它基于 /phrases/4_with_weight_ordered/{TARGET_ID} 文件为每个分区【10】构建 Trie。例如,一个 Trie 可以覆盖前缀直到 mod,另一个从 mod 到 racke,还有一个从 racke 开始。
1、分发服务器后端可以处于活动模式(服务请求)或备用模式。处于备用模式的节点将获取最近的 Trie,将它们加载到内存中,并将自己标记为准备接管活动位置。具体如下:
a. 备用节点在监听对 znode /phrases/distributor/next_target 的更改时,检测其修改并为每个每个分区创建一个 临时的顺序节点 znode,一次一个,位于 /phrases/distributor/{TARGET_ID}/partitions/{PARTITION}/nodes/ znode。如果创建的 znode 是第一个 R znode 之一(R 是每个分区的副本节点数【11】),继续执行下一步。否则,从这个分区移除 znode 并尝试加入下一个分区。
b. 备用后端节点从 /phrases/5_tries/{TARGET_ID}/{PARTITION} 获取序列话的 Trie 文件,并开始将 Trie 加载到内存中。
c. 当 Trie 加载到内存时,备用后端节点通过将 /phrases/distributor/{TARGET_ID}/partitions/{PARTITION}/nodes/{CREATED_ZNODE} znode 设置为后端主机名,将自己标记为就绪。
2、Trie 后端应用程序服务轮询 /phrases/distributor/{TARGET_ID}/ sub-znode ( TARGET_ID 是 /phrases/distributor/next_target 中定义的节点),并检查是否所有分区的所有节点都标记为就绪。
a. 如果它们都为下一个 TARGET_ID 做好了准备,那么服务将在单个事务中将 /phrases/distributor/next_target znode 的值更改为空,并将 /phrases/distributor/current_target znode 设置为新的 TARGET_ID 。通过这一步骤,所有标记为就绪的备用后端节点现在都将处于活动状态,并将用于以下分发服务器请求。
当分发服务器的后端节点处于活动状态并加载了它们各自的尝试后,我们就可以开始为给定的前缀提供热门短语请求:
1、客户端通过 http://localhost:80/top-phrases?prefix="some prefix" 向网关请求给定前缀的热门短语。
2、网关通过 http://distributor.load-balancer:5000/top-phrases?prefix="some prefix" 将此请求发送到分发服务器的负载均衡器。
、3 负载均衡器通过 http://distributor.frontend:8000/top-phrases?prefix="some prefix" 将请求转发到其中一个前端。
4、前端服务器处理请求:
a. 前端服务检查分布式缓存(redis)是否有这个前缀的条目【12】。如果是,则返回这些缓存的热门短语,否则,继续执行下一步。
b. 前端服务从 zookeeper( /phrases/distributor/{TARGET_ID}/partitions/ znode)获取当前 TARGET_ID 的分区,并选择与提供的前缀匹配的分区。
c. 前端服务从 /phrases/distributor/{TARGET_ID}/partitions/{PARTITION}/nodes/ znode 中随机选择一个 znode,并获取其主机名。
d. 前端服务通过 http://{BACKEND_HOSTNAME}:8001/top-phrases="some prefix" 从选定的后端请求热门短语。
e. 后端使用其相应的加载 Trie 返回给定前缀的热门短语列表。
f. 前端服务将热门短语列表插入到分布式缓存(缓存模式)中,并返回热门短语。
注意:当系统运行时,请使用 shell 命令 docker exec -it zookeeper ./bin/zkCli.sh 查看当前 Zookeeper 的 znode。
注意:当系统运行时,在浏览器中访问 http://localhost:9870/explorer.html 来浏览当前的 HDFS 文件和文件夹。
你可以通过在浏览器中访问 http://localhost 与系统进行交互。当你输入一个查询时,系统会提供搜索建议,可以通过提交更多搜索来输入更多查询或短语到系统中。
你可以在 https://github.com/lopespm/autocomplete 上获得完整的源代码。
【1】 因为这个实现的主要目标是以简单的方式构建和共享系统,所以使用 Docker compose 代替了像 Kubernetes 或 Docker Swarm 这样的容器编排工具。
【2】 搜索查询的平均长度为 2.4 个词,英语中的平均词长为 4.7 个字符。
【3】 在本文中, 短语 和 查询 是可以互换使用。不过,在系统内部中,只使用 短语 这一术语。
【4】 在这个实现中,为清楚起见,只使用了代理的一个实例。但是,对于大量的传入请求,最好将该主题分为多个实例(消息将根据 短语 键进行分区),以便分配负载。
【5】 ** /phrases/1_sink/phrases/{30 minute window timestamp} 文件夹 :例如,假设消息 A[time: 19h02m] [time: 19h25m],C[time: 19h40m],消息 A 和 B 将放入文件夹 /phrases/1_sink/phrases/20200807_1900,而消息 C 将被放入文件夹 /phrases/1_sink/phrases/20200807_1930。
【6】 在将这些消息传递给 Hadoop 之前,我们还可以将它们预先聚合到另一个主题中(使用 Kafka Streams)。
【7】 为清楚起见,在这个实现中,MapReduce 作业是通过 make do_mapreduce_tasks 手动触发的,但是在生产环境中,它们可以通过 cron job 每 30 分钟触发一次。
【8】 可以添加一个额外的 MapReduce 来将 /phrases/1_sink/phrases/ 文件夹聚合为更大的时间 timespan 聚合(如 1 天,5 周,10 天等)。
【9】 可在 assembler/hadoop/mapreduce-tasks/do_tasks.sh 中通过变量 MAX_NUMBER_OF_INPUT_FOLDERS 进行配置。
【10】 分区在 assembler/trie-builder/triebuilder.py 中定义。
【11】 每个分区的副本节点数是通过 docker-compose.yml 中的环境变量 NUMBER_NODES_PER_PARTITION 配置的。
【12】 在这个实现中,默认情况下分布式缓存是禁用的,因此,对于首次使用这个代码库的人来说,可以更清楚地了解每个更新 / 步骤中发生了什么。分布式缓存可以通过 docker-compose.yml 中的环境变量 DISTRIBUTED_CACHE_ENABLED 来启用。
原文链接:
https://lopespm.github.io/2020/08/03/implementation-autocomplete-system-design.html