[自研开源] 数据集成之分批传输 v0.7

开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673

介绍

本篇基于 数据集成之任务流程 介绍任务分批传输的使用场景和配置操作。

使用场景

mydata使用API方式集成数据,当一次请求或响应 传输数据量较多时 可能无法完成、或容易对服务端造成影响,因此需要分为多次处理;

例如 常见的分页查询、导入大量数据时分批处理、集成对接时的全量同步等;

分批传输数据

业务系统与mydata集成时,在提供数据消费数据这两个方向上分别实现分批传输;

提供数据

由mydata调用应用的API获取数据,通过配置分批参数 实现一次任务内多次调用API获取完整数据,有以下两种基本的配置模式:

  • 配置了 固定参数size=10、递增参数current从1开始每次递增1、每次间隔1秒的任务;

在这里插入图片描述

  • 配置了 递增参数start从1开始每次递增100、递增参数end从100开始每次递增100、每次间隔1秒的任务;

在这里插入图片描述

执行过程如下代码,要点有:

  • 通过do-while结构 兼容单次和分批;

  • lastProduceData记录上一次数据,用于和本次对比数据,若重复 则结束,避免死循环(理论上很少有2次完全一样的数据);

  • 若分批有异常,则复用任务3次出错 自动结束并发送邮件通知的功能;

  • 执行完一次后,自动计算递增参数值;

// 提供数据
case MdConstant.DATA_PRODUCER:
    // 分批模式 记录上一次数据,用于对比两次数据,若重复 则结束,避免死循环
    List<Map> lastProduceData = null;
    do {
        // 若启用分批,则将分批参数加入请求参数中
        if (taskInfo.isBatch()) {
            Map<String, Object> batchParam = jobBatchService.parseToMap(taskInfo);
            Map<String, Object> reqParams = MapUtil.union(taskInfo.getReqParams(), batchParam);
            taskInfo.setReqParams(reqParams);
        }

        // 调用api 获取json
        String json = ApiUtil.read(taskInfo);

        // 将json按字段映射 解析为业务数据
        jobDataService.parseData(taskInfo, json);
        // 若没有返回数据,则结束处理
        if (CollUtil.isEmpty(taskInfo.getProduceDataList())) {
            break;
        }
        // 对比上一次数据
        if (lastProduceData != null) {
            if (CollUtil.isEqualList(lastProduceData, taskInfo.getProduceDataList())) {
                // 异常任务失败,邮件通知用户检查任务
                throw new RuntimeException("分批获取数据异常,最后两次获取的数据相同!");
            }
        }
        lastProduceData = taskInfo.getProduceDataList();

        // 根据条件过滤数据
        jobDataFilterService.doFilter(taskInfo);

        // 保存业务数据
        jobDataService.saveTaskData(taskInfo);

        // 更新环境变量
        jobVarService.saveVarValue(taskInfo, json);

        // 递增分批参数
        jobBatchService.incBatchParam(taskInfo);

        // 若启用分批,则等待间隔
        if (taskInfo.isBatch()) {
            ThreadUtil.sleep(taskInfo.getBatchInterval(), TimeUnit.SECONDS);
        }
    } while (taskInfo.isBatch());

    break;

消费数据

由mydata通过API向应用发送数据,通过配置分批参数 限制每次向API发送的数据量,从而减少数据查询量和请求处理时间;

如下图,配置了分批数量为1000的任务,分批参数为选填,mydata将按1000为限制查询符合条件的数据,通过API请求发送给应用;

在这里插入图片描述

执行过程如下代码,要点有:

  • 通过do-while结构 兼容单次和分批;
  • 自动管理分页参数,执行分页查询数据,发送给API;
  • 直到分页查询没有数据 自动结束;
// 消费数据
case MdConstant.DATA_CONSUMER:
    String dataCode = taskInfo.getDataCode();
    if (StrUtil.isEmpty(dataCode)) {
        break;
    }
    List<BizDataFilter> filters = taskInfo.getDataFilters();
    if (CollUtil.isNotEmpty(filters)) {
        // 解析过滤条件值中的 自定义字符串
        parseFilterValue(filters);
        // 排除值为null的条件
        filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());
    }
    int round = 0;
    Long skip = null;
    Integer limit = taskInfo.isBatch() ? taskInfo.getBatchSize() : null;
    do {
        if (taskInfo.isBatch()) {
            skip = (long) round * taskInfo.getBatchSize();
        }
        // 根据过滤条件 查询数据
        List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters, skip, limit);
        if (CollUtil.isEmpty(dataList)) {
            break;
        }
        taskInfo.setConsumeDataList(dataList);
        // 根据字段映射转换为api参数
        jobDataService.convertData(taskInfo);
        // 调用api传输数据
        ApiUtil.write(taskInfo);

        round++;
        // 若启用分批,则等待间隔
        if (taskInfo.isBatch()) {
            ThreadUtil.sleep(taskInfo.getBatchInterval(), TimeUnit.SECONDS);
        }
    }
    while (taskInfo.isBatch());
    break;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/490032.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

面试笔记——Java集合篇

Java集合框架体系 重点&#xff1a;单列集合——ArrayList、LinkedList&#xff1b;双列集合——HashMap、ConcurrentHashMap。 List相关 数组&#xff08;Array&#xff09; 是一种用连续的内存空间存储相同数据类型数据的线性数据结构。 数组获取其他元素&#xff1a; 为什…

为什么在vite中使用eslint报错‘__dirname‘ is not defined?

问题分析 发生这种情况是因为 ESLint 不知道 vite.config.js 中的代码在 Node.js 中使用&#xff0c;__dirname 未在浏览器中定义&#xff0c;也未在 ES 模块中定义。因此要告诉 ESLint 代码将作为 CommonJS 模块在 Node.js 中运行。 解决方案 请打开 ESLint 配置并在该 env …

关于 boost::asio::strand 初始化 socket、stream、resolver、deadline_timer 对象

在 boost::asio 之中默认情况下&#xff0c;大家使用 io_context 来为这些对象初始化传递的执行者&#xff0c;但我需要这里说明。 对于 boost::asio 构造类似 socket 对象必须构造传递 io_context 是个伪命题&#xff0c;boost::asio 对象并非只允许传递 boost::asio::io_cont…

pyrealsense2获取保存点云

一、第一种实现代码 Python import sys import cv2 import pyrealsense2 as rs import numpy as np import keyboard import open3d as o3d import osif __name__ "__main__":output_folder output_data/os.makedirs(output_folder, exist_okTrue)pipeline rs.p…

git cherry pick merge部分提交

cherry pick merge 指定某次提交 1. git history 选择要从哪个分支merge 2. 找到提交记录,选择cherry pick 3.这个时候就可以直接push了

【面试题】ES文档写入和读取流程详解

前言&#xff1a;在回答这个问题之前我们先要搞清楚一个问题那就是什么是文档&#xff0c;避免不知所云&#xff01; 一、什么是文档&#xff1f; 在Elasticsearch中&#xff0c;文档&#xff08;Document&#xff09;是最基本的信息单元&#xff0c;用于表示和存储数据。文…

数据采集用,集成了主流工业通讯协议

IoTClient 是一个物联网设备通讯协议实现客户端&#xff0c;集成了主流工业通讯协议&#xff0c;包括主流PLC通信读取、ModBus协议、Bacnet协议等。该组件基于.NET Standard 2.0&#xff0c;适用于.NET的跨平台开发&#xff0c;可在Windows、Linux等系统上运行&#xff0c;甚至…

LinkedIn账号为什么被封?被封后如何解决?

近期会有一些小伙伴说自己遇到了帐号无法登录的情况&#xff0c;其实出现领英帐号被封号(被限制登录)主要会有两类情况&#xff0c;今天就给大家分享一下如果被封该如何解决&#xff0c;强烈建议收藏。 在电脑领英官网或者手机领英APP上&#xff0c;输入领英帐号密码点击登录后…

数据结构(五)单链表专题

在开始之前&#xff0c;我先来给大家讲一下顺序表与链表的区别&#xff1a; 它们在堆上存储的差异&#xff1a; 我们可以很容易的知道&#xff0c;循序表是连续的有序的&#xff0c;但链表是杂乱的&#xff0c;它们通过地址彼此联系起来。 1. 链表的概念及结构 概念&#xff1…

【光伏科普】光伏投融资计算的意义

光伏产业&#xff0c;作为清洁能源的重要组成部分&#xff0c;近年来在全球范围内得到了广泛的关注与发展。而在光伏项目的实施过程中&#xff0c;投融资计算显得尤为重要。本文旨在探讨光伏投融资计算的意义&#xff0c;以及它如何影响光伏产业的可持续发展。 首先&#xff0c…

无法找到filesystem头文件

无法找到filesystem头文件 一、前言 这段时间接老板命令&#xff0c;做目标识别模型的嵌入式部署。需要将模型运行环境编译后打包到瑞芯微开发板上运行&#xff0c;在此之前我对原C文件做过修改&#xff0c;为了能实现与厂商提供的数据接口对接。 我在用CMake打包过程中&…

jmeter接口测试及详细步骤以及项目实战教程

在接口测试项目实战中&#xff0c;JMeter是一款非常强大和流行的自动化测试工具&#xff0c;它可以测试各种类型的应用程序&#xff0c;并通过采样和报告来识别性能瓶颈和API的问题。本文将为你提供一个基于实际项目的JMeter接口测试项目实战教程&#xff0c;指导你如何使用JMe…

腾讯VS网易:一场不见终局的游戏未来之战

国内游戏霸主腾讯最近赚足了眼球。 总体上看&#xff0c;腾讯手握“游戏社交”两大王牌&#xff0c;最近发布的财报十分亮眼&#xff0c;其2023年总营收和净利润分别同比增长10%和36%&#xff0c;展现了互联网巨头的强劲活力。 然而巨头亦有焦虑&#xff0c;增值服务营收同比…

数学算法(算法竞赛、蓝桥杯)--分解质因数、唯一分解定理

1、B站视频链接&#xff1a;G07 分解质因数 唯一分解定理 试除法_哔哩哔哩_bilibili 题目链接&#xff1a;质因子分解 - 洛谷 #include <bits/stdc.h> using namespace std;int n; int a[100010];//质因子的个数void decompose(int x){for(int i2;i*i<x;i){//i增加&a…

Fastgpt 无法启动或启动后无法正常使用的讨论(启动失败、用户未注册等问题这里)

FastGPT 是一个基于 LLM 大语言模型的知识库问答系统&#xff0c;提供开箱即用的数据处理、模型调用等能力。同时可以通过 Flow 可视化进行工作流编排&#xff0c;从而实现复杂的问答场景&#xff01; FastGPT是非常实用并且相当厉害的个人知识库AI项目&#xff0c;项目是非常…

Linux Tomcat的服务器如何查看接口请求方式?

问题描述 最近在和安卓开发对接接口&#xff0c;遇到一个接口总是报405错误&#xff0c;有对接经验的开发应该都知道是请求方式不对&#xff0c;假如接口定义为POST请求的&#xff0c;但是客户端却用GET请求&#xff0c;这时候就会报这个错误。Android客户端那边使用xUtils框架…

扫雷大师:用C语言揭秘自动展开盘面与智能扫雷策略

目录 扫雷自动展开盘面智能扫雷更优策略完整代码 扫雷 扫雷游戏是一款经典的单人电脑游戏&#xff0c;其主要规则如下&#xff1a; 游戏目标&#xff1a;游戏的目标是在不触发任何地雷的情况下&#xff0c;找出所有非雷区域。玩家需要根据格子周围的数字来推断哪些格子含有地雷…

MFC(二)集成基础控件

目录 OnCreateCStatic【标签&#xff0c;图片】CEdit【文本框&#xff0c;密码框&#xff0c;数值框&#xff0c;文本区】CButton【按钮&#xff0c;单选按钮&#xff0c;多选按钮】CComboBox【下拉列表&#xff0c;列表】CSliderCtrl【滑动条】CListCtrl【表格】CAnimateCtrl【…

第十二届蓝桥杯JavaB组省赛真题 - ASC

解题思路&#xff1a; 这是目前为止做到过最简单的了 public class Main {public static void main(String[] args) {int res L-A 65;System.out.print(res);} }

东联直播音效助手

东方联盟创始人郭盛华为广大主播免费开发的一款专用的音效场控工具&#xff0c;通过这款软件&#xff0c;主播使用各种精彩的音效&#xff0c;避免直播间过于低沉和尴尬&#xff0c;从而更好的拉近观众的距离。音效有掌声、爆笑声、尖叫声、关注点赞、任务等各种音效. 【东方联…
最新文章