您当前的位置:首页 > 互联网百科 > 大数据

SpringBoot整合Spring Batch批处理框架,处理大数据新方案

时间:2023-09-20 14:25:09  来源:  作者: Spring全家桶实战案例源码

环境:Springboot2.6.12 + Spring Batch4.2.7


Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。

业务场景:

  • 定期提交批处理。

  • 并行批处理:作业的并行处理

  • 分阶段、企业消息驱动的处理

  • 大规模并行批处理

  • 故障后手动或计划重新启动

  • 相关步骤的顺序处理(扩展到工作流驱动的批处理)

  • 部分处理:跳过记录(例如,回滚时)

  • 整批事务,适用于小批量或现有存储过程/脚本的情况

技术目标:

  • 批处理开发人员使用Spring编程模型:专注于业务逻辑,让框架负责基础设施。

  • 基础架构、批处理执行环境和批处理应用程序之间的关注点清晰分离。

  • 提供通用的核心执行服务,作为所有项目都可以实现的接口。

  • 提供可“开箱即用”的核心执行接口的简单和默认实现。

  • 通过在所有层中利用spring框架,可以轻松配置、定制和扩展服务。

  • 所有现有的核心服务都应该易于替换或扩展,而不会对基础架构层造成任何影响。

  • 提供一个简单的部署模型,使用Maven构建的架构JAR与应用程序完全分离。

Spring Batch的结构:

此分层体系结构突出了三个主要的高级组件:应用程序、核心和基础架构。该应用程序包含开发人员使用SpringBatch编写的所有批处理作业和自定义代码。批处理核心包含启动和控制批处理作业所需的核心运行时类。它包括JobLauncher、Job和Step的实现。应用程序和核心都构建在公共基础架构之上。此基础结构包含公共读写器和服务(如RetryTemplate),应用程序开发人员(读写器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的库)都使用这些服务。

下面介绍开发流程

本例完成 读取文件内容,经过处理后,将数据保存到数据库中

  • 引入依赖

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-web</artifactId></dependency><dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency>  <groupId>MySQL</groupId>  <artifactId>mysql-connector-JAVA</artifactId></dependency><dependency>  <groupId>org.hibernate</groupId>  <artifactId>hibernate-validator</artifactId>  <version>6.0.7.Final</version></dependency>
  • 应用配置文件

spring:  datasource:    driverClassName: com.mysql.cj.jdbc.Driver    url: jdbc:mysql://localhost:3306/batch?serverTimezone=GMT%2B8    username: root    password: *******    type: com.zaxxer.hikari.HikariDataSource    hikari:      minimumIdle: 10      maximumPoolSize: 200      autoCommit: true      idleTimeout: 30000      poolName: MasterDatabookHikariCP      maxLifetime: 1800000      connectionTimeout: 30000      connectionTestQuery: SELECT 1---spring:  jpa:    generateDdl: false    hibernate:      ddlAuto: update    openInView: true    show-sql: true---spring:  batch:    job:      enabled: false #是否自动执行任务    initialize-schema: always  #自动为我们创建数据库脚本

 

  • 开启批处理功能

@Configuration@EnableBatchProcessingpublic class BatchConfig extends DefaultBatchConfigurer{}
  • 任务启动器

接着上一步的配置类BatchConfig重写对应方法

@Overrideprotected JobLauncher createJobLauncher() throws Exception {  SimpleJobLauncher jobLauncher = new SimpleJobLauncher();  jobLauncher.setJobRepository(createJobRepository());  jobLauncher.afterPropertiesSet();  return jobLauncher;}
  • 任务存储

接着上一步的配置类BatchConfig重写对应方法

@Resourceprivate PlatformTransactionManager transactionManager ;@Overrideprotected JobRepository createJobRepository() throws Exception {  JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();  factory.setDatabaseType("mysql");  factory.setTransactionManager(transactionManager);  factory.setDataSource(dataSource);  factory.afterPropertiesSet();  return factory.getObject();}
  • 定义JOB

@Beanpublic Job myJob(JobBuilderFactory builder, @Qualifier("myStep")Step step){  return builder.get("myJob")      .incrementer(new RunIdIncrementer())      .flow(step)      .end()      .listener(jobExecutionListener)      .build();}
  • 定义ItemReader读取器

@Beanpublic ItemReader<Person> reader(){  FlatFileItemReader<Person> reader = new FlatFileItemReader<>();  reader.setResource(new ClassPathResource("cvs/persons.cvs"));  reader.setLineMApper(new DefaultLineMapper<Person>() {    // 代码块    {      setL.NETokenizer(new DelimitedLineTokenizer(",") {        {          setNames("id", "name");        }      }) ;      setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {        {          setTargetType(Person.class) ;        }      });    }  });  return reader;}
  • 定义ItemProcessor处理器

@Beanpublic ItemProcessor<Person, Person2> processorPerson(){  return new ItemProcessor<Person, Person2>() {      @Override      public Person2 process(Person item) throws Exception {        Person2 p = new Person2() ;        p.setId(item.getId()) ;        p.setName(item.getName() + ", pk");        return p ;      }  } ;}
  • 定义ItemWriter写数据

@Resourceprivate Validator<Person> validator ;@Resourceprivate EntityManagerFactory entityManagerFactory ;@Beanpublic ItemWriter<Person2> writerPerson(){  JpAItemWriter<Person2> writer = null ;  JpaItemWriterBuilder<Person2> builder = new JpaItemWriterBuilder<>() ;  builder.entityManagerFactory(entityManagerFactory) ;  writer = builder.build() ;  return writer;}
  • 定义Step

@Beanpublic Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor){  return stepBuilderFactory    .get("myStep")    .<Person, Person>chunk(2) // Chunk的机制(即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作)    .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2).listener(new MyReadListener())    .processor(processor)    .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)    .listener(new MyWriteListener())    .build();}
  • 定义相应的监听器

public class MyReadListener implements ItemReadListener<Person> {
  private Logger logger = LoggerFactory.getLogger(MyReadListener.class);
  @Override  public void beforeRead() {  }
  @Override  public void afterRead(Person item) {    System.out.println("reader after: " + Thread.currentThread().getName()) ;  }
  @Override  public void onReadError(Exception ex) {    logger.info("读取数据错误:{}", ex);  }}
@Componentpublic class MyWriteListener implements ItemWriteListener<Person> {      private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);     @Override    public void beforeWrite(List<? extends Person> items) {    }        @Override    public void afterWrite(List<? extends Person> items) {      System.out.println("writer after: " + Thread.currentThread().getName()) ;    }        @Override    public void onWriteError(Exception exception, List<? extends Person> items) {      try {        logger.info(format("%s%n", exception.getMessage()));        for (Person item : items) {            logger.info(format("Failed writing BlogInfo : %s", item.toString()));        }      } catch (Exception e) {        e.printStackTrace();      }    }}

person.cvs文件内容

 

实体类:

@Entity@Table(name = "t_person")public class Person { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id ; private String name ;}

启动任务执行 

@RestController@RequestMapping("/demo")public class DemoController {  @Resource  @Qualifier("myJob")  private Job job ;  @Resource  private JobLauncher launcher ;  @GetMapping("/index")  public Object index() {    JobParameters jobParameters = new JobParametersBuilder().toJobParameters() ;    try {      launcher.run(job, jobParameters) ;    } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException        | JobParametersInvalidException e) {      e.printStackTrace();    }    return "success" ;  }}

启动服务,自动为我们创建了表

 

执行任务

查看表情况

 

完毕!!!



Tags:Spring Batch   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
SpringBoot整合Spring Batch批处理框架,处理大数据新方案
环境:Springboot2.6.12 + Spring Batch4.2.7Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Sp...【详细内容】
2023-09-20  Search: Spring Batch  点击:(79)  评论:(0)  加入收藏
▌简易百科推荐
大数据杀熟何时告别“人人喊打却无可奈何”?
2月7日郑州飞往珠海的航班,不同手机、不同账号搜索该航班显示出不同价格。图源网络有网友近日分享在某平台的购票经历,引发社会广泛关注&mdash;&mdash;用3个账号买同一航班同...【详细内容】
2024-01-30    中国青年网  Tags:大数据杀熟   点击:(32)  评论:(0)  加入收藏
简易百科:到底什么是大数据?
随着互联网的快速发展,大数据已经成为了当今社会最热门的话题之一。那么,到底什么是大数据呢?首先,我们需要明确大数据的定义。大数据是指数据量极大、类型繁多、处理难度高的数...【详细内容】
2024-01-30    简易百科  Tags:大数据   点击:(40)  评论:(0)  加入收藏
数据采集新篇章:AI与大模型的融合应用
开篇在AIGC(人工智能与通用计算)应用中,大型语言模型(LLM)占据着举足轻重的地位。这些模型,如GPT和BERT系列,通过处理和分析庞大的数据集,已经极大地推动了自然语言理解和生成的边界...【详细内容】
2024-01-17  崔皓  51CTO  Tags:数据采集   点击:(50)  评论:(0)  加入收藏
挑战 Spark 和 Flink?大数据技术栈的突围和战争
十年的轮回,正如大数据的发展一般,它既是一个轮回的结束,也是崭新的起点。大数据在过去的二十年中蓬勃发展,从无到有,崛起为最具爆炸性的技术领域之一,逐渐演变成为每个企业不可或...【详细内容】
2024-01-17  InfoQ    Tags:大数据   点击:(40)  评论:(0)  加入收藏
分布式存储系统在大数据处理中扮演着怎样的角色?
如果存储节点本身可以定制,则通常会让其支持部分计算能力,以利用数据的亲和性,将部分计算下推到相关的存储节点上。如果存储是云上的 S3 等对象存储,无法定制,则通常会将数据在计...【详细内容】
2023-12-19  木鸟杂记  微信公众号  Tags:大数据   点击:(48)  评论:(0)  加入收藏
大数据如何实时拯救生命:车联网的数据分析有助预防交通事故
译者 | 李睿审校 | 重楼车联网(IoV)是汽车行业与物联网相结合的产物。预计车联网数据规模将越来越大,尤其是当电动汽车成为汽车市场新的增长引擎。问题是:用户的数据平台准备...【详细内容】
2023-12-19    51CTO  Tags:大数据   点击:(41)  评论:(0)  加入收藏
利用生成对抗网络进行匿名化数据处理
在互联网时代,数据日益成为人们的生产资料。然而,在某些情况下,我们需要分享数据,但又需要保护个人隐私。这时,匿名化技术就显得尤为重要。本文将介绍利用生成对抗网络进行匿名化...【详细内容】
2023-12-18  技巧达人小影    Tags:数据处理   点击:(56)  评论:(0)  加入收藏
盘点那些常见的数据中心类型,你知道几个?
在数字化潮流的浪潮下,数据中心如同企业的神经系统,关系到业务的稳健运转。而在这个巨大的网络中,各种数据中心类型如雨后春笋般崭露头角。从企业级的个性至云数据中心的虚拟化...【详细内容】
2023-12-07  数据中心之家  微信公众号  Tags:数据中心   点击:(65)  评论:(0)  加入收藏
数据中心的七个关键特征
随着信息技术的不断演进,数据中心的可靠性、可扩展性、高效性、安全性、灵活性、管理性和可持续性成为业界探讨的焦点。下面让我们一同深入剖析这些关键特征,了解它们是如何影...【详细内容】
2023-12-06  数据中心之家  微信公众号  Tags:数据   点击:(63)  评论:(0)  加入收藏
什么是数据解析?将数据转化为更好的决策
什么是数据解析?数据解析是一门专注于从数据中获取洞察力的学科。它包含数据分析(data analysis)和管理的流程、工具和技术,包括数据的收集、组织和存储。数据解析的主要目的是...【详细内容】
2023-12-06  计算机世界    Tags:数据解析   点击:(62)  评论:(0)  加入收藏
站内最新
站内热门
站内头条