Apache SeaTunnel MongoDB CDC 使用指南

随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。

file

本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业,助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力,帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。

支持的引擎

SeaTunnel Zeta
Flink

主要特性

  • 批处理
  • 流处理
  • 精确一次
  • 列投影
  • 并行度
  • 支持用户定义分片

功能描述

MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。

支持的数据源信息

要使用 MongoDB CDC 连接器,需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。

数据源支持的版本依赖
MongoDB通用下载

可用性设置

  1. MongoDB版本:MongoDB 版本 >= 4.0。
  2. 集群部署:副本集或分片集群。
  3. 存储引擎:WiredTiger 存储引擎。
  4. 权限:changeStream 和 read
use admin;
db.createRole(
    {
        role: "strole",
        privileges: [{
            resource: { db: "", collection: "" },
            actions: [
                "splitVector",
                "listDatabases",
                "listCollections",
                "collStats",
                "find",
                "changeStream" ]
        }],
        roles: [
            { role: 'read', db: 'config' }
        ]
    }
);

db.createUser(
  {
      user: 'stuser',
      pwd: 'stpw',
      roles: [
         { role: 'strole', db: 'admin' }
      ]
  }
);

数据类型映射

以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。

MongoDB BSON 类型SeaTunnel 数据类型
ObjectIdSTRING
StringSTRING
BooleanBOOLEAN
BinaryBINARY
Int32INTEGER
Int64BIGINT
DoubleDOUBLE
Decimal128DECIMAL
DateDATE
TimestampTIMESTAMP
ObjectROW
ArrayARRAY

对于 MongoDB 中的特定类型,我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。

MongoDB BSON 类型SeaTunnel STRING 表示
Symbol{"_value": {"$symbol": "12"}}
RegularExpression{"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}
JavaScript{"_value": {"$code": "function() { return 10; }"}}
DbPointer{"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}}
提示

在 SeaTunnel 中使用 DECIMAL 类型时,请注意最大范围不能超过 34 位数字,这意味着你应该使用 decimal(34, 18)。

名称类型必须默认值描述
hostsString-MongoDB 服务器的主机名和端口对的逗号分隔列表。例如:localhost:27017,localhost:27018
usernameString-连接 MongoDB 时使用的数据库用户名。
passwordString-连接 MongoDB 时使用的密码。
databaseList-要监视更改的数据库名称。如果未设置,则会捕获所有数据库。数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如:db1,db2。
collectionList-数据库中要监视更改的集合名称。如果未设置,则会捕获所有集合。集合也支持正则表达式,以监视与完全限定的集合标识符匹配的多个集合。例如:db1.coll1,db2.coll2。
connection.optionsString-MongoDB 的连接选项的和号分隔列表。例如:replicaSet=test&connectTimeoutMS=300000。
batch.sizeLong1024游标批大小。
poll.max.batch.sizeEnum1024轮询新数据时包含在单个批次中的更改流文档的最大数量。
poll.await.time.msLong1000等待检查更改流上的新结果之前的时间量。
heartbeat.interval.msString0发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。
incremental.snapshot.chunk.size.mbLong64增量快照的块大小(MB)。
common-options-源插件通用参数,请参考源通用选项获取详情。

提示:

  • 如果集合变更速度较慢,强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时,心跳事件可以将 resumeToken 推进以避免其过期。
  • MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息,因此即使原始文档不大于 15MB,更改文档也可能超过 16MB 限制,导致更改流操作终止。
  • 建议使用不可变的分片键。在 MongoDB 中,分片键在启用事务后允许修改,但更改分片键可能导致频繁的分片迁移,造成额外的性能开销。此外,修改分片键还可能导致更新查找功能变得无效,在 CDC(更改数据捕获)场景中导致不一致的结果。

如何创建 MongoDB CDC 数据同步作业

将 CDC 数据打印到客户端

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业:

env {
  # 您可以在此处设置引擎配置
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory"]
    collection = ["inventory.products"]
    username = stuser
    password = stpw
    schema = {
      fields {
        "_id" : string,
        "name" : string,
        "description" : string,
        "weight" : string
      }
    }
  }
}

# 在本地客户端打印读取的 MongoDB 数据
sink {
  Console {
    parallelism = 1
  }
}

将 CDC 数据写入 MysqlDB

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业:

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory"]
    collection = ["inventory.products"]
    username = stuser
    password = stpw
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://mysql_cdc_e2e:3306"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "st_user"
    password = "seatunnel"

    generate_sink_sql = true
    # You need to configure both database and table
    database = mongodb_cdc
    table = products
    primary_keys = ["_id"]
  }
}

多表同步

以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业:

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory","crm"]
    collection = ["inventory.products","crm.test"]
    username = stuser
    password = stpw
  }
}

# Console printing of the read Mongodb data
sink {
  Console {
    parallelism = 1
  }
}

提示: 多库表 CDC 同步不能指定 schema,只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息,所以如果想支持多表,所有表只能作为一个结构读取。

使用正则表达式匹配多表

以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业:

匹配示例表达式描述
前缀匹配^(test).*匹配数据库名或表名以 test 为前缀的,如 test1, test2 等。
后缀匹配.*[p$]匹配数据库名或表名以 p 为后缀的,如 cdcp, edcp 等。
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source { MongoDB-CDC { hosts = "mongo0:27017" # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database = ["(^(test).|^(tpc).|txc|.[p$]|t{2})"] collection = ["(t[5-8]|tt)"] username = stuser password = stpw } }

Console printing of the read Mongodb data

sink { Console { parallelism = 1 } }


### 实时流数据格式

{ _id : { }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream "operationType" : " ", // The type of change operation that occurred, such as: insert, delete, update, etc. "fullDocument" : { }, // The full document data involved in the change operation. This field does not exist in delete operations "ns" : {
"db" : " ", // The database where the change operation occurred "coll" : " " // The collection where the change operation occurred }, "to" : { // These fields are displayed only when the operation type is 'rename' "db" : " ", // The new database name after the change "coll" : " " // The new collection name after the change }, "source":{ "ts_ms":" ", // The timestamp when the change operation occurred "table":" " // The collection where the change operation occurred "db":" ", // The database where the change operation occurred "snapshot":"false" // Identify the current stage of data synchronization }, "documentKey" : { "_id" : }, // The _id field value of the document involved in the change operation "updateDescription" : { // Description of the update operation "updatedFields" : { }, // The fields and values that the update operation modified "removedFields" : [ " ", ... ] // The fields and values that the update operation removed } "clusterTime" : , // The timestamp of the Oplog log entry corresponding to the change operation "txnNumber" : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" : , "uid" : } }

```

到此本指南就结束了,MongoDB CDC Sink连接器的发布,不仅强化了 Apache SeaTunnel 在数据集成领域的地位,也为开发者提供了更多的可能性。

Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来,让我们携手共建一个更加强大、开放、互助的社区!

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/274258.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

3月19日做题

[NPUCTF2020]验证🐎 if (first && second && first.length second.length && first!second && md5(firstkeys[0]) md5(secondkeys[0]))用数组绕过first1&second[1] 这里正则规律过滤位(Math.) (?:Math(?:\.\w)?) : 匹配 …

详解命令docker run -d --name container_name -e TZ=Asia/Shanghai your_image

docker run 是Docker的主要命令,用于从镜像启动一个新的容器。下面详细解释并举例说明 -d, --name, -e TZ 参数的用法: -d 或 --detach: 这个标志告诉Docker以守护进程(后台)模式运行容器。这意味着当你执行 docker ru…

②免费AI软件开发工具测评:通义灵码 VS 码上飞

前言 我又双叒叕来测评了!上次给大家带来的是iFlyCode和CodeFlying两款产品的测评,受到了大家的一致好评~ 今天咱就继续来聊聊,这次我们选的的对象是通义灵码和码上飞,从名字上也能看到出来这两款产品一定是跟软件开发有关系的&…

IPD集成产品开发:塑造企业未来竞争力的关键

随着市场竞争的日益激烈,企业对产品开发的要求也越来越高。如何在快速变化的市场环境中,既保证产品的批量生产效率,又满足客户的个性化需求,成为了企业面临的重要挑战。IPD(集成产品开发)模式,作…

Linux 常用操作命令大全

目录 一、命令大集合 1.1 whereis 1.2 which 1.3 sudo 1.4 grep 1.5 free 1.6 top 动态显示进程的状态 1.7 ps 静态显示进程信息 1.8 df 1.9 iostat 看IO性能状态 1.10 yum安装插件命令 1.11 rpm 1.12 scp远程拷贝 1.13 uname 二、linux网络命令 2.1 centos7 防火…

理论学习:with torch.no_grad()

如果不加上“with torch.no_grad():”,模型参数会发生改变吗? 如果不使用with torch.no_grad():,在进行模型推理(即计算outputs_cls net(inputs[batch_size//2:])这一步)时,模型参数不会发生改变&#xf…

#Ubuntu(修改root信息)

(一)发行版:Ubuntu16.04.7 (二)记录: (1)命令行终端: a.右键,open terminal b.快捷键 ctrlaltt (2)进行root修改 sudo passwd &a…

Gradle v8.5 笔记 - 从入门到进阶(基于 Kotlin DSL)

目录 一、前置说明 二、Gradle 启动! 2.1、安装 2.2、初始化项目 2.3、gradle 项目目录介绍 2.4、Gradle 项目下载慢?(万能解决办法) 2.5、Gradle 常用命令 2.6、项目构建流程 2.7、设置文件(settings.gradle.…

linux 安装常用软件

文件传输工具 sudo yum install –y lrzsz vim编辑器 sudo yum install -y vimDNS 查询 sudo yum install bind-utils用法可以参考文章 《掌握 DNS 查询技巧,dig 命令基本用法》 net-tools包 yum install net-tools -y简单用法: # 查看端口占用情况…

2024流星全自动网页生成系统重构版源码

2024流星全自动网页生成系统重构版源码 源码介绍 流星全自动网页生成系统重构版源码分享,所有模板经过精心审核与修改,完美兼容小屏手机大屏手机,以及各种平板端、电脑端和360浏览器、谷歌浏览器、火狐浏览器等等各大浏览器显示。 为用户使…

微信支付宝--充ChatGPTPLUS/openAI key

ChatGPT是人工智能技术驱动的自然语言处理工具,它能够基于在预训练阶段所见的模式和统计规律,来生成回答,还能根据聊天的上下文进行互动,真正像人类一样来聊天交流,甚至能完成撰写论文、邮件、脚本、文案、翻译、代码等…

鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:Refresh)

可以进行页面下拉操作并显示刷新动效的容器组件。 说明: 该组件从API Version 8开始支持。后续版本如有新增内容,则采用上角标单独标记该内容的起始版本。 子组件 支持单个子组件。 从API version 11开始,Refresh子组件会跟随手势下拉而下移…

恒驰喜讯 | 亮相华为中国合作伙伴大会2024,荣膺最佳服务一致性奖等3大奖项

3月14日至15日,华为中国合作伙伴大会2024在深圳隆重召开。大会以“因聚而生,数智有为”为主题,面向数智化转型的浪潮,华为携手伙伴共同探讨如何通过强化“伙伴华为”体系,帮助客户抓住数智化转型的巨大机遇&#xff0c…

QT信号与槽实现方式

1、第一种实现方式 在QT开发工具UI界面先拖入按钮,然后鼠标右键拖入按钮,点击选中槽,在页面选着需要的信号,然后OK,随即将会跳转到类的.cpp文件,(这种UI代码结合的方式,会自动去绑定…

医药工厂5G智能制造数字孪生可视化平台,推进医药企业数字化转型

医药工厂5G智能制造数字孪生可视化平台,推进医药企业数字化转型。随着科技的不断发展,数字化转型已成为医药企业不可或缺的一部分。5G智能制造医药工厂数字孪生可视化平台作为数字化转型的重要工具,正在逐步改变医药企业的生产方式和管理模式…

UDF提权

目录 一、UDF概述 二、提权条件 三、漏洞复现 (一) 信息收集 1. Nmap信息收集 1.1、查看当前IP地址 1.2、扫描当前网段,找出目标机器 1.3、快速扫描目标机全端口 2. dirb目录扫描 3. 第一个flag 3.1、目录遍历漏洞 3.2、flag 4. 敏感信息利用 (二) 漏…

智能合约 之 ERC-20介绍

什么是ERC20 ERC20全称为Ethereum Request for Comment 20,是一种智能合约标准,用于以太坊网络上的代币发行 姊妹篇 - 如何部署ERC20 ERC20的应用场景 代币化资产,例如:USDT 是一种以美元为背书的ERC20代币,每个USDT代…

2024地方门户源码运营版带圈子动态+加即时通讯(PC电脑端+WAP移动端+H5微信端+微信小程序+APP客户端)

2024地方门户源码运营版带圈子动态加即时通讯(PC电脑端WAP移动端H5微信端微信小程序APP客户端) 源码介绍: 包含5个端 PC电脑端WAP移动端H5微信端微信小程序APP客户端 功能介绍: 包含功能:信息资讯、二手信息、房产…

第二十六天-统计与机器学习SciPy,Scikit-Leaen

目录 1.介绍 2.使用scipy 1. 安装 2.拟合曲线 3.随机变量与概率分布 4.假设检验 5.参数检验 3.使用Scikit-Learn 1. 机器学习库,建立在numpy,scipy,matplotlib基础上 2.包含功能 3.安装 1.官网:https://scikit-learn.org 2.下载 3.线性回归…
最新文章