您当前的位置:首页 > 电脑百科 > 数据库 > MYSQL

一次打通FlinkCDC同步Mysql数据

时间:2023-06-18 13:26:55  来源:微信公众号  作者:不焦躁的程序员

业务痛点

离开了业务谈技术都是耍流氓。我们来聊聊基于业务的痛点,衍生出来的多种数据同步方案。

业务中常见的需要数据同步的场景

1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发,但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者,某些历史原因,类似刚开始的商业模式不清晰,导致一些业务线分分合合。或者某些边缘业务逐步融合到了主业务。早起的数据是分开的,业务运营也是分开,后来又合并成了一个大块业务。

2、某个数据需要写到多个存储中。业务数据需要写入到多个中间件或者存储中,比如业务的数据存储再MySQL的数据中,后来为了方便检索需要写入到ES,或者为了缓存需要写入到redis,或者是Mysql分表的数据合并写入到Doris中。

3、数据仓库的场景。比如将表里的数据实时写入到DWS数据仓库的宽表中。

4、应急场景。如果不采专用CDC的方案,那么要达到实时查询的效果,只能在BFF层的代码调用多个中心层的查询API,然后再BFF层做各种聚合,运算。这种方式开发效率低下,万一有的中心层没有提供合适的查询API,临时开发的话,会让开发进度不可控。

总之,不管是数据多写、还是多表合并、还是建立数据仓库,都属于数据同步任务。

数据同步为什么需要独立的系统来做

这种任务放在业务代码里做,是不可持续的。你要尽量让业务系统解耦,专注于做业务的事情,这种数据同步的任务应该交给专门的系统来做。如果在业务系统中增加额外的数据同步功能,同时为了提高数据同步的可用性,就需要写许多数据同步的代码和容错的代码(效率问题、并发问题、数据一致性问题、集群问题等等),这会让业务系统不堪重负,到后期业务系统几乎会达到不可维护的地步。

CDC登场

基于以上问题,本场数据同步的主角FlinkCDC就登场了,FlinkCDC是专门为数据同步(同步+计算)而生。通过CDC工具,可以将数据同步任务从业务系统中解耦出来,同时还可以将一份变动的数据,写入到多个存储中。这种方式不但让业务系统解耦,而且可以让数据同步任务更加健壮,方便后续的维护。

CDC原理

CDC是什么

CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。

目前专业做数据库事件接受和解析的中间件是Debezium,如果是捕获Mysql,还有Canal。

Debezium官方https://debezium.io/

Debezium官方定义:Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your Apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong。翻译过来则是:Debezium 是一个用于变更数据捕获的开源分布式平台。 启动它,将其指向您的数据库,您的应用程序就可以开始响应其他应用程序提交给您的数据库的所有插入、更新和删除操作。 Debezium 耐用且快速,因此即使出现问题,您的应用程序也可以快速响应并且不会错过任何事件。

CDC原理

CDC的原理是,当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动。

FlinkCDC

FlinkCDC是什么

官网地址:https://ververica.Github.io/flink-cdc-connectors/

官方定义:This project provides a set of source connectors for Apache Flink® directly ingesting changes coming from different databases using Change Data Capture(CDC)。根据FlinkCDC官方给出的定义,FlinkCDC提供一组源数据的连接器,使用变更数据捕获的方式,直接吸收来自不同数据库的变更数据。

为什么是FlinkCDC

1、FlinkCDC 提供了对 Debezium 连接器的封装和集成,简化了配置和使用的过程,并提供了更高级的 API 和功能,例如数据格式转换、事件时间处理等。Flink CDC 使用 Debezium 连接器作为底层的实现,将其与 Flink 的数据处理能力结合起来。通过配置和使用 Flink CDC,您可以轻松地将数据库中的变化数据流转化为 Flink 的 DataStream 或 Table,并进行实时的数据处理、转换和分析。

2、Flink的DataStream和SQL比较成熟和易用

3、Flink支持状态后端(State Backends),允许存储海量的数据状态

4、Flink有更好的生态,更多的Source和Sink的支持

数据流向对比

数据合并流向:

数据多写流向:

技术方案比较

网上有数据同步的多种技术方案的比较,我只挑选我实践过的2种做个比较,Canal和FlinkCDC。

数据链路对比

通过下图,我们可以看到Canal处理数据的链路比FlinkCDC更长,数据链路一旦变长意味着,出错的可能性更高。

我在实践Canal的过程中,监听到Kafka之后,通过一个Springboot项目的微服务项目去监听Kafka处理业务逻辑,这种负责度更高,内部数据关联啥的也是调用Dubbo API,我不建议你也使用这种方法。当然啦,这是我没遇到Flink之前的方案,嘻嘻。当然还用过更差的方案,定时任务扫描,再写入别的库,哈哈。

变更数据的结构

Mysql单次提交多条数据的时候,Canal拿到的数据是1条数据,FlinkCDC拿到的是多条数据。FlinkCDC的这种方式更便于处理。

canal数据格式:

 
{"data":[{"id":"G00002","name":"潞城市小蚂蚁家政保洁有限公司","province_id":"32","province":"江苏省","city_id":"3201","city":"南京市","district_id":"320114","district":"雨花台区","address":"科创城23222333444","logo_url":"http://bm-oss.oss-cn-hangzhou.aliyuncs.com/jfe-app3.0/baba/goods/icon_112.png","slogan":"欢迎来到","credit_code":"2343243","master_name":"江鑫","master_idcard":"532524199911304246","power_group_id":"9996","opt_time":"2021-10-19 17:41:57","add_user_id":"132","add_user_name":"测试","add_time":"2020-07-27 13:59:34","emAIl":"123456@qq.com","master_wechat":null,"service_phone":"13232323232","max_shop_num":"5","pay_mode":null,"business_license":"https://guard.bm001.com/kacloud/null/image/MXVSeb7Pv4r1f1346237770562140.jpg","idcard_front":"https://guard.bm001.com/kacloud/null/image/13uty9yyvrlvWb346237808202139.png","idcard_back":"https://guard.bm001.com/kacloud/null/image/5uA7IblI6r1zoW346237778042125.png","cloud_shop_state":"0","expiration_time":null,"version_id":null,"company_type":"0","company_property":"1","main_sell":null,"introduction":null,"contact_name":null,"contact_phone":null,"certification_name":"潞城市小蚂蚁家政保洁有限公司","is_test":null,"login_account":null,"delete_at":"0","self_invitation_code":"IN6501"}],"database":"cloud_test","es":1669010586000,"id":8150,"isDdl":false,"mysqlType":{"id":"varchar(32)","name":"varchar(64)","province_id":"int(6)","province":"varchar(32)","city_id":"int(6)","city":"varchar(32)","district_id":"int(6)","district":"varchar(64)","address":"varchar(128)","logo_url":"varchar(500)","slogan":"varchar(255)","credit_code":"varchar(18)","master_name":"varchar(16)","master_idcard":"varchar(18)","power_group_id":"bigint(20)","opt_time":"datetime","add_user_id":"varchar(32)","add_user_name":"varchar(32)","add_time":"datetime","email":"varchar(255)","master_wechat":"varchar(255)","service_phone":"varchar(32)","max_shop_num":"int(11)","pay_mode":"int(1)","business_license":"varchar(128)","idcard_front":"varchar(128)","idcard_back":"varchar(128)","cloud_shop_state":"int(1)","expiration_time":"datetime","version_id":"bigint(20)","company_type":"tinyint(2)","company_property":"int(2)","main_sell":"varchar(200)","introduction":"varchar(512)","contact_name":"varchar(32)","contact_phone":"varchar(11)","certification_name":"varchar(64)","is_test":"int(1)","login_account":"varchar(50)","delete_at":"bigint(14)","self_invitation_code":"char(6)"},"old":[{"address":"科创城23222333"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"name":12,"province_id":4,"province":12,"city_id":4,"city":12,"district_id":4,"district":12,"address":12,"logo_url":12,"slogan":12,"credit_code":12,"master_name":12,"master_idcard":12,"power_group_id":-5,"opt_time":93,"add_user_id":12,"add_user_name":12,"add_time":93,"email":12,"master_wechat":12,"service_phone":12,"max_shop_num":4,"pay_mode":4,"business_license":12,"idcard_front":12,"idcard_back":12,"cloud_shop_state":4,"expiration_time":93,"version_id":-5,"company_type":-6,"company_property":4,"main_sell":12,"introduction":12,"contact_name":12,"contact_phone":12,"certification_name":12,"is_test":4,"login_account":12,"delete_at":-5,"self_invitation_code":1},"table":"uc_company","ts":1669010468134,"type":"UPDATE"}

FlinkCDC数据格式:

 
{"before":{"id":"PF1784570096901248","pay_order_no":null,"out_no":"J1784570080435328","title":"充值办卡","from_user_id":"PG11111","from_account_id":"1286009802396288","user_id":"BO1707796995184000","account_id":"1707895210106496","amount":13400,"profit_state":1,"profit_time":1686758315000,"refund_state":0,"refund_time":null,"add_time":1686758315000,"remark":"充值办卡","acct_circle":"PG11111","user_type":92,"from_user_type":90,"company_id":"PG11111","profit_mode":1,"type":2,"parent_id":null,"oc_profit_id":"1784570096901248","keep_account_from_user_id":null,"keep_account_from_bm_user_id":null,"keep_account_user_id":null,"keep_account_bm_user_id":null,"biz_company_id":"PG11111"},"after":{"id":"PF1784570096901248","pay_order_no":null,"out_no":"J1784570080435328","title":"充值办卡","from_user_id":"PG11111","from_account_id":"1286009802396288","user_id":"BO1707796995184000","account_id":"1707895210106496","amount":13400,"profit_state":1,"profit_time":1686758315000,"refund_state":0,"refund_time":null,"add_time":1686758315000,"remark":"充值办卡1","acct_circle":"PG11111","user_type":92,"from_user_type":90,"company_id":"PG11111","profit_mode":1,"type":2,"parent_id":null,"oc_profit_id":"1784570096901248","keep_account_from_user_id":null,"keep_account_from_bm_user_id":null,"keep_account_user_id":null,"keep_account_bm_user_id":null,"biz_company_id":"PG11111"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1686734882000,"snapshot":"false","db":"cloud_test","sequence":null,"table":"acct_profit","server_id":1,"gtid":null,"file":"mysql-bin.000514","pos":650576218,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1686734882689,"transaction":null}

如何使用

FlinkCDC同步数据,有两种方式,一种是FlinkSQL的方式,一种是Flink DataStream和Table API的方式。为了方便管理,这两种方式我都写在代码里。

前置准备

1、准备好Flink集群。FlinkCDC也是以任务的形式提交到Flink集群去执行的。可以按照Flink官网进行下载安装:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/try-flink/local_installation/

2、开启Mysql的binlog。这一步自行解决。

FlinkSQL方式

为了方便管理,FlinkSQL方式也是用JAVA代码写

1、创建database

tEnv.executeSql("CREATE DATABASE IF NOT EXISTS cloud_test");tEnv.executeSql("CREATE DATABASE IF NOT EXISTS league_test");

2、创建source表

注意类型是'connector' = 'mysql-cdc'

tEnv.executeSql("CREATE TABLE league_test.oc_settle_profit (n" +        "    id                           STRING,n" +        "    show_profit_id               STRING,n" +        "    order_no                     STRING,n" +        "    from_user_id                 STRING,n" +        "    from_user_type               INT,n" +        "    user_id                      STRING,n" +        "    user_type                    INT,n" +        "    rate                         INT,n" +        "    amount                       INT,n" +        "    type                         INT,n" +        "    add_time                     TIMESTAMP,n" +        "    state                        INT,n" +        "    expect_profit_time           TIMESTAMP,n" +        "    profit_time                  TIMESTAMP,n" +        "    profit_mode                  INT,n" +        "    opt_code                     STRING,n" +        "    opt_name                     STRING,n" +        "    acct_circle                  STRING,n" +        "    process_state                INT,n" +        "    parent_id                    STRING,n" +        "    keep_account_from_user_id    STRING,n" +        "    keep_account_from_bm_user_id STRING,n" +        "    keep_account_user_id         STRING,n" +        "    keep_account_bm_user_id      STRING,n" +        "    biz_type                     INT,n" +        "    remark                       STRING,n" +        "    contribute_user_id           STRING,n" +        "    relation_brand_owner_id      STRING,n" +        "    PRIMARY KEY (id) NOT ENFORCEDn" +        ") WITH (n" +        "  'connector' = 'mysql-cdc',n" +        "  'hostname' = '10.20.1.11',n" +        "  'port' = '3306',n" +        "  'username' = 'root',n" +        "  'password' = '123456',n" +        "  'database-name' = 'league_test',n" +        "  'table-name' = 'oc_settle_profit',n" +        "  'scan.incremental.snapshot.enabled' = 'false'n" +        ")");

3、创建sink表

注意类型是'connector' = 'jdbc'

tEnv.executeSql("CREATE TABLE cloud_test.dws_profit_record_hdj_flink (n" +        "    id                      STRING,n" +        "    show_profit_id          STRING,n" +        "    order_no                STRING,n" +        "    from_user_id            STRING,n" +        "    from_user_type          INT,n" +        "    user_id                 STRING,n" +        "    user_type               INT,n" +        "    amount                  INT,n" +        "    profit_time             TIMESTAMP,n" +        "    state                   INT,n" +        "    acct_circle             STRING,n" +        "    biz_type                INT,n" +        "    contribute_user_id      STRING,n" +        "    relation_brand_owner_id STRING,n" +        "    remark                  STRING,n" +        "    add_time                TIMESTAMP,n" +        "    PRIMARY KEY (id) NOT ENFORCEDn" +        ") WITH (n" +        "  'connector' = 'jdbc',n" +        "  'url' = 'jdbc:mysql://10.20.1.11:3306/cloud_test',n" +        "  'username' = 'root',n" +        "  'password' = 'root12345',n" +        "  'table-name' = 'dws_profit_record_hdj_flink'n" +        ")");

4、执行insert。

如果需要多表关联的,可以注册多个'connector' = 'jdbc'的源表,然后这里编写类似insert into select join这样代码

tEnv.executeSql("INSERT INTO cloud_test.dws_profit_record_hdj_flink (id, show_profit_id, order_no, from_user_id, from_user_type, user_id,n" +        "                                              user_type, amount, profit_time, state, acct_circle, biz_type,n" +        "                                              contribute_user_id, relation_brand_owner_id, remark, add_time)n" +        "select f.id,n" +        "       f.show_profit_id,n" +        "       f.order_no,n" +        "       f.from_user_id,n" +        "       f.from_user_type,n" +        "       f.user_id,n" +        "       f.user_type,n" +        "       f.amount,n" +        "       f.profit_time,n" +        "       f.state,n" +        "       f.acct_circle,n" +        "       f.biz_type,n" +        "       f.contribute_user_id,n" +        "       f.relation_brand_owner_id,n" +        "       f.remark,n" +        "       f.add_timen" +        "from league_test.oc_settle_profit fn" +        "where f.id is not nulln" +        "  and f.biz_type is not nulln" +        "  and f.biz_type = 9");

FlinkSQL方式结束,此时只要source表有变动,那么会自动监听到数据,自动插入到新的表中。

DataStream和Table API方式

个人觉得这种方式虽说有些繁琐,但是灵活度更好,可以用Java代码处理很多逻辑,比SQL更灵活些。

1、监听source

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()                .hostname(MYSQL_HOST)                .port(MYSQL_PORT)                .databaseList(SYNC_DB) // set captured database                .tableList(String.join(",", SYNC_TABLES)) // set captured table                .username(MYSQL_USER)                .password(MYSQL_PASSWD)                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(3);        env.enableCheckpointing(5000);
        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + LeagueOcSettleProfit2DwsHdjProfitRecordAPI.class.getName());

2、清洗数据(过滤、转换等等)

此处逻辑比较自定义,文中是过滤掉了不相关的表,然后过滤掉了删除数据的log。

过滤掉不相关的表。

private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {        return source.filter(new FilterFunction<String>() {            @Override            public boolean filter(String row) throws Exception {                try {                    JSONObject rowJson = JSON.parseobject(row);                    JSONObject source = rowJson.getJSONObject("source");                    String tbl = source.getString("table");                    return table.equals(tbl);                } catch (Exception ex) {                    ex.printStackTrace();                    return false;                }            }        });    }

过滤掉删除数据的log

private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {        return source.flatMap(new FlatMapFunction<String, String>() {            @Override            public void flatMap(String row, Collector<String> out) throws Exception {                try {                    LOG.info("============================row:{}", row);                    JSONObject rowJson = JSON.parseObject(row);                    String op = rowJson.getString("op");                    //history,insert,update                    if (Arrays.asList("r", "c", "u").contains(op)) {                        out.collect(rowJson.getJSONObject("after").toJSONString());                    } else {                        LOG.info("filter other op:{}", op);                    }                } catch (Exception ex) {                    LOG.warn("filter other format binlog:{}", row);                }            }        });    }

处理业务逻辑,过滤掉了部分数据

private static SingleOutputStreamOperator<String> logic(SingleOutputStreamOperator<String> cleanStream) {        return cleanStream.filter(new FilterFunction<String>() {            @Override            public boolean filter(String data) throws Exception {                try {                    JSONObject dataJson = JSON.parseObject(data);                    String id = dataJson.getString("id");                    Integer bizType = dataJson.getInteger("biz_type");                    if (StringUtils.isBlank(id) || bizType == null) {                        return false;                    }                    // 只处理上岗卡数据                    return bizType == 9;                } catch (Exception ex) {                    LOG.warn("filter other format binlog:{}", data);                    return false;                }            }        });    }

3、创建自定义sink,将数据写出去

private static class CustomDealDataSink extends RichSinkFunction<String> {        private transient Connection cloudConnection;        private transient PreparedStatement cloudPreparedStatement;
        private String insertSql = "INSERT INTO dws_profit_record_hdj_flink_api (id, show_profit_id, order_no, from_user_id, from_user_type, user_id,n" +                "                                              user_type, amount, profit_time, state, acct_circle, biz_type,n" +                "                                              contribute_user_id, relation_brand_owner_id, remark, add_time)n" +                "VALUES (?, ?, ?, ?, ?, ?, ?, ?,n" +                "        ?, ?, ?, ?, ?, ?, ?, ?)";        private String deleteSql = "delete from dws_profit_record_hdj_flink_api where id = '%s'";
        @Override        public void open(Configuration parameters) throws Exception {            super.open(parameters);            // 在这里初始化 JDBC 连接            cloudConnection = DriverManager.getConnection("jdbc:mysql://10.20.1.11:3306/cloud_test", "root", "123456");            cloudPreparedStatement = cloudConnection.prepareStatement(insertSql);        }
        @Override        public void invoke(String value, Context context) throws Exception {            JSONObject dataJson = JSON.parseObject(value);            String id = dataJson.getString("id");            String showProfitId = dataJson.getString("show_profit_id");            String orderNo = dataJson.getString("order_no");            String fromUserId = dataJson.getString("from_user_id");            Integer fromUserType = dataJson.getInteger("from_user_type");            String userId = dataJson.getString("user_id");            Integer userType = dataJson.getInteger("user_type");            Integer amount = dataJson.getInteger("amount");            Timestamp addTime = dataJson.getTimestamp("add_time");            Integer state = dataJson.getInteger("state");            Timestamp profitTime = dataJson.getTimestamp("profit_time");            String acctCircle = dataJson.getString("acct_circle");            Integer bizType = dataJson.getInteger("biz_type");            String remark = dataJson.getString("remark");            String contributeUserId = dataJson.getString("contribute_user_id");            String relationBrandOwnerId = dataJson.getString("relation_brand_owner_id");
            Timestamp profitTimeTimestamp = Timestamp.valueOf(DateFormatUtils.format(profitTime.getTime(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")));            Timestamp addTimeTimestamp = Timestamp.valueOf(DateFormatUtils.format(addTime.getTime(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")));
            cloudPreparedStatement.setString(1, id);            cloudPreparedStatement.setString(2, showProfitId);            cloudPreparedStatement.setString(3, orderNo);            cloudPreparedStatement.setString(4, fromUserId);            cloudPreparedStatement.setInt(5, fromUserType);            cloudPreparedStatement.setString(6, userId);            cloudPreparedStatement.setInt(7, userType);            cloudPreparedStatement.setInt(8, amount);            cloudPreparedStatement.setTimestamp(9, profitTimeTimestamp);            cloudPreparedStatement.setInt(10, state);            cloudPreparedStatement.setString(11, StringUtils.isBlank(acctCircle) ? "PG11111" : acctCircle);            cloudPreparedStatement.setInt(12, bizType);            cloudPreparedStatement.setString(13, contributeUserId);            cloudPreparedStatement.setString(14, relationBrandOwnerId);            cloudPreparedStatement.setString(15, remark);            cloudPreparedStatement.setTimestamp(16, addTimeTimestamp);
            cloudPreparedStatement.execute(String.format(deleteSql, id));            cloudPreparedStatement.execute();        }
        @Override        public void close() throws Exception {            super.close();            // 在这里关闭 JDBC 连接            cloudPreparedStatement.close();            cloudConnection.close();        }    }

代码地址

代码里有2个夹子,一个是API方式的,一个是SQL方式的,每种方式了放了2个例子,代码地址如下:https://github.com/yclxiao/flink-cdc-demo.git

如果在实践的过程中碰到问题,可以在这里找到我:http://www.mangod.top/articles/2023/03/15/1678849930601.html



Tags:Mysql   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
MySQL 核心模块揭秘
server 层会创建一个 SAVEPOINT 对象,用于存放 savepoint 信息。binlog 会把 binlog offset 写入 server 层为它分配的一块 8 字节的内存里。 InnoDB 会维护自己的 savepoint...【详细内容】
2024-04-03  Search: Mysql  点击:(5)  评论:(0)  加入收藏
MySQL 核心模块揭秘,你看明白了吗?
为了提升分配 undo 段的效率,事务提交过程中,InnoDB 会缓存一些 undo 段。只要同时满足两个条件,insert undo 段或 update undo 段就能被缓存。1. 关于缓存 undo 段为了提升分...【详细内容】
2024-03-27  Search: Mysql  点击:(10)  评论:(0)  加入收藏
MySQL:BUG导致DDL语句无谓的索引重建
对于5.7.23之前的版本在评估类似DDL操作的时候需要谨慎,可能评估为瞬间操作,但是实际上线的时候跑了很久,这个就容易导致超过维护窗口,甚至更大的故障。一、问题模拟使用5.7.22...【详细内容】
2024-03-26  Search: Mysql  点击:(9)  评论:(0)  加入收藏
从 MySQL 到 ByteHouse,抖音精准推荐存储架构重构解读
ByteHouse是一款OLAP引擎,具备查询效率高的特点,在硬件需求上相对较低,且具有良好的水平扩展性,如果数据量进一步增长,可以通过增加服务器数量来提升处理能力。本文将从兴趣圈层...【详细内容】
2024-03-22  Search: Mysql  点击:(23)  评论:(0)  加入收藏
MySQL自增主键一定是连续的吗?
测试环境:MySQL版本:8.0数据库表:T (主键id,唯一索引c,普通字段d)如果你的业务设计依赖于自增主键的连续性,这个设计假设自增主键是连续的。但实际上,这样的假设是错的,因为自增主键不...【详细内容】
2024-03-10  Search: Mysql  点击:(5)  评论:(0)  加入收藏
准线上事故之MySQL优化器索引选错
1 背景最近组里来了许多新的小伙伴,大家在一起聊聊技术,有小兄弟提到了MySQL的优化器的内部策略,想起了之前在公司出现的一个线上问题,今天借着这个机会,在这里分享下过程和结论...【详细内容】
2024-03-07  Search: Mysql  点击:(26)  评论:(0)  加入收藏
MySQL数据恢复,你会吗?
今天分享一下binlog2sql,它是一款比较常用的数据恢复工具,可以通过它从MySQL binlog解析出你要的SQL,并根据不同选项,可以得到原始SQL、回滚SQL、去除主键的INSERT SQL等。主要...【详细内容】
2024-02-22  Search: Mysql  点击:(43)  评论:(0)  加入收藏
如何在MySQL中实现数据的版本管理和回滚操作?
实现数据的版本管理和回滚操作在MySQL中可以通过以下几种方式实现,包括使用事务、备份恢复、日志和版本控制工具等。下面将详细介绍这些方法。1.使用事务:MySQL支持事务操作,可...【详细内容】
2024-02-20  Search: Mysql  点击:(51)  评论:(0)  加入收藏
为什么高性能场景选用Postgres SQL 而不是 MySQL
一、 数据库简介 TLDR;1.1 MySQL MySQL声称自己是最流行的开源数据库,它属于最流行的RDBMS (Relational Database Management System,关系数据库管理系统)应用软件之一。LAMP...【详细内容】
2024-02-19  Search: Mysql  点击:(37)  评论:(0)  加入收藏
MySQL数据库如何生成分组排序的序号
经常进行数据分析的小伙伴经常会需要生成序号或进行数据分组排序并生成序号。在MySQL8.0中可以使用窗口函数来实现,可以参考历史文章有了这些函数,统计分析事半功倍进行了解。...【详细内容】
2024-01-30  Search: Mysql  点击:(53)  评论:(0)  加入收藏
▌简易百科推荐
MySQL 核心模块揭秘
server 层会创建一个 SAVEPOINT 对象,用于存放 savepoint 信息。binlog 会把 binlog offset 写入 server 层为它分配的一块 8 字节的内存里。 InnoDB 会维护自己的 savepoint...【详细内容】
2024-04-03  爱可生开源社区    Tags:MySQL   点击:(5)  评论:(0)  加入收藏
MySQL 核心模块揭秘,你看明白了吗?
为了提升分配 undo 段的效率,事务提交过程中,InnoDB 会缓存一些 undo 段。只要同时满足两个条件,insert undo 段或 update undo 段就能被缓存。1. 关于缓存 undo 段为了提升分...【详细内容】
2024-03-27  爱可生开源社区  微信公众号  Tags:MySQL   点击:(10)  评论:(0)  加入收藏
MySQL:BUG导致DDL语句无谓的索引重建
对于5.7.23之前的版本在评估类似DDL操作的时候需要谨慎,可能评估为瞬间操作,但是实际上线的时候跑了很久,这个就容易导致超过维护窗口,甚至更大的故障。一、问题模拟使用5.7.22...【详细内容】
2024-03-26  MySQL学习  微信公众号  Tags:MySQL   点击:(9)  评论:(0)  加入收藏
从 MySQL 到 ByteHouse,抖音精准推荐存储架构重构解读
ByteHouse是一款OLAP引擎,具备查询效率高的特点,在硬件需求上相对较低,且具有良好的水平扩展性,如果数据量进一步增长,可以通过增加服务器数量来提升处理能力。本文将从兴趣圈层...【详细内容】
2024-03-22  字节跳动技术团队    Tags:ByteHouse   点击:(23)  评论:(0)  加入收藏
MySQL自增主键一定是连续的吗?
测试环境:MySQL版本:8.0数据库表:T (主键id,唯一索引c,普通字段d)如果你的业务设计依赖于自增主键的连续性,这个设计假设自增主键是连续的。但实际上,这样的假设是错的,因为自增主键不...【详细内容】
2024-03-10    dbaplus社群  Tags:MySQL   点击:(5)  评论:(0)  加入收藏
准线上事故之MySQL优化器索引选错
1 背景最近组里来了许多新的小伙伴,大家在一起聊聊技术,有小兄弟提到了MySQL的优化器的内部策略,想起了之前在公司出现的一个线上问题,今天借着这个机会,在这里分享下过程和结论...【详细内容】
2024-03-07  转转技术  微信公众号  Tags:MySQL   点击:(26)  评论:(0)  加入收藏
MySQL数据恢复,你会吗?
今天分享一下binlog2sql,它是一款比较常用的数据恢复工具,可以通过它从MySQL binlog解析出你要的SQL,并根据不同选项,可以得到原始SQL、回滚SQL、去除主键的INSERT SQL等。主要...【详细内容】
2024-02-22  数据库干货铺  微信公众号  Tags:MySQL   点击:(43)  评论:(0)  加入收藏
如何在MySQL中实现数据的版本管理和回滚操作?
实现数据的版本管理和回滚操作在MySQL中可以通过以下几种方式实现,包括使用事务、备份恢复、日志和版本控制工具等。下面将详细介绍这些方法。1.使用事务:MySQL支持事务操作,可...【详细内容】
2024-02-20  编程技术汇    Tags:MySQL   点击:(51)  评论:(0)  加入收藏
MySQL数据库如何生成分组排序的序号
经常进行数据分析的小伙伴经常会需要生成序号或进行数据分组排序并生成序号。在MySQL8.0中可以使用窗口函数来实现,可以参考历史文章有了这些函数,统计分析事半功倍进行了解。...【详细内容】
2024-01-30  数据库干货铺  微信公众号  Tags:MySQL   点击:(53)  评论:(0)  加入收藏
mysql索引失效的场景
MySQL中索引失效是指数据库查询时无法有效利用索引,这可能导致查询性能显著下降。以下是一些常见的MySQL索引失效的场景:1.使用非前导列进行查询: 假设有一个复合索引 (A, B)。...【详细内容】
2024-01-15  小王爱编程  今日头条  Tags:mysql索引   点击:(82)  评论:(0)  加入收藏
站内最新
站内热门
站内头条