跳至内容
集成数据库支持

集成数据库支持

不同的项目可能使用不同的数据库,需要集成对各种数据库的支持。本篇以对 mysql 和 redis 支持的实现,介绍一下我的具体思路。

确定配置文件与数据结构

数据库信息需要写在配置文件(推荐 toml 格式)中:

config/database_config.toml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# 此配置文件用于存放项目数据库连接信息
[mysql]
host = "111.111.1.111"
user = "root"
password = "Password"
database = "xgate"
charset = "utf8mb4"
pool_mincached = 1
pool_maxcached = 4
pool_maxconnections = 10
pool_blocking = true

[redis]
host = "111.111.1.111"
password = "Password"
db = "0"
max_connections = 4

实现数据库连接池

数据库连接池抽象策略接口

common/db/database_connection_pool_strategy.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""
数据库连接池策略抽象基类模块

该模块定义了数据库连接池的抽象基类 (DatabaseConnectionPoolStrategy)。
采用策略模式与模板方法模式,封装连接池的通用生命周期管理流程,
强制子类实现具体创建/关闭逻辑,支持多数据库类型扩展。
"""

import traceback
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

from common.file_data_reader import FileDataReader
from common.log_config import setup_logger

logger = setup_logger()


class DatabaseConnectionPoolStrategy(ABC):
    """数据库连接池策略抽象基类

    封装连接池初始化、验证、关闭的标准流程(模板方法模式),
    子类仅需实现具体数据库驱动的创建与关闭逻辑(策略模式)。
    支持上下文管理器与状态查询,提升资源管理安全性与可观察性。

    属性:
        _base_name (str): 配置键名(标识具体数据库实例)
        _config_file_path (str): 配置文件路径
        _pool (Optional[Any]): 连接池实例(None 表示未初始化或已关闭)
    """

    DEFAULT_CONFIG_FILE = "config/database.toml"

    def __init__(
        self,
        base_name: str,
        config_file_path: Optional[str] = None,
    ):
        """初始化连接池策略(立即初始化模式)

        Args:
            base_name: 配置文件中目标数据库配置块的键名
            config_file_path: 配置文件路径。若为 None 或空字符串,使用 DEFAULT_CONFIG_FILE

        Raises:
            FileNotFoundError: 配置文件不存在
            KeyError: 配置中缺失 base_name 对应的配置块
            ValueError: 配置内容非字典或为空
            RuntimeError: 连接池创建过程中发生不可恢复错误
        """
        if not base_name or not isinstance(base_name, str):
            raise ValueError("base_name 必须为非空字符串")

        self._base_name = base_name.strip()
        self._config_file_path = (config_file_path or self.DEFAULT_CONFIG_FILE).strip()
        self._pool: Optional[Any] = None
        self._initialize_pool()

    def _initialize_pool(self) -> None:
        """模板方法:标准化初始化流程(配置加载 → 验证 → 创建)"""
        try:
            logger.debug(
                f"加载配置: file='{self._config_file_path}', section='{self._base_name}'"
            )
            reader = FileDataReader(self._config_file_path)
            _, full_config = reader.read()

            if self._base_name not in full_config:
                err_msg = f"配置缺失: section '{self._base_name}' 不存在于 {self._config_file_path}"
                logger.error(err_msg)
                raise KeyError(err_msg)

            base_config = full_config[self._base_name]
            if not isinstance(base_config, dict) or not base_config:
                err_msg = f"配置无效: section '{self._base_name}' 非字典或为空"
                logger.error(err_msg)
                raise ValueError(err_msg)

            self._pool = self._create_pool(base_config)
            logger.info(
                f"连接池初始化成功 | name='{self._base_name}', "
                f"type={type(self._pool).__name__}, config_keys={list(base_config.keys())}"
            )

        except (FileNotFoundError, KeyError, ValueError) as e:
            logger.error(
                f"连接池初始化失败 [{self._base_name}]: {type(e).__name__} - {e}",
                exc_info=True,
            )
            raise
        except Exception as e:
            logger.critical(
                f"连接池初始化发生未预期异常 [{self._base_name}]: {e}\n{traceback.format_exc()}",
                exc_info=True,
            )
            raise RuntimeError(f"连接池初始化失败: {e}") from e

    @abstractmethod
    def _create_pool(self, config: Dict[str, Any]) -> Any:
        """子类实现:创建具体数据库连接池实例

        Args:
            config: 已验证的配置字典(含 host, port, user, password 等)

        Returns:
            初始化完成的连接池对象

        Note:
            - 子类应处理驱动特定的参数转换与异常
            - 建议对敏感信息(如密码)在日志中脱敏
        """
        pass

    def close_pool(self) -> None:
        """安全关闭连接池(幂等操作)

        - 若连接池已关闭/未初始化,静默跳过
        - 关闭异常仅记录,不中断流程,确保状态重置
        - 最终将 _pool 置为 None 保证状态一致性
        """
        if self._pool is None:
            logger.debug(f"连接池 '{self._base_name}' 已关闭或未初始化,跳过关闭操作")
            return

        try:
            self._close_pool_impl()
            logger.info(f"连接池 '{self._base_name}' 已安全关闭")
        except Exception as e:
            logger.error(
                f"关闭连接池时发生异常 [{self._base_name}]: {e}", exc_info=True
            )
        finally:
            self._pool = None

    @abstractmethod
    def _close_pool_impl(self) -> None:
        """子类实现:执行底层连接池关闭逻辑

        Note:
            - 实现应具备幂等性(重复调用安全)
            - 推荐调用驱动原生的 close()/terminate() 方法
            - 避免在此方法中抛出未处理异常(由 close_pool 统一捕获)
        """
        pass

    @property
    def is_closed(self) -> bool:
        """连接池状态查询:是否已关闭或未初始化"""
        return self._pool is None

    def __enter__(self) -> "DatabaseConnectionPoolStrategy":
        """支持上下文管理器:with pool_strategy as pool: ..."""
        if self.is_closed:
            raise RuntimeError(f"连接池 '{self._base_name}' 已关闭,无法进入上下文")
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        """退出上下文时自动关闭连接池"""
        self.close_pool()
        return False

    def __del__(self) -> None:
        """析构时尝试清理资源(辅助保障,不替代显式关闭)"""
        if not self.is_closed:
            logger.warning(
                f"连接池 '{self._base_name}' 未显式关闭,触发析构清理。"
                "建议使用 close_pool() 或上下文管理器确保资源释放。"
            )
            try:
                self.close_pool()
            except Exception as e:
                logger.debug(f"析构时关闭连接池异常: {e}")

实现不同数据库连接池策略

首先需要执行 pip install pymysql dbutils 安装 pymysql 库 和 dbutils 库。

common/db/mysql_connection_pool.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
"""
MySQL 连接池策略实现模块

基于 DBUtils.PooledDB + PyMySQL 实现连接池。
支持配置前缀分离、敏感信息脱敏
"""

from typing import Any, Dict, Generator, List, Optional, Union, cast

import pymysql
from dbutils.pooled_db import PooledDB

from common.db.database_connection_pool_strategy import DatabaseConnectionPoolStrategy
from common.log_config import setup_logger

logger = setup_logger()

SENSITIVE_KEYS = {"password", "passwd", "secret", "token"}


class MySQLConnectionPoolStrategy(DatabaseConnectionPoolStrategy):
    """MySQL 连接池策略实现(PooledDB + PyMySQL)"""

    def _create_pool(self, config: Dict[str, Any]) -> PooledDB:
        """
        创建 MySQL 连接池

        配置约定:
        - 连接池参数:以 `pool_` 为前缀(如 pool_maxconnections)
        - 数据库参数:直接使用(host, port, user, password...)

        Args:
            config: 混合配置字典

        Returns:
            PooledDB: 初始化完成的连接池实例

        Raises:
            ValueError: 配置缺失或无效
            pymysql.MySQLError: 数据库连接参数错误
        """
        pool_kwargs = {}
        db_kwargs = {}
        for key, value in config.items():
            if key.startswith("pool_"):
                pool_kwargs[key[5:]] = value
            else:
                db_kwargs[key] = value

        required = {"host", "user", "database"}
        missing = required - db_kwargs.keys()
        if missing:
            raise ValueError(f"MySQL 配置缺失必要参数: {missing}")

        log_db = {
            k: ("***" if k in SENSITIVE_KEYS else v) for k, v in db_kwargs.items()
        }
        log_pool = pool_kwargs.copy()
        if "password" in log_pool:
            log_pool["password"] = "***"

        logger.debug(
            f"创建 MySQL 连接池 | name='{self._base_name}', "
            f"pool_config={log_pool}, db_config={log_db}"
        )

        try:
            pool = PooledDB(creator=pymysql, **pool_kwargs, **db_kwargs)
            logger.info(
                f"MySQL 连接池初始化成功 | name='{self._base_name}', "
                f"pool_size={pool_kwargs.get('maxconnections', 'N/A')}"
            )
            return pool
        except (pymysql.err.OperationalError, pymysql.err.ProgrammingError) as e:
            logger.error(f"MySQL 连接参数错误 [{self._base_name}]: {e}")
            raise ValueError(f"无效的 MySQL 连接配置: {e}") from e
        except TypeError as e:
            logger.error(f"PooledDB 参数错误 [{self._base_name}]: {e}")
            raise ValueError(f"连接池配置参数无效: {e}") from e
        except Exception as _:
            logger.exception(f"MySQL 连接池创建异常 [{self._base_name}]")
            raise

    def _close_pool_impl(self) -> None:
        """
        安全关闭 PooledDB 连接池
        """
        pool = cast(PooledDB, self._pool)
        assert pool is not None, "内部错误:_close_pool_impl 被调用时连接池应已初始化"

        try:
            pool.close()
            logger.debug(f"MySQL 连接池已关闭 [{self._base_name}]")
        except Exception as e:
            logger.debug(f"PooledDB close 执行细节 [{self._base_name}]: {e}")
            raise

    def select_database(
        self,
        sql: str,
        params: Optional[tuple] = None,
    ) -> List[dict]:
        """执行数据库查询操作

        执行SELECT语句并返回查询结果,结果以字典列表形式返回。

        Args:
            sql (str): SQL查询语句
            params (Optional[tuple], optional): SQL参数元组。默认为None

        Returns:
            List[dict]: 查询结果列表,每个元素为一个字典,代表一行数据

        Raises:
            pymysql.MySQLError: 当数据库查询失败时
            Exception: 当执行查询时发生未知错误时
        """
        logger.debug(f"执行查询 [select_database]: {sql}, 参数: {params}")

        try:
            with cast(PooledDB, self._pool).connection() as conn:
                with conn.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
                    execute_time = cursor.execute(sql, params)
                    logger.debug(f"SQL 查询成功执行,影响/获取行数: {execute_time}")

                    results = cursor.fetchall()
                    logger.debug(f"查询完成,共获取 {len(results)} 条记录")
                    return results

        except pymysql.MySQLError as e:
            error_msg = f"数据库查询失败: {e}, 执行的SQL: {sql}, 使用的参数: {params}"
            logger.error(error_msg)
            raise pymysql.MySQLError(error_msg) from e
        except Exception as e:
            error_msg = f"执行查询时发生未知错误: {e}"
            logger.exception(error_msg)
            raise

    def select_large_database(
        self, sql: str, params: Optional[tuple] = None, batch_size: int = 1000
    ) -> Generator[List[dict], None, None]:
        """流式读取大量数据

        用于处理大数据量查询,通过生成器分批返回结果,避免内存溢出。

        Args:
            sql (str): SQL查询语句
            params (Optional[tuple], optional): SQL参数元组。默认为None
            batch_size (int, optional): 每批次返回的记录数。默认为1000

        Yields:
            List[dict]: 每次生成一个批次的查询结果,以字典列表形式返回

        Raises:
            pymysql.MySQLError: 当分批查询数据库失败时
            Exception: 当执行分批查询时发生未知错误时
        """
        logger.debug(
            f"开始流式查询 [select_large_database],SQL: {sql}, 参数: {params}, 批次大小: {batch_size}"
        )

        try:
            with cast(PooledDB, self._pool).connection() as conn:
                with conn.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
                    cursor.execute(sql, params)
                    logger.info("流式查询游标已就绪,开始分批生成数据...")

                    batch_num = 0
                    while True:
                        batch_num += 1
                        batch = cursor.fetchmany(batch_size)

                        if not batch:
                            logger.info(f"流式查询结束,共处理 {batch_num - 1} 个批次")
                            break

                        logger.debug(
                            f"流式查询生成第 {batch_num} 批数据,数量: {len(batch)}"
                        )
                        yield batch

        except pymysql.MySQLError as e:
            error_msg = f"分批查询数据库失败: {e}, SQL: {sql}, 参数: {params}"
            logger.error(error_msg)
            raise pymysql.MySQLError(error_msg) from e
        except Exception as e:
            error_msg = f"执行分批查询时发生未知错误: {e}"
            logger.exception(error_msg)
            raise

    def change_database(
        self,
        sql: str,
        params: Optional[Union[tuple, List[tuple]]] = None,
        batch_size: int = 1000,
    ) -> int:
        """执行数据库变更操作

        执行INSERT、UPDATE、DELETE等数据变更操作,支持单条和批量执行。

        Args:
            sql (str): SQL变更语句
            params (Optional[Union[tuple, List[tuple]]], optional): SQL参数,
                可以是单个元组或元组列表。默认为None
            batch_size (int, optional): 批量操作时的批次大小。默认为1000

        Returns:
            int: 受影响的行数

        Raises:
            TypeError: 当params参数类型错误时
            pymysql.MySQLError: 当数据库变更失败时
            Exception: 当执行数据库变更时发生未知错误时
        """
        operation_type = "批量更新" if isinstance(params, list) else "单条变更"
        logger.debug(
            f"执行数据库变更 [{operation_type}]: {sql}, 参数示例: {params[:3] if isinstance(params, list) else params}"
        )

        try:
            with cast(PooledDB, self._pool).connection() as conn:
                with conn.cursor() as cursor:
                    affected = 0

                    if params is None:
                        affected = cursor.execute(sql)
                        logger.debug(f"无参数执行,影响行数: {affected}")

                    elif isinstance(params, tuple):
                        affected = cursor.execute(sql, params)
                        logger.debug(f"单条执行完成,影响行数: {affected}")

                    elif isinstance(params, list):
                        if not params:
                            logger.debug("传入的参数列表为空,跳过执行")
                            return 0

                        total_params = len(params)
                        logger.info(
                            f"检测到 {total_params} 条待处理数据,开始分批次提交 (批次大小: {batch_size})"
                        )

                        for i in range(0, total_params, batch_size):
                            batch_params = params[i : i + batch_size]
                            batch_affected = cursor.executemany(sql, batch_params)
                            affected += batch_affected
                            logger.debug(
                                f"批次提交完成 [{i // batch_size + 1}],本批次影响: {batch_affected},累计: {affected}"
                            )

                    else:
                        err_msg = f"参数params类型错误,需为None、元组或元组列表,当前类型: {type(params)}"
                        logger.error(err_msg)
                        raise TypeError(err_msg)

                    conn.commit()
                    logger.info(f"事务提交成功,总影响行数: {affected}")
                    return affected

        except pymysql.MySQLError as e:
            error_msg = f"数据库变更失败: {str(e)},执行SQL: {sql}"
            logger.error(error_msg)
            logger.warning("数据库变更异常,事务已自动回滚")
            raise pymysql.MySQLError(error_msg) from e

        except Exception as e:
            error_msg = f"执行数据库变更时发生未知错误: {str(e)}"
            logger.exception(error_msg)
            raise


if __name__ == "__main__":
    mysql_con_pool = MySQLConnectionPoolStrategy("mysql")
    res = mysql_con_pool.select_database("select * from user;")
    # res = mysql_pool.change_database("具体sql")
    print(res)
    mysql_con_pool.close_pool()

实现数据库连接池工厂

common/database_connection_pool.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"""
数据库连接池工厂模块

该模块提供了一个统一的工厂类 (DatabaseConnectionPool),
用于根据数据库类型创建和管理不同的连接池实例(如 MySQL, Redis)。

"""
from typing import Optional

import redis
from common.db.mysql_connection_pool import MySQLConnectionPoolStrategy
from common.db.redis_connection_pool import RedisConnectionPoolStrategy


class DatabaseConnectionPool:
    """数据库连接池工厂类

    该类充当策略模式的上下文,根据传入的数据库类型返回相应的连接池实例。
    它封装了具体类的实例化过程,提供了一个统一的接口来获取连接池。

    属性:
        _STRATEGY_MAP (Dict[str, Type]): 数据库类型到连接池类的映射表
    """

    # 策略映射表:键为数据库类型,值为对应的连接池类
    _STRATEGY_MAP = {"mysql": MySQLConnectionPoolStrategy, "redis": RedisConnectionPoolStrategy}

    @staticmethod
    def get_connection_pool(
            database_type: str,
            base_name: str,
            config_file_path: Optional[str] = None
    ):
        """获取指定类型的数据库连接池实例

        根据数据库类型字符串查找对应的连接池类,并初始化返回其实例。

        Args:
            database_type (str): 数据库类型标识符,支持 'mysql' 或 'redis'
            base_name (str): 配置文件中对应的具体配置名称(键名)
            config_file (str, optional): 自定义配置文件路径。默认为None,使用类内默认值

        Returns:
            Union[MysqlConnectionPool, RedisConnectionPool]: 初始化后的连接池实例

        Raises:
            ValueError: 当请求的数据库类型不被支持时
        """
        strategy_class = DatabaseConnectionPool._STRATEGY_MAP.get(database_type)
        if not strategy_class:
            raise ValueError(f"不支持的数据库类型:{database_type}")
        return strategy_class(base_name, config_file_path)


if __name__ == "__main__":
    # 测试 MySQL 连接池
    mysql_pool = DatabaseConnectionPool.get_connection_pool("mysql", "mysql")
    res = mysql_pool.select_database("select * from user;")
    # res = mysql_pool.change_database("具体sql")
    print(res)
    mysql_pool.close_pool()

    # 测试 Redis 连接池
    redis_pool = DatabaseConnectionPool.get_connection_pool("redis", "redis")
    r = redis_pool.get_redis_client()
    res = r.hget("child_hash", "192.168.9.118")
    print(res)
    redis_pool.close_pool()

参考

最后更新于