在 Sanic 应用中使用内存缓存管理 IP 黑名单

news/2024/12/24 3:11:45 标签: 缓存, tcp/ip, 网络协议, Python

[外链图片转存中…(img-Pm0K9mzd-1734859380698)]

在现代 web 应用中,保护 API 接口免受恶意请求的攻击至关重要。IP 黑名单是一种常见的安全措施,可以有效阻止某些 IP 地址的访问。本文将介绍如何在 Python 的 Sanic 框架中实现 IP 黑名单功能,并结合内存缓存提升性能。

1. 环境准备

首先,确保你已经安装了 Sanic 和必要的数据库驱动程序。我们将使用 aiomysql 作为 MySQL 的异步驱动。可以使用以下命令进行安装:

pip install sanic aiomysql

2. 创建数据库表

我们需要一个数据库表来存储黑名单 IP。可以使用以下 SQL 语句创建表:

CREATE TABLE api_ip_blacklist (
    id INT AUTO_INCREMENT PRIMARY KEY COMMENT '唯一标识',
    ip_address VARCHAR(45) NOT NULL COMMENT '被拦截的 IP 地址,支持 IPv4 和 IPv6',
    status TINYINT DEFAULT 1 COMMENT '状态,1 表示有效(黑名单),0 表示无效(可用)',
    reason VARCHAR(255) DEFAULT NULL COMMENT '记录添加到黑名单的原因',
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间',
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录最后更新时间',
    UNIQUE KEY (ip_address) COMMENT '确保 IP 地址唯一'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='API 接口黑名单 IP 表';

3. 实现内存缓存

我们可以创建一个 CacheObject 类来管理内存中的黑名单 IP 缓存。以下是该类的实现:

class CacheObject(object):
    def __init__(self):
        self.data = dict()

    def put(self, key, value):
        """将 IP 地址放入缓存"""
        self.data[key] = value

    def get(self, key):
        """从缓存中获取 IP 地址的状态"""
        return self.data.get(key)

    def load_blacklist(self, ip_list):
        """从数据库加载黑名单 IP 到缓存"""
        for ip in ip_list:
            self.put(ip, 1)  # 1 表示有效(黑名单)

    def clear(self):
        """清空缓存"""
        self.data.clear()

4. 集成 Sanic 应用

接下来,我们将 CacheObject 集成到 Sanic 应用中,实现动态加载和检查黑名单功能。

from sanic import Sanic, response
from sanic.exceptions import Forbidden
import aiomysql
import asyncio

app = Sanic("IPBlacklistApp")

# 创建 CacheObject 实例
blacklist_cache = CacheObject()

# 数据库配置
DB_CONFIG = {
    'host': 'localhost',
    'port': 3306,
    'user': 'your_username',
    'password': 'your_password',
    'db': 'your_database'
}

async def load_blacklist():
    async with aiomysql.connect(**DB_CONFIG) as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT ip_address FROM api_ip_blacklist WHERE status = 1")
            rows = await cursor.fetchall()
            blacklist_cache.load_blacklist([row[0] for row in rows])  # 加载黑名单到缓存

async def update_blacklist():
    while True:
        await load_blacklist()
        await asyncio.sleep(60)  # 每60秒更新一次黑名单

@app.listener('before_server_start')
async def setup_db(app, loop):
    await load_blacklist()  # 启动时加载黑名单到缓存
    app.add_task(update_blacklist())  # 定时更新黑名单

@app.middleware("request")
async def block_blacklisted_ips(request):
    client_ip = request.ip
    if blacklist_cache.get(client_ip) == 1:  # 检查 IP 是否在黑名单中
        raise Forbidden("Your IP address is blocked.")

@app.route("/")
async def index(request):
    return response.json({"message": "Welcome to the API!"})

@app.route("/data")
async def data(request):
    return response.json({"data": "Here is your data!"})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

5. 代码解析

  • 数据库连接:使用 aiomysql 连接到 MySQL 数据库并查询黑名单 IP。
  • 内存缓存:通过 CacheObject 类将黑名单 IP 缓存到内存中,提高查询效率。
  • 中间件:在请求处理中检查客户端 IP 是否在黑名单中,若在则抛出 Forbidden 异常。

6. 测试应用

启动应用后,您可以通过 Postman 或浏览器访问接口。确保数据库中有有效的黑名单 IP 数据。访问黑名单 IP 时,您将收到 403 Forbidden 响应,表示该 IP 被阻止。

7. 总结

通过结合 Sanic 框架和内存缓存,我们成功实现了一个高效的 IP 黑名单管理系统。这种方法有效地提高了查询速度,减少了对数据库的频繁访问,为 API 接口提供了更好的安全保障。


http://www.niftyadmin.cn/n/5797255.html

相关文章

【HarmonyOs学习日志(14)】计算机网络之域名系统DNS

域名系统DNS 域名系统DNS——从域名解析出IP地址 文章目录 域名系统DNS概述域名到IP地址的解析 互联网的域名结构命名标准 域名服务器域名的解析过程 概述 域名系统DNS(Domain Name System)是互联网使用的命名系统,用来把便于人们使用的机器…

Qt中的QProcess与Boost.Interprocess:实现多进程编程

目录 QProcess简介 启动进程的不同方式 例子1:打开记事本程序 例子2:执行带有管道(|)的Linux命令 同步进程API Boost.Interprocess简介 (一)共享内存: (二)命名信…

kubernates实战

使用k8s来部署tomcat 1、创建一个部署,并指定镜像地址 kubectl create deployment tomcat6 --imagetomcat:6.0.53-jre82、查看部署pod状态 kubectl get pods # 获取default名称空间下的pods kubectl get pods --all-namespaces # 获取所有名称空间下的pods kubect…

【网络】超以太网联盟 UEC|下一代 “RoCE” 协议--编辑中

术语 UEC: 超级以太联盟 UET: 超级以太传输协议 Tail latency: 尾部延迟,(以通信阶段最后一条消息的到达时间为衡量标准)是系统性能的关键指标。 未来 AI 和 HPC 网络的关键需求 为了实现低尾延迟,UEC 规范通过满足下一代应用…

HarmonyOS 实践 - 设计模式在代码中的作用

文章目录 前言设计模式概述单例模式:全局状态管理代码分析 策略模式:界面主题切换代码分析 示例测试单例模式测试策略模式测试 体验评价总结 前言 在软件开发中,设计模式是公认的最佳实践,它能帮助开发者通过模块化和规范化的代码…

【Mybatis-Plus】连表查询 逻辑删除 多租户

文章目录 连表查询逻辑删除多租户 连表查询 引入 mybatis-plus-join-boot-starter 依赖 <dependency><groupId>com.github.yulichang</groupId><artifactId>mybatis-plus-join-boot-starter</artifactId><version>1.5.1</version>…

VSCode 中 Git 功能比较:内置 Git、GitLens 与 Git History 插件

在软件开发领域&#xff0c;版本控制是维护代码变更的重要工具。Git 作为最流行的版本控制系统&#xff0c;被广泛集成在各种代码编辑器中。Visual Studio Code&#xff08;VSCode&#xff09;不仅内置了 Git 支持&#xff0c;还提供了丰富的扩展来增强 Git 功能。本文将对比 V…

SQL 使用带聚集函数的联结

聚集函数用于汇总数据&#xff0c;通常用于从一个表中计算统计信息&#xff0c;但也可以与联结一起使用。以下是一个例子&#xff0c;展示如何使用聚集函数统计每个顾客的订单数。 示例 1&#xff1a;使用 COUNT() 函数与 INNER JOIN 假设我们需要检索所有顾客及每个顾客所下…