您当前的位置:首页 > 电脑百科 > 程序开发 > 框架

Kafka实时数据即席查询应用与实践

时间:2023-05-25 13:33:21  来源:vivo互联网技术  作者:
在定位一些实时数据的Case时,如果没有对实时数据进行历史归档,在排查问题时,没有日志追述,会很难定位是哪个环节的问题。因此,我们需要对处理的这些实时数据进行记录归档并存储。

一、背景

Kafka中的实时数据是以Topic的概念进行分类存储,而Topic的数据是有一定时效性的,比如保存24小时、36小时、48小时等。

二、内容

2.1 案例分析

这里以i视频和vivo短视频实时数据为例,之前存在这样的协作问题:

数据上游内容方提供实时Topic(存放i视频和vivo短视频相关实时数据),数据侧对实时数据进行逻辑处理后,发送给下游工程去建库实时索引,当任务执行一段时间后,工程侧建索引偶尔会提出数据没有发送过去的Case,前期由于没有对数据做存储,在定位问题的时候会比较麻烦,经常需求查看实时日志,需要花费很长的时间来分析这些Case是出现在哪个环节。

为了解决这个问题,我们可以将实时Topic中的数据,在发送给其他Topic的时候,添加跟踪机制,进行数据分流,Sink到存储介质(比如HDFS、Hive等)。这里,我们选择使用Hive来进行存储,主要是查询方便,支持SQL来快速查询。如下图所示:

图片

在实现优化后的方案时,有两种方式可以实现跟踪机制,它们分别是Flink SQL写Hive、Flink DataStream写Hive。接下来,分别对这两种实现方案进行介绍和实践。

2.2 方案一:Flink SQL写Hive

这种方式比较直接,可以在Flink任务里面直接操作实时Topic数据后,将消费后的数据进行分流跟踪,作为日志记录写入到Hive表中,具体实现步骤如下:

  • 构造Hive Catalog;
  • 创建Hive表;
  • 写入实时数据到Hive表。

2.2.1 构造Hive Catalog

在构造Hive Catalog时,需要初始化Hive的相关信息,部分代码片段如下所示:

// 设置执行环境
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings);
 // 构造 Hive Catalog 名称
 String name = "video-hive-catalog";
 // 初始化数据库名
 String defaultDatabase = "comsearch";
 // Hive 配置文件路径地址
 String hiveConfDir = "/Appcom/hive/conf";
 // Hive 版本号
 String version = "3.1.2";
 // 实例化一个 HiveCatalog 对象
 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
 // 注册HiveCatalog
 tEnv.registerCatalog(name, hive);
 // 设定当前 HiveCatalog
 tEnv.useCatalog(name);
 // 设置执行SQL为Hive
 tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
 // 使用数据库
 tEnv.useDatabase("db1");

在以上代码中,我们首先设置了 Flink 的执行环境和表环境,然后创建了一个 HiveCatalog,并将其注册到表环境中。

2.2.2 创建Hive表

如果Hive表不存在,可以通过在程序中执行建表语句,具体SQL见表语句代码如下所示:

-- 创建表语句 
tEnv.executeSql("CREATE TABLE IF NOT EXISTS TABLE `xxx_table`(
  `content_id` string,
  `status` int)
PARTITIONED BY (
  `dt` string,
  `h` string,
  `m` string)
stored as ORC
TBLPROPERTIES (
  'auto-compaction'='true',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)")

在创建Hive表时我们使用了IF NOT EXISTS关键字,如果Hive中该表不存在会自动在Hive上创建,也可以提前在Hive中创建好该表,Flink SQL中就无需再执行建表SQL,因为用了Hive的Catalog,Flink SQL运行时会找到表。这里,我们设置了auto-compaction属性为true,用来使小文件自动合并,1.12版的新特性,解决了实时写Hive产生的小文件问题。同时,指定metastore值是专门用于写入Hive的,也需要指定success-file值,这样CheckPoint触发完数据写入磁盘后会创建_SUCCESS文件以及Hive metastore上创建元数据,这样Hive才能够对这些写入的数据可查。

2.2.3 写入实时数据到Hive表

在准备完成2.2.1和2.2.2中的步骤后,接下来就可以在Flink任务中通过SQL来对实时数据进行操作了,具体实现代码片段如下所示:

// 编写业务SQL
 String insertSql = "insert into  xxx_table SELECT content_id, status, " +
                    " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM xxx_rt";
 // 执行 Hive SQL
 tEnv.executeSql(insertSql);
 // 执行任务
 env.execute();

将消费后的数据进行分类,编写业务SQL语句,将消费的数据作为日志记录,发送到Hive表进行存储,这样Kafka中的实时数据就存储到Hive了,方便使用Hive来对Kafka数据进行即席分析。

2.2.4 避坑技巧

使用这种方式在处理的过程中,如果配置使用的是EventTime,在程序中配置'sink.partition-commit.trigger'='partition-time',最后会出现无法提交分区的情况。经过对源代码PartitionTimeCommitTigger的分析,找到了出现这种异常情况的原因。

我们可以通过看

org.Apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitionsorg.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions

中的一个函数,来说明具体的问题,部分源代码片段如下:

// PartitionTimeCommitTigger源代码函数代码片段
@Override
public List<String> committablePartitions(long checkpointId) {
 if (!watermarks.contAInsKey(checkpointId)) {
  throw new IllegalArgumentException(String.format(
    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
    checkpointId, watermarks));
 }
 long watermark = watermarks.get(checkpointId);
 watermarks.headMap(checkpointId, true).clear();
 
 List<String> needCommit = new ArrayList<>();
 Iterator<String> iter = pendingPartitions.iterator();
 while (iter.hasNext()) {
  String partition = iter.next();
  // 通过分区的值来获取分区的时间
  LocalDateTime partTime = extractor.extract(
    partitionKeys, extractPartitionValues(new Path(partition)));
  // 判断水印是否大于分区创建时间+延迟时间
  if (watermark > toMills(partTime) + commitDelay) {
   needCommit.add(partition);
   iter.remove();
  }
 }
 return needCommit;
}

通过分析上述代码片段,我们可以知道系统通过分区值来抽取相应的分区来创建时间,然后进行比对,比如我们设置的时间 pattern 是 '$dt $h:$m:00' , 某一时刻我们正在往 /2022-02-26/18/20/ 这个分区下写数据,那么程序根据分区值,得到的 pattern 将会是2022-02-26 18:20:00,这个值在SQL中是根据 DATA_FORMAT 函数获取的。

而这个值是带有时区的,比如我们的时区设置为东八区,2022-02-26 18:20:00这个时间是东八区的时间,换成标准 UTC 时间是减去8个小时,也就是2022-02-26 10:20:00,而在源代码中的 toMills 函数在处理这个东八区的时间时,并没有对时区进行处理,把这个其实应该是东八区的时间当做了 UTC 时间来处理,这样计算出来的值就比实际值大8小时,导致一直没有触发分区的提交。

如果我们在数据源中构造的分区是 UTC 时间,也就是不带分区的时间,那么这个逻辑就是没有问题的,但是这样又不符合我们的实际情况,比如对于分区2022-02-26 18:20:00,我希望我的分区肯定是东八区的时间,而不是比东八区小8个小时的UTC时间2022-02-26 10:20:00。

在明白了原因之后,我们就可以针对上述异常情况进行优化我们的实现方案,比如自定义一个分区类、或者修改缺省的时间分区类。比如,我们使用TimeZ.NETableFunction类来实现一个自定义时区,部分参考代码片段如下:

public class CustomTimeZoneTableFunction implements TimeZoneTableFunction {
  private transient DateTimeFormatter formatter;
  private String timeZoneId;
  public CustomTimeZoneTableFunction(String timeZoneId) {
    this.timeZoneId = timeZoneId;
  }
  @Override
  public void open(FunctionContext context) throws Exception {
    // 初始化 DateTimeFormatter 对象
    formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:00");
    formatter = formatter.withZone(ZoneId.of(timeZoneId));
  }
  @Override
  public void eval(Long timestamp, Collector<TimestampWithTimeZone> out) {
    // 将时间戳转换为 LocalDateTime 对象
    LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);
    // 将 LocalDateTime 对象转换为指定时区下的 LocalDateTime 对象
    LocalDateTime targetDateTime = localDateTime.atZone(ZoneId.of(timeZoneId)).toLocalDateTime();
    // 将 LocalDateTime 对象转换为 TimestampWithTimeZone 对象,并输出到下游
    out.collect(TimestampWithTimeZone.fromLocalDateTime(targetDateTime, ZoneId.of(timeZoneId)));
  }
}

2.3 方案二:Flink DataStream写Hive

在一些特殊的场景下,Flink SQL如果无法实现我们复杂的业务需求,那么我们可以考虑使用Flink DataStream写Hive这种实现方案。比如如下业务场景,现在需要实现这样一个业务需求,内容方将实时数据写入到Kafka消息队列中,然后由数据侧通过Flink任务消费内容方提供的数据源,接着对消费的数据进行分流处理(这里的步骤和Flink SQL写Hive的步骤类似),每分钟进行存储到HDFS(MapReduce任务需要计算和重跑HDFS数据),然后通过MapReduce任务将HDFS上的这些日志数据生成Hive所需要格式,最后将这些Hive格式数据文件加载到Hive表中。实现Kafka数据到Hive的即席分析功能,具体实现流程细节如下图所示:

图片

具体核心实现步骤如下:

  • 消费内容方Topic实时数据;
  • 生成数据预处理策略;
  • 加载数据;
  • 使用Hive SQL对Kafka数据进行即席分析。

2.3.1 消费内容方Topic实时数据

编写消费Topic的Flink代码,这里不对Topic中的数据做逻辑处理,在后面统一交给MapReduce来做数据预处理,直接消费并存储到HDFS上。具体实现代码如下所示:

public class Kafka2Hdfs {
    public static void main(String[] args) {
        // 判断参数是否有效
        if (args.length != 3) {
            LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");
            return;
        }
        // 初始化Kafka连接地址和HDFS存储地址以及Flink并行度
        String bootStrapServer = args[0];
        String hdfsPath = args[1];
        int parallelism = Integer.parseInt(args[2]);
 
        // 实例化一个Flink任务对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(parallelism);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Flink消费Topic中的数据
        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_topic", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));
        // 实例化一个HDFS存储对象
        BucketingSink<String> sink = new BucketingSink<>(hdfsPath);
        // 自定义存储到HDFS上的文件名,用小时和分钟来命名,方便后面算策略
        sink.setBucketer(new DateTimeBucketer<String>("HH-mm"));
        // 设置存储HDFS的文件大小和存储文件时间频率
        sink.setBatchSize(1024 * 1024 * 4);
        sink.setBatchRolloverInterval(1000 * 30);
        transction.addSink(sink);
        env.execute("Kafka2Hdfs");
    }
    // 初始化Kafka对象连接信息
    private static Object configByKafkaServer(String bootStrapServer) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootStrapServer);
        props.setProperty("group.id", "test_bll_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}

注意事项:

  • 这里我们把时间窗口设置小一些,每30s做一次Checkpoint,如果该批次的时间窗口没有数据过来,就生成一个文件落地到HDFS上;
  • 另外,我们重写了Bucketer为DateTimeBucketer,逻辑并不复杂,在原有的方法上加一个年-月-日/时-分的文件生成路径,例如在HDFS上的生成路径:xxxx/2022-02-26/00-00。

具体DateTimeBucketer实现代码如下所示:

public class DateMinuteBucketer implements Bucketer<String> {
    private SimpleDateFormat baseFormatDay = new SimpleDateFormat("yyyy-MM-dd");
    private SimpleDateFormat baseFormatMin = new SimpleDateFormat("HH-mm");
    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        return new Path(basePath + "/" + baseFormatDay.format(new Date()) + "/" + baseFormatMin.format(new Date()));
    }
}

2.3.2 生成数据预处理策略

这里,我们需要对落地到HDFS上的文件进行预处理,处理的逻辑是这样的。比如,现在是2022-02-26 14:00,那么我们需要将当天的13:55,13:56,13:57,13:58,13:59这最近5分钟的数据处理到一起,并加载到Hive的最近5分钟的一个分区里面去。那么,我们需要生成这样一个逻辑策略集合,用HH-mm作为key,与之最近的5个文件作为value,进行数据预处理合并。具体实现代码步骤如下:

  • 步骤一:获取小时循环策略;
  • 步骤二:获取分钟循环策略;
  • 步骤三:判断是否为5分钟的倍数;
  • 步骤四:对分钟级别小于10的数字做0补齐(比如9补齐后变成09);
  • 步骤五:对小时级别小于10的数字做0补齐(比如1补齐后变成01);
  • 步骤六:生成时间范围;
  • 步骤七:输出结果。

其中,主要的逻辑是在生成时间范围的过程中,根据小时和分钟数的不同情况,生成不同的时间范围,并输出结果。在生成时间范围时,需要注意前导0的处理,以及特殊情况(如小时为0、分钟为0等)的处理。最后,将生成的时间范围输出即可。

根据上述步骤编写对应的实现代码,生成当天所有日期命名规则,预览部分结果如下:

图片

需要注意的是,如果发生了第二天00:00,那么我们需要用到前一天的00-00=>23-59,23-58,23-57,23-56,23-55这5个文件中的数据来做预处理。

2.3.3 加载数据

在完成2.3.1和2.3.2里面的内容后,接下来,我们可以使用Hive的load命令直接加载HDFS上预处理后的文件,把数据加载到对应的Hive表中,具体实现命令如下:

-- 加载数据到Hive表
load data inpath '<hdfs_path_hfile>' overwrite into table xxx.table partition(day='2022-02-26',hour='14',min='05')

2.3.4 即席分析

之后,我们使用Hive SQL来对Kafka数据进行即席分析,示例SQL如下所示:

-- 查询某5分钟分区数据
select * from xxx.table where day='2022-02-26' and hour='14' and min='05'

2.4 Flink SQL与 Flink DataStream如何选择

Flink SQL 和 Flink DataStream 都是 Flink 中用于处理数据的核心组件,我们可以根据自己实际的业务场景来选择使用哪一种组件。

Flink SQL 是一种基于 SQL 语言的数据处理引擎,它可以将 SQL 查询语句转换为 Flink 的数据流处理程序。相比于 Flink DataStream,Flink SQL 更加易于使用和维护,同时具有更快的开发速度和更高的代码复用性。Flink SQL 适用于需要快速开发和部署数据处理任务的场景,比如数据仓库、实时报表、数据清洗等。

Flink DataStream API是Flink数据流处理标准API,SQL是Flink后期版本提供的新的数据处理操作接口。SQL的引入为提高了Flink使用的灵活性。可以认为Flink SQL是一种通过字符串来定义数据流处理逻辑的描述语言。

因此,在选择 Flink SQL 和 Flink DataStream 时,需要根据具体的业务需求和数据处理任务的特点来进行选择。如果需要快速开发和部署任务,可以选择使用 Flink SQL;如果需要进行更为深入和定制化的数据处理操作,可以选择使用 Flink DataStream。同时,也可以根据实际情况,结合使用 Flink SQL 和 Flink DataStream 来完成复杂的数据处理任务。

三、 总结

在实际应用中,Kafka实时数据即席查询可以用于多种场景,如实时监控、实时报警、实时统计、实时分析等。具体应用和实践中,需要注意以下几点:

  • 数据质量:Kafka实时数据即席查询需要保证数据质量,避免数据重复、丢失或错误等问题,需要进行数据质量监控和调优。
  • 系统复杂性:Kafka实时数据即席查询需要涉及到多个系统和组件,包括Kafka、数据处理引擎(比如Flink)、查询引擎(比如Hive)等,需要对系统进行配置和管理,增加了系统的复杂性。
  • 安全性:Kafka实时数据即席查询需要加强数据安全性保障,避免数据泄露或数据篡改等安全问题,做好Hive的权限管控。
  • 性能优化:Kafka实时数据即席查询需要对系统进行性能优化,包括优化数据处理引擎、查询引擎等,提高系统的性能和效率。


Tags:Kafka   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
Spring实现Kafka重试Topic,真的太香了
概述Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重...【详细内容】
2024-01-26  Search: Kafka  点击:(86)  评论:(0)  加入收藏
如何使用Python、Apache Kafka和云平台构建健壮的实时数据管道
译者 | 李睿审校 | 重楼在当今竞争激烈的市场环境中,为了生存和发展,企业必须能够实时收集、处理和响应数据。无论是检测欺诈、个性化用户体验还是监控系统,现在都需要接近即时...【详细内容】
2024-01-26  Search: Kafka  点击:(47)  评论:(0)  加入收藏
深入浅出Kafka:高可用、顺序消费及幂等性
在我们旅行于数据海洋的途中,如果把 Kafka 比作是一艘承载无数信息航行的快船,前文《Kafka实战漫谈:大数据领域的不败王者》已经讲述了如何搭建起这艘快船,让它在起风的早晨开始...【详细内容】
2023-12-18  Search: Kafka  点击:(176)  评论:(0)  加入收藏
7k Star,一款开源的 Kafka 管理平台,功能齐全、页面美观!
Apache Kafka UI 是一个免费的开源 Web UI,用于监控和管理 Apache Kafka 集群,可方便地查看 Kafka Brokers、Topics、消息、Consumer 等情况,支持多集群管理、性能监控、访问控...【详细内容】
2023-12-15  Search: Kafka  点击:(133)  评论:(0)  加入收藏
利用Apache Kafka、Flink和Druid构建实时数据架构
译者 | 陈峻审校 | 重楼如今,对于使用批处理工作流程的数据团队而言,要满足业务的实时要求并非易事。从数据的交付、处理到分析,整个批处理工作流往往需要大量的等待,其中包括:等...【详细内容】
2023-12-11  Search: Kafka  点击:(232)  评论:(0)  加入收藏
运维兄弟!Kafka怎么又"超时"了?
现象凌晨,当运维刚躺下,就被业务研发的电话叫醒,"哥们!kafka服务又异常了?影响到业务了,快看看",业务研发给出的异常日志如下:基本分析 集群检查:立即确认kafka集群以及涉及到topic健...【详细内容】
2023-12-07  Search: Kafka  点击:(138)  评论:(0)  加入收藏
图解Kafka适用场景,全网最全!
消息系统消息系统被用于各种场景,如解耦数据生产者,缓存未处理的消息。Kafka 可作为传统的消息系统的替代者,与传统消息系统相比,kafka有更好的吞吐量、更好的可用性,这有利于处...【详细内容】
2023-11-29  Search: Kafka  点击:(183)  评论:(0)  加入收藏
Kafka有哪些应用场景?你能说上来几个?
下面我们来总结一下Kafka的一些应用场景:1、日志处理与分析(最常用的场景)下图显示了典型的 ELK(Elastic-Logstash-Kibana)堆栈。Kafka 有效地从每个实例收集日志流。ElasticSe...【详细内容】
2023-11-28  Search: Kafka  点击:(165)  评论:(0)  加入收藏
Kafka:解锁大数据时代的搜索与分析
在当今大数据时代,数据湖作为一种新兴的数据存储和分析解决方案,正受到越来越多企业的青睐。而作为一种高性能、可扩展的事件流平台,Kafka在数据湖领域发挥着重要的作用。本文...【详细内容】
2023-11-24  Search: Kafka  点击:(287)  评论:(0)  加入收藏
解密Kafka主题的分区策略:提升实时数据处理的关键
Kafka几乎是当今时代背景下数据管道的首选,无论你是做后端开发、还是大数据开发,对它可能都不陌生。开源软件Kafka的应用越来越广泛。面对Kafka的普及和学习热潮,哪吒想分享一...【详细内容】
2023-11-21  Search: Kafka  点击:(181)  评论:(0)  加入收藏
▌简易百科推荐
Web Components实践:如何搭建一个框架无关的AI组件库
一、让人又爱又恨的Web ComponentsWeb Components是一种用于构建可重用的Web元素的技术。它允许开发者创建自定义的HTML元素,这些元素可以在不同的Web应用程序中重复使用,并且...【详细内容】
2024-04-03  京东云开发者    Tags:Web Components   点击:(8)  评论:(0)  加入收藏
Kubernetes 集群 CPU 使用率只有 13% :这下大家该知道如何省钱了
作者 | THE STACK译者 | 刘雅梦策划 | Tina根据 CAST AI 对 4000 个 Kubernetes 集群的分析,Kubernetes 集群通常只使用 13% 的 CPU 和平均 20% 的内存,这表明存在严重的过度...【详细内容】
2024-03-08  InfoQ    Tags:Kubernetes   点击:(12)  评论:(0)  加入收藏
Spring Security:保障应用安全的利器
SpringSecurity作为一个功能强大的安全框架,为Java应用程序提供了全面的安全保障,包括认证、授权、防护和集成等方面。本文将介绍SpringSecurity在这些方面的特性和优势,以及它...【详细内容】
2024-02-27  风舞凋零叶    Tags:Spring Security   点击:(54)  评论:(0)  加入收藏
五大跨平台桌面应用开发框架:Electron、Tauri、Flutter等
一、什么是跨平台桌面应用开发框架跨平台桌面应用开发框架是一种工具或框架,它允许开发者使用一种统一的代码库或语言来创建能够在多个操作系统上运行的桌面应用程序。传统上...【详细内容】
2024-02-26  贝格前端工场    Tags:框架   点击:(47)  评论:(0)  加入收藏
Spring Security权限控制框架使用指南
在常用的后台管理系统中,通常都会有访问权限控制的需求,用于限制不同人员对于接口的访问能力,如果用户不具备指定的权限,则不能访问某些接口。本文将用 waynboot-mall 项目举例...【详细内容】
2024-02-19  程序员wayn  微信公众号  Tags:Spring   点击:(39)  评论:(0)  加入收藏
开发者的Kubernetes懒人指南
你可以将本文作为开发者快速了解 Kubernetes 的指南。从基础知识到更高级的主题,如 Helm Chart,以及所有这些如何影响你作为开发者。译自Kubernetes for Lazy Developers。作...【详细内容】
2024-02-01  云云众生s  微信公众号  Tags:Kubernetes   点击:(50)  评论:(0)  加入收藏
链世界:一种简单而有效的人类行为Agent模型强化学习框架
强化学习是一种机器学习的方法,它通过让智能体(Agent)与环境交互,从而学习如何选择最优的行动来最大化累积的奖励。强化学习在许多领域都有广泛的应用,例如游戏、机器人、自动驾...【详细内容】
2024-01-30  大噬元兽  微信公众号  Tags:框架   点击:(68)  评论:(0)  加入收藏
Spring实现Kafka重试Topic,真的太香了
概述Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重...【详细内容】
2024-01-26  HELLO程序员  微信公众号  Tags:Spring   点击:(86)  评论:(0)  加入收藏
SpringBoot如何实现缓存预热?
缓存预热是指在 Spring Boot 项目启动时,预先将数据加载到缓存系统(如 Redis)中的一种机制。那么问题来了,在 Spring Boot 项目启动之后,在什么时候?在哪里可以将数据加载到缓存系...【详细内容】
2024-01-19   Java中文社群  微信公众号  Tags:SpringBoot   点击:(86)  评论:(0)  加入收藏
花 15 分钟把 Express.js 搞明白,全栈没有那么难
Express 是老牌的 Node.js 框架,以简单和轻量著称,几行代码就可以启动一个 HTTP 服务器。市面上主流的 Node.js 框架,如 Egg.js、Nest.js 等都与 Express 息息相关。Express 框...【详细内容】
2024-01-16  程序员成功  微信公众号  Tags:Express.js   点击:(88)  评论:(0)  加入收藏
站内最新
站内热门
站内头条