This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
zhangyang-variable-monitor/source/ucli_py/lib.py
2023-11-23 04:45:35 -05:00

234 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import bisect
import os
import re
import subprocess
import ctypes
from ctypes import *
# Size constants used by the ctypes structure definitions in this module.
# NOTE(review): presumably these mirror #define values on the native side of
# the monitor; if so they must be kept in sync with it — confirm.
MAX_NAME_LEN = 15  # threshold.name is sized MAX_NAME_LEN + 1 bytes
TIMER_MAX_WATCH_NUM = 32  # length of variable_monitor_record.threshold_record
CGROUP_NAME_LEN = 32  # byte length of task_detail.cgroup_buf and cgroup_cpuset
TASK_COMM_LEN = 16  # byte length of task_detail.comm
BACKTRACE_DEPTH = 30  # entries in the user/kernel "stack" address arrays
PROCESS_CHAINS_COUNT = 10  # entries in each proc_chains_detail array
PROCESS_ARGV_LEN = 128  # byte length of one proc_chains_detail.chains entry
DIAG_USER_STACK_SIZE = 16 * 1024  # raw_stack_detail.stack has DIAG_USER_STACK_SIZE // 8 slots
class threshold(ctypes.Structure):
    """ctypes record holding a task id, a fixed-size name, a pointer and a threshold value.

    NOTE(review): presumably mirrors a C struct on the monitor's native side;
    if so, field order and types must match it exactly — confirm.
    """

    _fields_ = [
        ("task_id", ctypes.c_int),
        # MAX_NAME_LEN bytes plus one extra byte (presumably for a NUL terminator).
        ("name", ctypes.c_char * (MAX_NAME_LEN + 1)),
        ("ptr", ctypes.c_void_p),
        ("threshold", ctypes.c_longlong),
    ]
class variable_monitor_record(ctypes.Structure):
    """ctypes record: a header (et_type, id, tv) plus an array of ``threshold`` entries.

    ``threshold_record`` always has room for TIMER_MAX_WATCH_NUM entries.
    NOTE(review): ``threshold_num`` presumably says how many are valid — confirm.
    """

    _fields_ = [
        ("et_type", ctypes.c_int),
        ("id", ctypes.c_ulong),
        ("tv", ctypes.c_ulonglong),
        ("threshold_num", ctypes.c_int),
        ("threshold_record", threshold * TIMER_MAX_WATCH_NUM),
    ]
class task_detail(ctypes.Structure):
    """ctypes record describing one task: cgroup names, ids, state and command name.

    The three text fields are fixed-size ``c_char`` arrays, so reading them
    from an instance yields ``bytes``.
    """

    _fields_ = [
        ("cgroup_buf", ctypes.c_char * CGROUP_NAME_LEN),
        ("cgroup_cpuset", ctypes.c_char * CGROUP_NAME_LEN),
        ("pid", ctypes.c_int),
        ("tgid", ctypes.c_int),
        ("container_pid", ctypes.c_int),
        ("container_tgid", ctypes.c_int),
        ("state", ctypes.c_long),
        ("task_type", ctypes.c_int),
        ("syscallno", ctypes.c_ulong),
        ("sys_task", ctypes.c_ulong),
        ("user_mode", ctypes.c_ulong),
        ("comm", ctypes.c_char * TASK_COMM_LEN),
    ]
class PtRegs(Structure):
    """ctypes record of 21 ``c_ulong`` register slots, r15 first and ss last.

    NOTE(review): the names and order look like the x86-64 ``struct pt_regs``
    layout — confirm against the native definition before changing anything.
    """

    # Every slot has the same type, so only the (order-sensitive) names are listed.
    _fields_ = [
        (reg_name, c_ulong)
        for reg_name in (
            "r15",
            "r14",
            "r13",
            "r12",
            "rbp",
            "rbx",
            "r11",
            "r10",
            "r9",
            "r8",
            "rax",
            "rcx",
            "rdx",
            "rsi",
            "rdi",
            "orig_rax",
            "rip",
            "cs",
            "eflags",
            "rsp",
            "ss",
        )
    ]
class user_stack_detail(ctypes.Structure):
    """ctypes record: a register snapshot, ip/bp/sp values and a fixed-depth address array."""

    _fields_ = [
        ("regs", PtRegs),
        ("ip", ctypes.c_ulong),
        ("bp", ctypes.c_ulong),
        ("sp", ctypes.c_ulong),
        # Always BACKTRACE_DEPTH slots.
        ("stack", ctypes.c_ulong * BACKTRACE_DEPTH),
    ]
class raw_stack_detail(ctypes.Structure):
    """ctypes record: a register snapshot, ip/bp/sp, a size field and a raw stack buffer.

    NOTE(review): ``stack_size`` presumably gives the number of valid bytes in
    ``stack`` — confirm against the producer of this record.
    """

    _fields_ = [
        ("regs", PtRegs),
        ("ip", ctypes.c_ulong),
        ("bp", ctypes.c_ulong),
        ("sp", ctypes.c_ulong),
        ("stack_size", ctypes.c_ulong),
        # DIAG_USER_STACK_SIZE // 8 slots of c_ulong.
        ("stack", ctypes.c_ulong * (DIAG_USER_STACK_SIZE // 8)),
    ]
class kern_stack_detail(ctypes.Structure):
    """ctypes record wrapping a single fixed-depth array of ``c_ulong`` addresses."""

    _fields_ = [("stack", ctypes.c_ulong * BACKTRACE_DEPTH)]
class InnerArray(ctypes.Structure):
    """ctypes record wrapping one ``chain`` buffer of PROCESS_ARGV_LEN bytes."""

    _fields_ = [
        ("chain", c_char * PROCESS_ARGV_LEN),
    ]
# Array *type* (not an instance) of PROCESS_CHAINS_COUNT InnerArray elements.
# NOTE(review): no live use is visible in this part of the file — confirm
# before relying on or removing it.
temp_array = (
    InnerArray * PROCESS_CHAINS_COUNT
)
class proc_chains_detail(ctypes.Structure):
    """ctypes record with three parallel arrays of PROCESS_CHAINS_COUNT entries each.

    Entry ``i`` of ``full_argv``, ``chains`` and ``tgid`` belong together.
    """

    _fields_ = [
        ("full_argv", ctypes.c_uint * PROCESS_CHAINS_COUNT),
        # PROCESS_CHAINS_COUNT buffers of PROCESS_ARGV_LEN bytes each;
        # chains[i] is itself a c_char array.
        ("chains", (ctypes.c_char * PROCESS_ARGV_LEN) * PROCESS_CHAINS_COUNT),
        ("tgid", ctypes.c_uint * PROCESS_CHAINS_COUNT),
    ]
class variable_monitor_task(ctypes.Structure):
    """ctypes record: a header (et_type, id, tv) followed by five embedded sub-records.

    The sub-records are, in order: task details, user stack, kernel stack,
    process chain and raw stack.
    """

    _fields_ = [
        ("et_type", ctypes.c_int),
        ("id", ctypes.c_ulong),
        ("tv", ctypes.c_ulonglong),
        ("task", task_detail),
        ("user_stack", user_stack_detail),
        ("kern_stack", kern_stack_detail),
        ("proc_chains", proc_chains_detail),
        ("raw_stack", raw_stack_detail),
    ]
class ProcMapsParser:
    """Resolve addresses of a process to the mapping that contains them.

    The constructor reads ``/proc/<pid>/maps`` once and stores two parallel
    lists:

    * ``ranges`` -- ``(start, end)`` address pairs, in file order.
    * ``names``  -- the mapping's path when it is an absolute path, else
      ``None`` (anonymous mappings and pseudo names such as ``[heap]``).

    If the maps file cannot be opened because the process does not exist,
    both lists are left empty and every lookup of a user address misses.
    """

    __pattern = re.compile(r"([0-9a-f]+)-([0-9a-f]+) [\w-]+ [\w:]+ [\w:]+ \S+ (.*?)\n")

    def __init__(self, pid):
        """Parse ``/proc/<pid>/maps``; a vanished process yields an empty table."""
        self.ranges = []
        self.names = []
        try:
            with open(f"/proc/{pid}/maps", "r") as f:
                for line in f:
                    m = self.__pattern.match(line)
                    if m is None:
                        continue
                    start, end, name = m.groups()
                    # The path column is space-padded; strip the padding.
                    name = name.strip()
                    # Empty names and non-absolute names are both recorded
                    # as None (isabs("") is False).
                    if not os.path.isabs(name):
                        name = None
                    self.ranges.append((int(start, 16), int(end, 16)))
                    self.names.append(name)
        except (FileNotFoundError, ProcessLookupError):
            # FileNotFoundError: the process was already gone at open time.
            # ProcessLookupError: it exited while the file was being read;
            # whatever was parsed so far is kept.
            pass

    def lookup(self, addr):
        """Return ``(name, offset)`` for the mapping containing ``addr``.

        * Addresses above ``0x7FFFFFFFFFFF`` are treated as kernel addresses
          and returned as ``("kernel", addr)``.
        * On a hit, ``offset`` is ``addr`` minus the mapping's start address
          and ``name`` may be ``None`` for unnamed mappings.
          NOTE(review): the mapping's file offset column is not parsed, so
          it is not added to ``offset`` — confirm that callers expect this.
        * On a miss, ``(None, None)`` is returned.
        """
        # Kernel-space addresses normally live in the high half of the
        # address space; 0x7fffffffffff is used as the cut-off.
        if addr > 0x7FFFFFFFFFFF:
            return "kernel", addr
        # Use +inf as the second element so that a range whose start equals
        # addr sorts *before* the probe. Probing with (addr, addr) would skip
        # that range and miss addresses exactly at a mapping start.
        i = bisect.bisect_right(self.ranges, (addr, float("inf"))) - 1
        if i >= 0:
            start, end = self.ranges[i]
            if start <= addr < end:
                return self.names[i], addr - start
        return None, None
# Matches one `nm` output line for a text symbol: "<hex address> <T|t> <name>".
__pattern = re.compile(r"([0-9a-f]+) ([Tt]) (.*)")


def addr_to_symbol(path, offset):
    """Return the text symbol of ``path`` that covers ``offset``.

    Runs ``nm -D path`` and walks its text symbols (types ``T``/``t``) in
    sorted line order. The result is the name of the last symbol whose
    address is not greater than ``offset``, or ``None`` when there is none.

    Errors from launching or running ``nm`` are raised by
    ``subprocess.check_output`` and propagate unchanged.
    """
    nm_text = subprocess.check_output(["nm", "-D", path]).decode()
    # Lines are sorted as strings, which orders them by their leading
    # address column.
    candidates = sorted(row for row in nm_text.splitlines() if " " in row)
    nearest = None
    for row in candidates:
        parsed = __pattern.match(row)
        if parsed is None:
            # Not a text-symbol line (e.g. undefined or data symbol).
            continue
        if int(parsed.group(1), 16) > offset:
            # First symbol past the offset: the previous one is the answer.
            break
        nearest = parsed.group(3)
    return nearest
def diag_printf_proc_chains(proc_chains: proc_chains_detail, detail=1):
    """Print the process chain, one line per entry.

    Entries are printed in order until the first one whose buffer starts
    with a NUL byte. For an entry whose ``full_argv`` is 0, and only when
    ``detail`` is truthy, the command line from ``get_pid_cmdline`` is
    printed if it is non-empty. In every other case the entry's own buffer
    is printed, cut at its first NUL.
    """
    print(" 进程链信息:")
    for idx in range(PROCESS_CHAINS_COUNT):
        entry = proc_chains.chains[idx]
        if entry[0] == b"\x00":
            break
        label = ""
        if proc_chains.full_argv[idx] == 0 and detail:
            label = get_pid_cmdline(proc_chains.tgid[idx])
        if len(label) == 0:
            # Fall back to the text recorded in the buffer itself.
            label = bytes(entry).decode(errors="ignore").split("\0", 1)[0]
        print(f"#^ 0xffffffffffffff {label} (UNKNOWN)")
# Cache of pid -> command line string ("" when it could not be read).
cmdlines = {}


def get_pid_cmdline(pid):
    """Return the command line of ``pid`` as one space-separated string.

    The value is read from ``/proc/<pid>/cmdline`` on first use and cached in
    the module-level ``cmdlines`` dict, including the empty-string result for
    a process that no longer exists.

    Arguments are arbitrary bytes, so the file is decoded as UTF-8 with
    undecodable bytes dropped rather than raising ``UnicodeDecodeError``.
    """
    if pid not in cmdlines:
        try:
            with open(
                f"/proc/{pid}/cmdline", "r", encoding="utf-8", errors="ignore"
            ) as f:
                # Arguments are NUL-separated; show them space-separated.
                cmdlines[pid] = f.read().replace("\x00", " ").strip()
        except (FileNotFoundError, ProcessLookupError):
            # The process is gone (before open, or between open and read).
            cmdlines[pid] = ""
    return cmdlines[pid]
def printk_task_brief(detail: task_detail):
    """Print a one-line summary of a task: cgroup name, command name, tgid and pid.

    ``cgroup_buf`` and ``comm`` are raw byte buffers, so they are decoded
    with undecodable bytes dropped. A name that is not valid UTF-8 then
    cannot abort the report with ``UnicodeDecodeError``.
    """
    cgroup = detail.cgroup_buf.decode(errors="ignore")
    comm = detail.comm.decode(errors="ignore")
    print(
        f" 进程信息: [{cgroup} / {comm}], PID {detail.tgid} / {detail.pid}"
    )