From 41719a00e7c0390085840fdef5c598dad54c8476 Mon Sep 17 00:00:00 2001 From: fawney19 Date: Sun, 28 Dec 2025 21:34:43 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E6=94=B9=E8=BF=9B=E5=88=86?= =?UTF-8?q?=E5=B8=83=E5=BC=8F=E4=BB=BB=E5=8A=A1=E9=94=81=E7=9A=84=E6=B8=85?= =?UTF-8?q?=E7=90=86=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实现两种锁清理模式: - 单实例模式(默认):启动时使用 Lua 脚本原子性清理旧锁,解决 worker 重启时���锁残留问题 - 多实例模式:使用 NX 选项竞争锁,依赖 TTL 处理异常退出 可通过 SINGLE_INSTANCE_MODE 环境变量控制模式选择。 --- src/utils/task_coordinator.py | 69 +++++++++++++++++++++++++++++++---- 1 file changed, 61 insertions(+), 8 deletions(-) diff --git a/src/utils/task_coordinator.py b/src/utils/task_coordinator.py index 432ef5f..510bb1b 100644 --- a/src/utils/task_coordinator.py +++ b/src/utils/task_coordinator.py @@ -1,8 +1,16 @@ -"""分布式任务协调器,确保仅有一个 worker 执行特定任务""" +"""分布式任务协调器,确保仅有一个 worker 执行特定任务 + +锁清理策略: +- 单实例模式(默认):启动时使用原子操作清理旧锁并获取新锁 +- 多实例模式:使用 NX 选项竞争锁,依赖 TTL 处理异常退出 + +使用方式: +- 默认行为:启动时清理旧锁(适用于单机部署) +- 多实例部署:设置 SINGLE_INSTANCE_MODE=false 禁用启动清理 +""" from __future__ import annotations -import asyncio import os import pathlib import uuid @@ -19,6 +27,10 @@ except ImportError: # pragma: no cover - Windows 环境 class StartupTaskCoordinator: """利用 Redis 或文件锁,保证任务只在单个进程/实例中运行""" + # 类级别标记:在当前进程中是否已尝试过启动清理 + # 注意:这在 fork 模式下每个 worker 都是独立的 + _startup_cleanup_attempted = False + def __init__(self, redis_client=None, lock_dir: Optional[str] = None): self.redis = redis_client self._tokens: Dict[str, str] = {} @@ -26,6 +38,8 @@ class StartupTaskCoordinator: self._lock_dir = pathlib.Path(lock_dir or os.getenv("TASK_LOCK_DIR", "./.locks")) if not self._lock_dir.exists(): self._lock_dir.mkdir(parents=True, exist_ok=True) + # 单实例模式:启动时清理旧锁(适用于单机部署,避免残留锁问题) + self._single_instance_mode = os.getenv("SINGLE_INSTANCE_MODE", "true").lower() == "true" def _redis_key(self, name: str) -> str: return f"task_lock:{name}" @@ -36,12 +50,51 @@ class StartupTaskCoordinator: if self.redis: token = str(uuid.uuid4()) try: - acquired = await self.redis.set(self._redis_key(name), token, nx=True, ex=ttl) - if acquired: - self._tokens[name] = token - logger.info(f"任务 {name} 通过 Redis 锁独占执行") - return True - return False + if self._single_instance_mode: + # 单实例模式:使用 Lua 脚本原子性地"清理旧锁 + 竞争获取" + # 只有当锁不存在或成功获取时才返回 1 + # 这样第一个执行的 worker 会清理旧锁并获取,后续 worker 会正常竞争 + script = """ + local key = KEYS[1] + local token = ARGV[1] + local ttl = tonumber(ARGV[2]) + local startup_key = KEYS[1] .. ':startup' + + -- 检查是否已有 worker 执行过启动清理 + local cleaned = redis.call('GET', startup_key) + if not cleaned then + -- 第一个 worker:删除旧锁,标记已清理 + redis.call('DEL', key) + redis.call('SET', startup_key, '1', 'EX', 60) + end + + -- 尝试获取锁(NX 模式) + local result = redis.call('SET', key, token, 'NX', 'EX', ttl) + if result then + return 1 + end + return 0 + """ + result = await self.redis.eval( + script, 2, + self._redis_key(name), self._redis_key(name), + token, ttl + ) + if result == 1: + self._tokens[name] = token + logger.info(f"任务 {name} 通过 Redis 锁独占执行") + return True + return False + else: + # 多实例模式:直接使用 NX 选项竞争锁 + acquired = await self.redis.set( + self._redis_key(name), token, nx=True, ex=ttl + ) + if acquired: + self._tokens[name] = token + logger.info(f"任务 {name} 通过 Redis 锁独占执行") + return True + return False except Exception as exc: # pragma: no cover - Redis 异常回退 logger.warning(f"Redis 锁获取失败,回退到文件锁: {exc}")