Skip to content

Commit d2f93d6

Browse files
authored
修复linux下执行pm2 restart后python守护进程未重启
修复linux下执行pm2 restart后python守护进程未重启
1 parent ac7d6f0 commit d2f93d6

File tree

1 file changed

+21
-32
lines changed

1 file changed

+21
-32
lines changed

spider/py/core/t4_daemon.py

Lines changed: 21 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -27,11 +27,11 @@
2727
INIT_TIMEOUT = 15 # 初始化超时(秒)
2828
IDLE_EXPIRE = 30 * 60 # 实例空闲过期(秒)
2929
CLEAN_INTERVAL = 5 * 60 # 清理间隔(秒)
30-
REQUEST_TIMEOUT = 120 # 单次请求socket超时(秒)
30+
REQUEST_TIMEOUT = 120 # 单次请求 socket 超时(秒)
3131

3232
LOG_LEVEL = os.environ.get("T4_LOG_LEVEL", "INFO").upper()
3333
LOG_FILE = os.environ.get("T4_LOG_FILE") # 若未设置则只打到控制台
34-
PID_FILE = os.environ.get("T4_PID_FILE") # 若设置则会写入PID
34+
PID_FILE = os.environ.get("T4_PID_FILE") # 若设置则会写入 PID
3535

3636
# =========================
3737
# 日志
@@ -71,7 +71,6 @@
7171
'action': 'action',
7272
}
7373

74-
7574
# =========================
7675
# 工具:长度前缀协议
7776
# =========================
@@ -87,7 +86,6 @@ def recv_exact(rfile, n: int) -> bytes:
8786
remaining -= len(chunk)
8887
return b"".join(chunks)
8988

90-
9189
def send_packet(wfile, obj: dict):
9290
payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
9391
if len(payload) > MAX_MSG_SIZE:
@@ -96,7 +94,6 @@ def send_packet(wfile, obj: dict):
9694
wfile.write(payload)
9795
wfile.flush()
9896

99-
10097
def recv_packet(rfile) -> dict:
10198
header = recv_exact(rfile, 4)
10299
(length,) = struct.unpack(">I", header)
@@ -105,7 +102,6 @@ def recv_packet(rfile) -> dict:
105102
payload = recv_exact(rfile, length)
106103
return pickle.loads(payload)
107104

108-
109105
# =========================
110106
# Spider 管理
111107
# =========================
@@ -117,7 +113,6 @@ def __init__(self, spider):
117113
self.init_event = threading.Event()
118114
self.last_used = time.time()
119115

120-
121116
class SpiderManager:
122117
def __init__(self, logger):
123118
self.logger = logger
@@ -154,7 +149,6 @@ def _parse_env(env_str: str):
154149
proxy_url = data.get("proxyUrl", "") or ""
155150
ext = data.get("ext", "") or ""
156151
except Exception:
157-
# 非JSON字符串时,保持兼容:当作 ext 传
158152
ext = env_str
159153
return proxy_url, ext
160154

@@ -163,12 +157,10 @@ def _instance_key(self, script_path: str, env_str: str) -> str:
163157
key_data = f"{Path(script_path).resolve()}|{proxy_url}|{ext}"
164158
return hashlib.sha256(key_data.encode("utf-8")).hexdigest()
165159

166-
# ---------- 动态导入 ----------
167160
def _load_module_from_file(self, file_path: Path):
168161
name = file_path.stem
169162
logger.info("_load_module_from_file %s", name)
170-
# 加入项目根目录到 sys.path,保证 base.* 可以被导入
171-
project_root = file_path.parent # 假设 py 是根目录
163+
project_root = file_path.parent
172164
if str(project_root) not in sys.path:
173165
sys.path.insert(0, str(project_root))
174166
logger.info("Added %s to sys.path", project_root)
@@ -184,7 +176,6 @@ def _import_spider_module(self, script_path: str):
184176
p = Path(script_path)
185177
if p.exists() and p.is_file() and p.suffix == ".py":
186178
return self._load_module_from_file(p)
187-
# 作为模块路径导入(已在 sys.path 中)
188179
return importlib.import_module(script_path)
189180

190181
def _create_spider(self, script_path: str, env_str: str):
@@ -237,11 +228,9 @@ def _ensure_instance(self, script_path: str, env_str: str) -> SpiderInstance:
237228
return inst
238229

239230
def call(self, script_path: str, method_name: str, env_str: str, args_list):
240-
# 解析 env 中 ext
241231
_, ext = self._parse_env(env_str)
242232
inst = self._ensure_instance(script_path, env_str)
243233

244-
# init 分支:同步初始化
245234
if method_name == "init":
246235
with threading.Lock():
247236
if inst.initializing:
@@ -258,7 +247,6 @@ def call(self, script_path: str, method_name: str, env_str: str, args_list):
258247
inst.initializing = False
259248
return {"status": "already initialized"}
260249

261-
# 其他方法:若未初始化,则异步触发 + 等待
262250
if not inst.initialized:
263251
if not inst.initializing:
264252
def _bg():
@@ -267,16 +255,12 @@ def _bg():
267255
inst.initialized = True
268256
inst.init_event.set()
269257
except Exception:
270-
# 失败也置事件,避免永等
271258
inst.init_event.set()
272-
273259
inst.initializing = True
274260
threading.Thread(target=_bg, daemon=True).start()
275-
276261
if not inst.init_event.wait(INIT_TIMEOUT) or not inst.initialized:
277262
return {"success": False, "error": "init timeout or failed"}
278263

279-
# 解析 args
280264
parsed_args = []
281265
for a in (args_list or []):
282266
if isinstance(a, (dict, list, int, float, bool, type(None))):
@@ -289,14 +273,12 @@ def _bg():
289273
else:
290274
parsed_args.append(a)
291275

292-
# 方法映射
293276
invoke = METHOD_MAP.get(method_name, method_name)
294277
if not hasattr(inst.spider, invoke):
295278
return {"success": False, "error": f"Spider missing method '{invoke}'"}
296279

297280
try:
298281
result = getattr(inst.spider, invoke)(*parsed_args)
299-
# 若 Spider 提供 json2str 则尝试序列化
300282
if result is not None and hasattr(inst.spider, "json2str"):
301283
try:
302284
return inst.spider.json2str(result)
@@ -311,13 +293,11 @@ def _bg():
311293
"traceback": traceback.format_exc(),
312294
}
313295

314-
315296
# =========================
316297
# Server
317298
# =========================
318299
_manager = SpiderManager(logger)
319300

320-
321301
class T4Handler(StreamRequestHandler):
322302
def handle(self):
323303
self.request.settimeout(REQUEST_TIMEOUT)
@@ -337,41 +317,50 @@ def handle(self):
337317
resp["error"] = result.get("error")
338318
if result.get("traceback"):
339319
resp["traceback"] = result["traceback"]
340-
341320
send_packet(self.wfile, resp)
342321
except Exception as e:
343322
logger.error("Handle error: %s", e)
344323
try:
345324
send_packet(self.wfile, {"success": False, "error": str(e)})
346325
except Exception:
347-
pass # 对端已断开
348-
326+
pass
349327

350328
class ThreadedTCPServer(ThreadingMixIn, TCPServer):
351329
daemon_threads = True
352330
allow_reuse_address = True
353331

354-
355332
def run():
356-
def _stop(*_):
357-
logger.info("Stopping server ...")
333+
# ✅ 让当前进程成为进程组组长,方便整组 kill
334+
if os.name == "posix":
335+
os.setpgrp()
336+
337+
def _stop(signum, frame):
338+
"""
339+
收到 SIGINT/SIGTERM 时的回调:
340+
1. 停止 SpiderManager
341+
2. 关闭 TCP Server
342+
3. 强杀整个进程组,确保进程立即退出
343+
"""
344+
logger.info("Received %s, shutting down...", signum)
358345
_manager.stop()
359-
# 让 serve_forever() 退出
360346
srv.shutdown()
347+
# 立即强制退出,防止线程阻塞
348+
os._exit(0)
361349

350+
# 注册信号
362351
if os.name == "posix":
363-
signal.signal(signal.SIGTERM, _stop)
364352
signal.signal(signal.SIGINT, _stop)
353+
signal.signal(signal.SIGTERM, _stop)
365354

366355
global srv
367356
srv = ThreadedTCPServer((HOST, PORT), T4Handler)
368357
logger.info("T4 daemon listening on %s:%d", HOST, PORT)
358+
369359
try:
370360
srv.serve_forever(poll_interval=0.5)
371361
finally:
372362
srv.server_close()
373363
logger.info("Server closed.")
374364

375-
376365
if __name__ == "__main__":
377366
run()

0 commit comments

Comments
 (0)