[Python小工具]Python批量生成数据到MySQL

[Python小工具]Python批量生成数据到MySQL

base.py

#!/usr/bin/python
# -*- coding:utf-8 -*-
import time
import random
from datetime import datetime, timedelta
from faker import Faker
import string

fake = Faker('zh_CN')


class Base_conn:
    # 数据库连接配置初始化
    def __init__(self, DB_HOST, DB_USER, DB_PASSWORD, DB_NAME):
        self.DB_HOST = DB_HOST
        self.DB_USER = DB_USER
        self.DB_PASSWORD = DB_PASSWORD
        self.DB_NAME = DB_NAME


class TimeShift:

    def starttime(self):
        self.start_time = time.time()

        return self.start_time

    def endtime(self):
        self.end_time = time.time()

        return self.end_time

    def executiontime(self):
        self.execution_time = self.end_time - self.start_time

        return self.execution_time

    def timeshift(self, execution_time):
        if execution_time < 60:
            return f"{int(execution_time)}秒"
        elif execution_time >= 60 or execution_time <= 3600:
            return f"{execution_time / 60:.1f}分钟"
        else:
            return f"{int(execution_time / 60 / 60):.1f}小时"


class Table_Employee:

    # 生成身份证信息的模拟函数,可根据实际情况调整
    def generate_id_card(self):
        # region_code = "420100"  # 地区编码
        region_code = "{:.6f}".format(random.random()).split('.')[1]  # 地区编码
        birth_date = datetime.strptime("19900101", "%Y%m%d") + timedelta(days=random.randint(0, 365 * 50))  # 出生日期
        seq_code = str(random.randint(100, 999))  # 序列码
        gender_code = str(random.randint(0, 9))  # 性别码
        check_code = str(random.randint(0, 9))  # 校验码(这里仅用于示例)

        id_card = f"{region_code}{birth_date.strftime('%Y%m%d')}{seq_code}{gender_code}{check_code}"

        # 确保生成的身份证号码长度正好是 18 个字符
        return id_card[:18]

    def __init__(self, work_id=None, name=None, age=None, gender=None, id_card=None, entry_date=None, department=None):
        # self.work_id = work_id if work_id else fake.pystr(min_chars=3, max_chars=10)
        self.work_id = work_id if work_id else "{:.6f}".format(random.random()).split('.')[1]
        self.name = name if name else fake.name()
        self.age = age if age else random.randint(18, 65)
        self.gender = gender if gender else fake.random_element(elements=('男', '女'))
        self.id_card = id_card if id_card else self.generate_id_card()
        self.entry_date = entry_date if entry_date else (
                datetime.now() - timedelta(days=random.randint(0, 365 * 10))).date()
        self.department = department if department else random.randint(1, 7)


class Table_Students:
    def Email_data(self):
        num_letters = random.randint(5, 10)
        random_letters = random.choices(string.ascii_letters, k=num_letters)
        Email = "www." + ''.join(random_letters) + '.com'
        return Email

    def Address_data(self):
        provinces = ['北京市', '天津市', '山西省']
        cities = {
            '北京市': ['东城区', '西城区', '朝阳区', '海淀区', '丰台区'],
            '天津市': ['和平区', '河东区', '河西区', '南开区', '红桥区'],
            '山西省': ['太原市', '大同市', '阳泉市', '长治市', '晋城市']
        }
        roads = ['长安街', '和平路', '人民大街', '建国路', '中山路', '解放街', '青年路', '光明街', '文化路', '新华街']

        province = random.choice(provinces)
        city = random.choice(cities[province])
        random_province = random.choice(provinces)
        random_road = random.choice(roads)
        random_address = random_province + random_road
        random_init = str(random.randint(100, 999))
        address = province + city + random_address + random_init + '号'

        return address

    def __init__(self, SNo=None, Sname=None, Gender=None, Birthday=None, Mobile=None, Email=None, Address=None,
                 Image='null'):
        # self.work_id = work_id if work_id else fake.pystr(min_chars=3, max_chars=10)
        self.SNo = SNo if SNo else str((datetime.now() - timedelta(days=random.randint(0, 365 * 10))).date()).split('-')[0] + str(random.randint(100,999))
        self.Sname = Sname if Sname else fake.name()
        self.Gender = Gender if Gender else fake.random_element(elements=('男', '女'))
        self.Birthday = Birthday if Birthday else (
                datetime.now() - timedelta(days=random.randint(0, 365 * 10))).date()
        self.Mobile = Mobile if Mobile else str(random.randint(130, 199)) + "{:.8f}".format(random.random()).split('.')[
            1]
        self.Email = Email if Email else self.Email_data()
        self.Address = Address if Address else self.Address_data()
        self.Image = Image

mysql_data.py

import re
import time
import pymysql
import threading
from faker import Faker
from pymysql.constants import CLIENT
from base import Base_conn, TimeShift, Table_Employee, Table_Students

fake = Faker('zh_CN')
# 数据库连接配置
# connect = Base_conn('localhost', 'root', 'iotplatform', 'test')
connect = Base_conn('localhost', 'root', 'iotplatform', 'student_profile_db')


def connect_db():
    try:
        connection = pymysql.connect(
            host=connect.DB_HOST,
            user=connect.DB_USER,
            password=connect.DB_PASSWORD,
            database=connect.DB_NAME,
            client_flag=CLIENT.MULTI_STATEMENTS,
            cursorclass=pymysql.cursors.DictCursor,
            charset='utf8mb4'
        )
        return connection
    except pymysql.MySQLError as e:
        print(f"Database connection failed: {e}")
        return None


# 数据插入函数
def insert_data(start, end, thread_id, sql):
    # 创建数据库连接
    connection = connect_db()
    try:
        # 创建 cursor 用于执行 SQL 语句
        with connection.cursor() as cursor:
            sql_cmd = sql

            for i in range(start, end):
                # 记录最后一次用于插入的数据
                table_employee = Table_Employee()
                table_students = Table_Students()
                # last_values = (table_employee.work_id,
                #                table_employee.name,
                #                table_employee.age,
                #                table_employee.gender,
                #                table_employee.id_card,
                #                table_employee.entry_date,
                #                table_employee.department)
                last_values = (
                    table_students.SNo,
                    table_students.Sname,
                    table_students.Gender,
                    table_students.Birthday,
                    table_students.Mobile,
                    table_students.Email,
                    table_students.Address,
                    table_students.Image
                )

                # 执行 SQL 语句
                cursor.execute(sql_cmd, last_values)

                connection.commit()

            # 提交事务
            # connection.commit()
        # print(f"Thread {thread_id}: Inserted rows {start} to {end}")
    except Exception as e:
        print(f"Thread {thread_id}: Error occurred: {e}")
    finally:
        # if last_values:
        #     print(f"Thread {thread_id}: Inserted rows {start} to {end}. Last row data: {last_values}")
        # 关闭数据库连接
        connection.close()


def perform_sql_operation(sql, operation):
    conn = connect_db()
    num = 0
    try:
        with conn.cursor() as cursor:
            if operation in ('select', 'show'):
                cursor.execute(sql)
                for row in cursor.fetchall():
                    print(row)
            elif operation in ('update', 'delete'):
                cursor.execute(sql)
                num += 1
                conn.commit()

        print('执行sql数:', num)


    except pymysql.MySQLError as e:
        print(f"Error: {e}")
        conn.rollback()
    finally:
        conn.close()


def insert_sql(sql_command, total_records, num_threads):
    total_records = total_records  # 总共需要插入的记录数
    num_threads = num_threads  # 线程数量
    records_per_thread = total_records // num_threads  # 每个线程需要插入的记录数
    sql_cmd = sql_command
    threads = []
    for i in range(num_threads):
        start_index = i * records_per_thread
        end_index = start_index + records_per_thread
        end_index = min(end_index, total_records)
        thread_id = i + 1
        # 创建线程
        thread = threading.Thread(target=insert_data, args=(start_index, end_index, thread_id, sql_cmd))
        threads.append(thread)
        thread.start()

    # 等待所有线程完成
    for thread in threads:
        thread.join()


def up_del_sel_sql(sql_cmd):
    sql_command = sql_cmd
    perform_sql_operation(sql_command, command)


# 主函数,负责创建线程并分配任务
def main(sql):
    if sql.lower() == 'insert':
        insert_sql(sql_cmd, total_records, num_threads)
    elif sql.lower() in ('update', 'delete', 'select', 'show'):
        up_del_sel_sql(sql_cmd)
    else:
        print('请传入执行的sql类型: [insert|update|delete|select]')


if __name__ == '__main__':
    # 总共需要插入的记录数
    total_records = 1000000
    # 开启线程数量
    num_threads = 1
    # 执行sql
    # sql_cmd = """ insert into `employee` (`workid`, `name`, `age`, `gender`, `idcard`, `entrydate`, `department`)
    #               values (%s, %s, %s, %s, %s, %s, %s) """
    sql_cmd = """ insert into `Student` (`SNo`, `Sname`, `Gender`, `Birthday`, `Mobile`, `Email`, `Address` , `Image`)
                      values (%s, %s, %s, %s, %s, %s, %s,%s) """
    # sql_cmd = "delete from employee limit 100000"
    # sql_cmd = "select count(*) from employee"
    # sql_cmd = "show tables like '%employee%'"
    # sql_cmd = ""

    command = re.search(r'\binsert|update|delete|select|show\b', sql_cmd, re.IGNORECASE)
    command = command.group().lower() if command else None
    if command:
        time_shift = TimeShift()
        start_time = time_shift.starttime()
        main(command)
        end_time = time_shift.endtime()
        execution_time = time_shift.timeshift(time_shift.executiontime())
        print(f"执行时间: {execution_time}")
    else:
        print('未找到匹配的命令')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/366842.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Ansible基础及常用模块

目录 1.前言 Ansible Ansible的特性 2.ansible环境安装部署 管理端安装ansible(192.168.88.22) ansible目录结构 配置主机清单 配置密钥对验证 3.ansible命令行模块 command 模块 shell 模块 ​编辑cron 模块 user 模块 group 模块 copy 模块 file 模块 hostn…

爱上算法:每日算法(24-2月2号)

&#x1f31f;坚持每日刷算法&#xff0c;将其变为习惯&#x1f91b; 题目链接&#xff1a;101. 对称二叉树 最开始肯定是比较简单的想法&#xff0c;就是遍历左右节点呀&#xff0c;不相等我就直接返回false。 但是这样错了&#xff0c;我们要的是以根节点为轴&#xff0c;而…

如何保证MySQL和Redis中的数据一致性?

文章目录 前言一、缓存案例1.1 缓存常见用法1.2 缓存不一致产生的原因 二、解决方案2.1 先删除缓存&#xff0c;再更新数据库2.2 先更新数据库&#xff0c;删除缓存2.3 只更新缓存&#xff0c;由缓存自己同步更新数据库2.4 只更新缓存&#xff0c;由缓存自己异步更新数据库2.5 …

Unity_使用Shader实现玻璃和镜面效果

效果图如下&#xff1a; 玻璃效果图 镜面效果图 Step1 搭建场景→镜子使用Quad代替&#xff0c;放置在需要反射的墙面→创建新的材质和Shader Step2 墙壁外创建Camera&#xff0c;用来渲染物体后方的视图→创建RenderTexture&#xff0c;赋于该相机 Step3 Shader的编写如下…

如何使用本地私有NuGet服务器

写在前面 上一篇介绍了如何在本地搭建一个NuGet服务器&#xff0c; 本文将介绍如何使用本地私有NuGet服务器。 操作步骤 1.新建一个.Net类库项目 2.打包类库 操作后会生成一个.nupkg文件&#xff0c;当然也可以用dotnet pack命令来执行打包。 3.推送至本地NuGet服务器 打开命…

来看看Tomcat和Web应用的目录结构

在前面两篇大致了解了Tomcat的架构和运行流程&#xff0c;以及Tomcat应用中的web.xml。 聊一聊Tomcat的架构和运行流程&#xff0c;尽量通俗易懂一点-CSDN博客 来吧&#xff0c;好好理解一下Tomcat下的web.xml-CSDN博客 那接下来&#xff0c;再看看Tomcat的目录&#xff0c;…

el-table点击某一行选中改变背景色且执行方法

elementUI table表格点击某一行选中并且改变背景色 使用:row-style"rowStyle"及row-click“selectRow”&#xff1a; 其中 selectRow 方法中&#xff1a; row 输出&#xff1a;当前行的内容 column 输出&#xff1a;当前列的信息 event 输出&#xff1a;当前事件 …

Unknown custom element:<xxx>-did you register the component correctly解决方案

如图所示控制台发现了爆红&#xff08;大哭&#xff09;&#xff1a; 报错解释&#xff1a; 当我们看到报错时&#xff0c;我们需要看到一些关键词&#xff0c;比如显眼的“component”和“name”这两个单词&#xff0c; 因此我们就从此处切入&#xff0c;大概与组件有关系。…

洛谷 B3635 硬币问题(DP入门)

[题目概述] 今有面值为 1、5、11 元的硬币各无限枚。 想要凑出 n 元&#xff0c;问需要的最少硬币数量。 输入格式 仅一行&#xff0c;一个正整数 n。 输出格式 仅一行&#xff0c;一个正整数&#xff0c;表示需要的硬币个数。 输入 15输出 3输入 12输出 2样例解释 …

深度学习入门笔记(二)神经元的结构

神经网络的基本单元是神经元&#xff0c;本节我们介绍神经元的结构。 2.1 神经元 一个神经元是由下面 5 部分组成的&#xff1a; 输入&#xff1a;x1,x2,…,xk。权重&#xff1a;w1,w2,…,wk。权重的个数与神经元输入的个数相同。偏移项&#xff1a;可省略。激活函数&#…

Hadoop:HDFS学习巩固——基础习题及编程实战

一 HDFS 选择题 1.对HDFS通信协议的理解错误的是&#xff1f; A.客户端与数据节点的交互是通过RPC&#xff08;Remote Procedure Call&#xff09;来实现的 B.HDFS通信协议都是构建在IoT协议基础之上的 C.名称节点和数据节点之间则使用数据节点协议进行交互 D.客户端通过一…

【Vue】指令之内容绑定,事件绑定

Vue指令[1] 内容绑定&#xff0c;事件绑定v-test指令v-html指令v-on基础 内容绑定&#xff0c;事件绑定 v-test指令 作用&#xff1a;设置标签的文本值&#xff08;textContent&#xff09; 默认写法会替换全部内容&#xff0c;使用差值表达式可以替换指定内容内部支持写表达…

Nicn的刷题日常之打印菱形

目录 1.题目描述 2.解题思路 3.解题 1.题目描述 用C语言在屏幕上输出以下图案&#xff1a; 2.解题思路 仔细观察图形&#xff0c;可以发现&#xff0c;此图形中是由空格和*按照不同个数的输出组成的。 上三角&#xff1a;先输出空格&#xff0c;后输出*&#xff0c;每…

谈谈BlueStore

目录 未完待续前言组成前期准备工作基础概念对象PextentextentBlobNode 线程事务磁盘的抽象与分配位图法分层位图 上电流程写流程读流程参考资料 未完待续 前言 BlueStore是什么&#xff1f; Ceph是一个统一的分布式存储系统。BlueStore是Ceph的存储引擎。 它的作用是什么&am…

C语言:内存函数(memcpy memmove memset memcmp使用)

和黛玉学编程呀------------- 后续更新的节奏就快啦 memcpy使用和模拟实现 使用 void * memcpy ( void * destination, const void * source, size_t num ) 1.函数memcpy从source的位置开始向后复制num个字节的数据到destination指向的内存位置。 2.这个函数在遇到 \0 的时候…

深入理解网络编程之BIO和NIO

目录 原生JDK网络编程BIO BIO通信模型服务端代码 BIO通信模型客户端代码 伪异步模型服务端代码&#xff08;客户端跟之前一致&#xff09; 原生JDK网络编程NIO 什么是NIO&#xff1f; NIO和BIO的主要区别 阻塞与非阻塞IO NIO之Reactor模式 NIO中Reactor模式的基本组成…

【深度学习】基于PyTorch架构神经网络学习总结(基础概念基本网络搭建)

神经网络整体架构 类似于人体的神经元 神经网络工作原来为层次结构&#xff0c;一层一层的变换数据。如上述示例有4层&#xff0c;1层输入层、2层隐藏层、1层输出层神经元&#xff1a;数据的量或矩阵的大小&#xff0c;如上述示例中输入层中有三个神经元代表输入数据有3个特征…

国家博物馆逆向抢票协议

逆向工程的具体步骤可以因项目和目标系统的不同而有所变化。然而&#xff0c;以下是一般逆向工程的一般步骤&#xff1a; 1. 分析目标系统&#xff1a;对待逆向的系统进行调研和了解&#xff0c;包括其架构、功能、使用的技术等方面的信息。 2. 反汇编或反编译&#xff1a;使…

简单实践 java spring boot 自动配置模拟

1.概要 1.1 需求&#xff0c;自己写一个redis-spring-boot-starter模拟自动配置 自动配置就是在引入*-starter坐标后&#xff0c;可以已经spring框架的规则实现一些Bean的自动注入&#xff0c;并设置一些参数的默认值&#xff0c;且也可以在引入的工程中修改这些配置的值。这…

【算法】约数之和(数论)

题目 给定 n 个正整数 ai&#xff0c;请你输出这些数的乘积的约数之和&#xff0c;答案对 1097 取模。 输入格式 第一行包含整数 n。 接下来 n 行&#xff0c;每行包含一个整数 ai。 输出格式 输出一个整数&#xff0c;表示所给正整数的乘积的约数之和&#xff0c;答案需…