当前位置: 首页 > article >正文

MySQL数据自动同步到Es

Logstash
  • 测试数据准备

    DROP DATABASE IF EXISTS es;
    
    CREATE DATABASE es DEFAULT CHARACTER SET utf8;
    
    USE es;
    
    CREATE TABLE book
    (
     id INT NOT NULL,
     title VARCHAR(20),
     author VARCHAR(20),
     price DECIMAL(6,2),
     PRIMARY KEY(id)
    );
    
    DROP PROCEDURE IF EXISTS batchInsertBook;
    
    DELIMITER $$
    CREATE PROCEDURE batchInsertBook(IN seed INT, IN loops INT)
    BEGIN
    	DECLARE i INT;
    	DECLARE id INT;
    	SET i = 0;
    	SET id = seed;
    	WHILE i < loops DO
    		INSERT INTO book(id, title,author, price) VALUES
    		(id, '雪山飞狐', '金庸', 50),
    		(id+1, '神雕侠侣', '金庸', 60),
    		(id+2, '三国演义', '罗贯中', 50),
    		(id+3, '西游记', '吴承恩', 40);
    		SET id = id + 4;
    		SET i = i + 1;
    	END WHILE;
    END $$
    DELIMITER ;
    
    -- 禁用索引,加快数据导入速度
    ALTER TABLE book DISABLE KEYS;
    
    -- 调用存储过程导入数据
    CALL batchInsertBook(1, 100);
    
    -- 添加索引
    ALTER TABLE book ENABLE KEYS;
    
    -- 修改表的引擎为innodb
    ALTER TABLE book ENGINE INNODB;
    
    mysql> select count(*) from book;
    +----------+
    | count(*) |
    +----------+
    |    40000 |
    +----------+
    1 row in set (0.03 sec)
    
  • docker安装logstash

    # 拉取镜像
    docker pull logstash:7.12.1
    
  • 在宿主机配置目录

    mkdir -p /root/logstash
    
  • 在宿主机创建/root/logstash/logstash.yml,内容为空即可,该步骤不能省略

  • 在宿主机创建/root/logstash/logstash.conf

    input {
      jdbc {
    	jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"
    	jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    	jdbc_connection_string => "jdbc:mysql://192.168.126.1:3306/es?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8"
    	jdbc_user => "root"
    	jdbc_password => "root"
    	schedule => "* * * * *" 
    	statement => "SELECT * FROM book"
    	type => "jdbc"
      }
    }
    
    filter {
    }
    
    output {
        stdout {
            codec => json_lines
        }
    }
    
    • 本次连接的是windows上的MySQL,通过IpV4的IP地址连接

      测试连通性

    • 上传maven仓库中的jar

      # \apache-maven-3.9.6\repository\mysql\mysql-connector-java\8.0.11\mysql-connector-java-8.0.11.jar
      [root@localhost logstash]# ls
      mysql-connector-java-8.0.11.jar
      [root@localhost logstash]# chmod 644 mysql-connector-java-8.0.11.jar
      
    • 开启windowsroot远程登录

      mysql -uroot -proot
      use mysql;
      update user set host = '%' where user = 'root';
      FLUSH PRIVILEGES;
      
      mysql> select host,user from user;
      +-----------+------------------+
      | host      | user             |
      +-----------+------------------+
      | %         | root             |
      | localhost | mysql.infoschema |
      | localhost | mysql.session    |
      | localhost | mysql.sys        |
      +-----------+------------------+
      4 rows in set (0.00 sec)
      
  • 启动logstash容器

    docker run -d  \
    	--name logstash \
        -v ~/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml \
        -v ~/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
        -v ~/logstash/mysql-connector-java-8.0.11.jar:/usr/share/logstash/mysql-connector-java-8.0.11.jar \
        logstash:7.12.1
    
  • 查看日志

    # 查看数据同步
    docker logs -f logstash
    

  • 将数据输出到ElasticSearch

    {% note blue ‘fas fa-bullhorn’ modern %}

    既然我们能从mysql中读取数据,并输出到stdout,那么我们同样可以从mysql中读取数据,然后将数据输出到ElasticSearch,修改logstash.conf,内容如下

    {% endnote %}

    input {
      jdbc {
    	jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.11.jar"
    	jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    	jdbc_connection_string => "jdbc:mysql://192.168.126.1:3306/es?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8"
    	jdbc_user => "root"
    	jdbc_password => "root"
    	schedule => "* * * * *"
    	statement => "SELECT * FROM book"
    	type => "jdbc"
      }
    }
    
    filter {
        
    }
    
    output {
        elasticsearch {
        	hosts => ["192.168.32.128:9200"]
        	index => "book"
        	document_id => "%{id}"
        }
        stdout {
            codec => json_lines
        }
    }
    
  • 确保es是启动的

  • 重启

    [root@192 logstash]# docker restart logstash
    logstash
    
  • 进入如下界面

  • 查看是否同步到es

    以上的这种同步数据的方式,我们称之为“全量同步”

  • 增量同步

    • 修改logstash.conf,内容如下

      input {
        jdbc {
      	jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.11.jar"
      	jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      	jdbc_connection_string => "jdbc:mysql://192.168.126.1:3306/es?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8"
      	jdbc_user => "root"
      	jdbc_password => "root"
      	schedule => "* * * * *"
       	type => "jdbc"
       	# 记录查询结果中,某个字段的值
      	use_column_value => true
      	# 记录id字段的值,用于增量同步
      	tracking_column => "id"
      	# 指明要记录的字段的类型
      	tracking_column_type => numeric
      	# 指定要记录上一次查询的数据
      	record_last_run => true
         # :sql_last_value代表上次查询出来的最大的“tracking_column”中的值
      	statement => "SELECT * FROM book where id > :sql_last_value"
      	last_run_metadata_path => "syncpoint_table"
      	
        }   
      }
      
      filter {
      }
      
      output {
          elasticsearch {
          	hosts => ["192.168.32.128:9200"]
          	index => "book"
          	document_id => "%{id}"
          }
          stdout {
            codec => json_lines
          }
      }
      

      增量同步


http://www.kler.cn/news/274362.html

相关文章:

  • 关系数据库:关系数据结构基础与概念解析
  • 代码随想录算法训练营第二十八天|93. 复原 IP 地址,78. 子集,90. 子集 II
  • GPT能复制人类的决策和直觉吗?
  • Vue async (type = 0) => {}代码讲解
  • 前端 - 基础 表单标签 -- 表单元素( input - type属性) 文本框和密码框
  • Android逆向(二)-系统调试开关
  • 【深度学习】深度学习md笔记总结第1篇:深度学习课程,要求【附代码文档】
  • 数据资产管理解决方案:构建高效、安全的数据生态体系
  • Linux之线程同步
  • 系统运维中出现的问题,问题原因,解决办法
  • PB-03F模组蓝牙基础+主从机指令的使用
  • mapstruct学习笔记-pojo之间的转换
  • 73_Pandas获取分位数/百分位数
  • 1-postgresql数据库高可用脚本详解
  • 网站引用图片但它域名被墙了或者它有防盗链,我们想引用但又不能显示,本文附详细的解决方案非常简单!
  • rviz上不显示机器人模型(模型只有白色)
  • 【Numpy】(2)numpy对象和random模块
  • openEuler 欧拉系统nginx正向代理 http https —— 筑梦之路
  • 【数据结构取经之路】栈
  • 使用uniapp,uni-data-select组件时,内容长度没超过容器宽度时候虽然能显示全内容但是数据后边会出现三个点,逼死强迫症
  • nginx实现多个域名和集群
  • 鸿蒙实战开发:【FaultLoggerd组件】讲解
  • 英伟达深夜放王炸|字节跳动游戏之路波折不断|文旅短剧风口将至|25岁QQ魅力不减,5亿人在用|云计算市场疯长152%|电商巨头齐瞄向富足悠闲银发族
  • 刷题日记:面试经典 150 题 DAY6
  • SCI一区 | Matlab实现GWO-TCN-BiGRU-Attention灰狼算法优化时间卷积双向门控循环单元融合注意力机制多变量时间序列预测
  • php前端和java后端数据调用流程
  • F-logic DataCube3 任意文件上传漏洞复现(CVE-2024-25832)
  • 【C++】用红黑树模拟实现set、map
  • 学习笔记--强化学习(1)
  • 使用jQuery的autocomplete实现数据查询一次,联想自动补全