106 基于消息队列来做 mysql 大数据表数据的遍历处理

前言

最近有这样的一个需求, 我们存在一张 很大的 mysql 数据表, 数据量大概是在 六百万左右 

然后 需要获取所有的记录, 将数据传输到 es 中 

然后 当时 我就写了一个脚本来读取 这张大表, 然后 分页获取数据, 然后 按页进行数据处理 转换到 es 

但是存在的问题是, 前面 还效率还可以, 但是 约到后面, 大概是到 三百多页, 的时候 从 mysql 读取数据 已经快不行了 

十分耗时, 这里就是 记录这个问题的 另外的处理方式 

我这里的处理是基于 消息中间件, 从 mysql 通过 datax/spoon 传输数据到 kafka 很快 

然后  java 程序从 kafka 中消费队列的数据 也很快, 最终 六百万的数据 读取 + 处理 合计差不多是 一个多小时完成, 其中处理 有一部分地方 业务上面比较耗时 

 

 

待处理的数据表

待处理的数据表如下, 里面合计 600w 的数据 

CREATE TABLE `student_all` (
  `id` int NOT NULL AUTO_INCREMENT,
  `field0` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field1` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field2` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field3` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field4` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field5` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field6` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field7` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field8` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field9` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field10` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field11` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field12` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field13` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field14` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field15` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field16` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field17` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field18` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field19` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field20` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field21` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field22` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field23` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field24` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field25` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field26` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field27` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field28` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `field29` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
  `CREATED_AT` bigint NOT NULL,
  `UPDATED_AT` bigint NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4379001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci

 

 

基于 mysql 的数据分页处理

基于 mysql 的处理程序如下, 就是一个简单的 mysql 分页 

然后将需要提取的数据封装, 然后 批量提交给 es 

总的情况来说是 前面的一部分页是可以 很快的响应数据, 但是 越到后面, mysql 服务器越慢 

/**
 * Test05PostQy2Es
 *
 * @author Jerry.X.He
 * @version 1.0
 * @date 2022/11/21 16:00
 */
public class Test05PostEsFromMysql {

    private static String mysqlUrl = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&autoReconnectForPools=true";
    private static String mysqlUsername = "postgres";
    private static String mysqlPassword = "postgres";
    private static JdbcTemplate mysqlJdbcTemplate = JdbcTemplateUtils.getJdbcTemplate(mysqlUrl, mysqlUsername, mysqlPassword);

    private static RestHighLevelClient esClient = getEsClient();
    private static IndicesClient indicesClient = esClient.indices();

    // Test05PostQy2Es
    public static void main(String[] args) throws Exception {

        String esIndexName = "student_all_20221211";
        bulkEsData(esIndexName);

    }

    private static void bulkEsData(String esIndexName) throws Exception {
        String queryDbTableName = "student_all";
        List<String> fieldList = Arrays.asList("id", "field0", "field1", "field2", "field3", "field4", "field5", "field6", "field7", "field8", "field9", "field10", "field11", "field12", "field13", "field14", "field15", "field16", "field17", "field18", "field19", "field20", "field21", "field22", "field23", "field24", "field25", "field26", "field27", "field28", "field29", "CREATED_AT", "UPDATED_AT");

        String idKey = "id";
        String whereCond = "";
//        String orderBy = "order by id asc";
        String orderBy = "";
        AtomicInteger counter = new AtomicInteger(0);
        int pageSize = 1000;
        int startPage = 0;
        pageDo(queryDbTableName, whereCond, orderBy, pageSize, startPage, (pageNo, list) -> {
            BulkRequest bulkRequest = new BulkRequest();
            for (Map<String, Object> entity : list) {
                IndexRequest indexRequest = new IndexRequest(esIndexName);
                Map<String, Object> sourceMap = new LinkedHashMap<>();
                List<String> allFieldsListed = new ArrayList<>();
                for (String fieldName : fieldList) {
                    String fieldValue = String.valueOf(entity.get(fieldName));
                    sourceMap.put(fieldName, fieldValue);
                    allFieldsListed.add(Objects.toString(fieldValue, ""));
                }
                String id = String.valueOf(entity.get(idKey));
                indexRequest.id(id);
                sourceMap.put("_allFields", StringUtils.join(allFieldsListed, "$$"));

                indexRequest.source(sourceMap);
                bulkRequest.add(indexRequest);
            }

            try {
                BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                counter.addAndGet(list.size());
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(" page : " + pageNo + ", flushed " + counter.get() + " records ");
        });
    }

    private static void pageDo(String tableName, String whereCond, String orderBy, int pageSize, int startPage,
                               BiConsumer<Integer, List<Map<String, Object>>> func) {
        if (StringUtils.isNotBlank(whereCond) && (!whereCond.trim().toLowerCase().startsWith("where"))) {
            whereCond = " where " + whereCond;
        }
        if (StringUtils.isNotBlank(orderBy) && (!orderBy.trim().toLowerCase().startsWith("order"))) {
            orderBy = " order by " + orderBy;
        }

        String queryCountSql = String.format(" select count(*) from %s %s %s", tableName, whereCond, orderBy);
        Integer totalCount = mysqlJdbcTemplate.queryForObject(queryCountSql, Integer.class);
        Integer totalPage = (totalCount == null || totalCount == 0) ? 0 : (totalCount - 1) / pageSize + 1;
        for (int i = startPage; i < totalPage; i++) {
            int offset = i * pageSize;
            String queryPageSql = String.format(" select * from %s %s %s limit %s,%s ", tableName, whereCond, orderBy, offset, pageSize);
            List<Map<String, Object>> list = mysqlJdbcTemplate.queryForList(queryPageSql);
            func.accept(i, list);
        }
    }

}

 

 

基于中间件 kafka 的处理

首先通过 spoon/datax 将数据从 mysql 转换到 kafka 

然后 再由脚本从 kafka 消费数据, 处理 传输到 es 中 

入了一次 消息队列之后, 然后程序 再来消费, 就会快很多了, 消息队列本身功能比较单纯 比较适合于做做顺序遍历 就会有优势一些 

 

这里以 spoon 将数据从 mysql 转换到 kafka 

我这里 本地环境 内存等什么的都不足, 因此是 一分钟 入库三万条, 但是 实际生产环境 会很快 

在生产环境 五百多w 的数据, 基于 datax 传输 mysql 到 kafka, 差不多是 五六分钟 就可以了 

e3cb2b641cfe4d208e11040f1b5fbc2a.png

 

 

基于 kafka 将数据传输到 es 

如下程序 仅仅是将 kafka 中的数据 原样照搬过去了, 但是 实际的场景 中会做一些 额外的业务处理, 这里仅仅是为了 演示 

/**
 * Test05PostQy2Es
 *
 * @author Jerry.X.He
 * @version 1.0
 * @date 2022/11/21 16:00
 */
public class Test05PostEsFromKafka {

    private static RestHighLevelClient esClient = getEsClient();
    private static IndicesClient indicesClient = esClient.indices();
    private static String esIndexName = "student_all_20221211";
    private static String groupId = "group-01";

    // Test05PostQy2Es
    public static void main(String[] args) throws Exception {

        bulkKafka2EsData(esIndexName, groupId);

    }

    private static void bulkKafka2EsData(String esIndexName, String groupId) throws Exception {
        List<Pair<String, String>> hjk2StdFieldMap = hjk2StdFieldMap();
        Properties properties = kafkaProperties(groupId);

        String idKey = "ID";
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("STUDENT_ALL_20221211"));
        AtomicInteger counter = new AtomicInteger(0);
        long start = System.currentTimeMillis();
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            if (records.isEmpty()) {
                Thread.sleep(10 * 1000);
                long spent = System.currentTimeMillis() - start;
                System.out.println(" spent : " + (spent / 1000) + " s ");
                continue;
            }

            BulkRequest bulkRequest = new BulkRequest();
            boolean isEmpty = true;
            for (ConsumerRecord<String, String> record : records) {
                IndexRequest indexRequest = new IndexRequest(esIndexName);
                String value = record.value();
                JSONObject entity = JSON.parseObject(value);

                // 获取 id
                String id = StringUtils.defaultIfBlank(entity.getString(idKey), "");
                if (isFilterByQy(id)) {
                    continue;
                }

                Map<String, Object> sourceMap = new LinkedHashMap<>();
                List<String> allFieldsListed = new ArrayList<>();
                for (Pair<String, String> entry : hjk2StdFieldMap) {
                    String hjkKey = entry.getKey(), stdKey = entry.getValue();
                    String fieldValue = StringUtils.defaultIfBlank(entity.getString(hjkKey), "");
                    sourceMap.put(stdKey, fieldValue);
                    allFieldsListed.add(Objects.toString(fieldValue, ""));
                }
                indexRequest.id(id);
                sourceMap.put("_allFields", StringUtils.join(allFieldsListed, "$$"));

                isEmpty = false;
                indexRequest.source(sourceMap);
                bulkRequest.add(indexRequest);
            }
            if (isEmpty) {
                continue;
            }

            try {
                BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                counter.addAndGet(bulkRequest.requests().size());
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(" flushed " + counter.get() + " records ");
        }

    }

    private static List<Pair<String, String>> hjk2StdFieldMap() {
        List<Pair<String, String>> hjk2StdFieldMap = new ArrayList<>();
        hjk2StdFieldMap.add(new ImmutablePair<>("id", "id"));
        hjk2StdFieldMap.add(new ImmutablePair<>("CREATED_AT", "CREATED_AT"));
        hjk2StdFieldMap.add(new ImmutablePair<>("UPDATED_AT", "UPDATED_AT"));
        for (int i = 0; i < Test05CreateMysqlBigTable.maxFieldIdx; i++) {
            String fieldName = String.format("field%s", i);
            hjk2StdFieldMap.add(new ImmutablePair<>(fieldName, fieldName));
        }
        return hjk2StdFieldMap;
    }

    private static Properties kafkaProperties(String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.0.190:9092");
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    private static boolean isFilterByQy(String qy) {
        if (StringUtils.isBlank(qy)) {
            return true;
        }

        return false;
    }

}

 

 

spoon 安装 kakfa 插件

来自 Kettle安装Kafka Consumer和Kafka Producer插件

    1.从github上下载kettle的kafka插件,地址如下
    Kafka Consumer地址:
    https://github.com/RuckusWirelessIL/pentaho-kafka-consumer/releases/tag/v1.7
    Kafka Producer地址:
    https://github.com/RuckusWirelessIL/pentaho-kafka-producer/releases/tag/v1.9
    2.进入 kettle 安装目录:在plugin目录下创建steps目录
    3.把下载的插件解压后放到 steps 目录下
    5.重启 spoon.bat 即可

 

 

 

 

参考

Kettle安装Kafka Consumer和Kafka Producer插件

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/274063.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

尚硅谷SpringBoot3笔记 (二) Web开发

Spring Boot Web开发&#xff1a;24.Web开发-自动配置原理_哔哩哔哩_bilibili 1. Web场景 1.1 自动配置 整合web场景&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId&g…

php便民超市管理系统flask-django-nodejs-python

随着时代的变迁&#xff0c;超市管理系统软件使用的普及【1】&#xff0c;以上所有的问题&#xff0c;都是为它而打造的&#xff0c;现在不仅是开一家店容易管理&#xff0c;开多家店页变得容易很多&#xff0c;同时它的出现也可以为本店起到宣传的作用。 21世纪的今天&#…

cc-uploadSomePic图片上传组件:快速开发与用户体验的提升

cc-uploadSomePic图片上传组件&#xff1a;快速开发与用户体验的提升 摘要&#xff1a; 在前端开发中&#xff0c;图片上传功能是一个常见的需求。为了提高开发效率和用户体验&#xff0c;我们开发了一个名为cc-uploadSomePic的图片上传组件。该组件支持单个或多个文件上传&am…

计算机网络——物理层(物理传输介质和物理层的设备)

计算机网络——物理层&#xff08;物理传输介质和物理层的设备 物理传输介质导向性传输介质双绞线同轴电缆光纤 非导向性传输介质无线电波多径效应 微波地面微波通信ISM 频段 卫星通信 物理层设备中继器集线器中继器和集线器的区别 我们今天进入物理层的物理传输介质和物理层的…

阿里云部署MySQL、Redis、RocketMQ、Nacos集群

文章目录 &#x1f50a;博主介绍&#x1f964;本文内容MySQL集群配置云服务器选购CPU选择内存选择云盘选择ESSD AutoPL云盘块存储性能&#xff08;ESSD&#xff09; 镜像选择带宽选择密码配置注意事项 搭建宝塔面板方便管理云服务器云服务器的安全组安装docker和docker-compose…

使用IDEA2023创建传统的JavaWeb项目并运行与调试

日期:2024-0312 作者:dusuanyun 文档环境说明: OS:Deepin 20.9(Linux) JDK: OpenJDK21 Tomcat:10.1.19 IDEA: 2023.3.4 (Ultimate Edition) 本文档默认已经安装JDK及环境变量的配置。 关键词…

openGauss学习笔记-246 openGauss性能调优-SQL调优-经验总结:SQL语句改写规则

文章目录 openGauss学习笔记-246 openGauss性能调优-SQL调优-经验总结&#xff1a;SQL语句改写规则246.1 使用union all代替union246.2 join列增加非空过滤条件246.3 not in转not exists246.4 选择hashagg246.5 尝试将函数替换为case语句246.6 避免对索引使用函数或表达式运算2…

2024热门外贸独立站wordpress模板

工艺品wordpress外贸主题 简约大气的wordpress外贸主题&#xff0c;适合做工艺品进出品外贸的公司官网使用。 https://www.jianzhanpress.com/?p5377 日用百货wordpress外贸主题 蓝色大气的wordpress外贸主题&#xff0c;适合做日用百货的外贸公司搭建跨境电商网站使用。 …

BUGKU-WEB never_give_up

题目描述 题目截图如下&#xff1a; 进入场景看看&#xff1a; 解题思路 F12查看请求和响应&#xff0c;查找线索 相关工具 base64解码URL解码Burp Suit抓包 解题步骤 F12查看请求和响应&#xff0c;发现一行注释包含一个文件名称【1p.html】&#xff0c;这应该就是提…

flex属性详解

flex布局&#xff0c;父元素属性可参考&#xff1a;flex布局 &#xff0c;本文主要介绍flex添加到子元素的属性。 <div class"father"><div class"left"></div><div class"middle"></div><div class"midd…

HTTPS(超文本传输安全协议)工作过程

一、简述HTTPS HTTPS超文本传输协议&#xff08;全称&#xff1a;Hypertext Transfer Protocol Secure &#xff09;&#xff0c;是以安全为目标的 HTTP 通道&#xff0c;在HTTP的基础上通过传输加密和身份认证保证了传输过程的安全性 。HTTPS 在HTTP 的基础下加入SSL&#x…

SNMP协议入门:揭秘网络管理的核心技术

背景 随着数字化、网络化的深入发展&#xff0c;在现代数据中心机房供配电系统中&#xff0c;有越来越多的产品需要通过标准的SNMP协议接入到以太网络&#xff0c;将诸如UPS&#xff08;不间断电源&#xff09;、空调、油机、配电柜及机柜PDU这些设备能够被NMS&#xff08;网络…

atoi函数

Hello, 大家好&#xff0c;我是一代&#xff0c;今天给大家讲解atoi函数的有关知识 所属专栏&#xff1a;C语言 创作不易&#xff0c;望得到各位佬们的互三呦 函数原型&#xff1a;int atoi (const char * str); 头文件&#xff1a;stdlib.h 功能&#xff1a;将字符串转换为整数…

考研数学|张宇还是武忠祥?怎么选?

我觉得张宇老师和武忠祥老师讲课实力都差不多&#xff0c;区别就在于风格的不同 张宇老师的讲课风格比较活泼&#xff0c;擅长调动学生的思维跟着课堂一起走&#xff0c;并且张宇老师发明了很多容易记的段子&#xff0c;但是虽然张宇老师段子多&#xff0c;一点也不妨碍他讲课…

unity发布安卓获取读取权限

一、Player Settings 设置 Player Settings>Player>Other Settings> Android > Write Permission > External (SDCard). 二、代码 using System.Collections; using System.Collections.Generic; using System.IO; using UnityEngine; using UnityEngine.Andr…

【数据结构】二叉树OJ题(C语言实现)

✅✅✅✅✅✅✅✅✅✅✅✅✅✅✅✅ ✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨ &#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1…

边缘计算+WEB端应用融合:AI行为识别智能监控系统搭建指南 -- 边缘设备图像识别及部署(二)

专栏目录 边缘计算WEB端应用融合&#xff1a;AI行为识别智能监控系统搭建指南 – 整体介绍&#xff08;一&#xff09; 边缘计算WEB端应用融合&#xff1a;AI行为识别智能监控系统搭建指南 -- 边缘图像识别及部署&#xff08;二&#xff09; 前言边缘图像识别与推流整体思路原始…

赛昉(starFive)星光2 多媒体框架分析与功能验证

开发板 开发板长这个样子: 串口调试接口如下: 整体支持情况 驱动&firmware&API jh7110/soft_3rdpart/wave511 : H.264&H.265 Decoder (Chips&Media 芯媒)jh7110/soft_3rdpart/wave521 : H.264&H.265 Encoder (Chips&Media 芯媒)jh7110/soft_3rdp…

LeetCode刷题【树状数组、并查集】

目录 树状数组307. 区域和检索 - 数组可修改406. 根据身高重建队列673. 最长递增子序列的个数1409. 查询带键的排列 并查集128. 最长连续序列130. 被围绕的区域 树状数组 307. 区域和检索 - 数组可修改 给你一个数组 nums &#xff0c;请你完成两类查询。 其中一类查询要求 …

HTML案例-1.标签练习

效果 源码 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title> </head&g…
最新文章