"""User-space client for the ``/dev/variable_monitor`` kernel device.

Dumps the kernel's variant buffer through an ioctl, walks the records stored
in it and pretty-prints threshold events and sampled task stacks.  When run
as a script it replays a dump previously saved to the files ``buff``/``len``.
"""

import os
import ctypes
import fcntl
import pickle
import bisect

from lib import *

DEVICE = "/dev/variable_monitor"

# Size in bytes of the user-space buffer handed to the driver on a dump.
VARIANT_BUF_SIZE = 50 * 1024 * 1024

# Value of an unused kernel stack slot: (unsigned long)-1.
INVALID_KERN_ADDR = ctypes.c_ulong(-1).value

# Value of an unused user stack slot (all 64 bits set).
INVALID_USER_ADDR = 18446744073709551615

file_desc = None


def open_device():
    """Open DEVICE read/write and keep the descriptor in ``file_desc``.

    Returns:
        0 when the device is open (already or newly), -1 when opening failed.
    """
    global file_desc
    if file_desc is None:
        try:
            file_desc = os.open(DEVICE, os.O_RDWR)
        except OSError:
            print(f"Can't open device file: {DEVICE}")
            return -1
    return 0


def close_device():
    """Close the descriptor opened by ``open_device``.

    Returns:
        0 after closing, or ``None`` (after printing a notice) when the
        device was not open.
    """
    global file_desc
    if file_desc is None:
        print(f"Device not open: {DEVICE}, {file_desc}")
        return None
    os.close(file_desc)
    file_desc = None
    return 0


class ioctl_dump_param(ctypes.Structure):
    """Argument block passed to the dump ioctl."""

    _fields_ = [
        # Where the driver stores the number of bytes it wrote.
        ("user_ptr_len", ctypes.POINTER(ctypes.c_int)),
        # Capacity of ``user_buf`` in bytes.
        ("user_buf_len", ctypes.c_size_t),
        # Address of the destination buffer.
        ("user_buf", ctypes.c_void_p),
    ]


variant_buf = ctypes.create_string_buffer(VARIANT_BUF_SIZE)
len_temp = ctypes.c_int()


def do_dump(arg):
    """Issue the dump ioctl (request 1), save the result and print it.

    On a zero ioctl return the raw buffer and the reported length are pickled
    to the files ``buff`` and ``len`` in the current directory, then the
    buffer is decoded with ``do_extract``.

    Args:
        arg: Unused.
    """
    dump_param = ioctl_dump_param(
        user_ptr_len=ctypes.pointer(len_temp),
        user_buf_len=VARIANT_BUF_SIZE,
        user_buf=ctypes.addressof(variant_buf),
    )
    ret = fcntl.ioctl(file_desc, 1, dump_param)
    if ret != 0:
        return

    # NOTE: these pickle files are read back by the __main__ replay below;
    # pickle must only ever be loaded from files this tool wrote itself.
    with open("buff", "wb") as f:
        pickle.dump(bytes(variant_buf), f)
    with open("len", "wb") as f:
        pickle.dump(len_temp, f)
    do_extract(variant_buf, len_temp.value)


def do_extract(buf, len):
    """Decode ``len`` bytes of ``buf`` with ``load_monitor_extract``."""
    extract_variant_buffer(buf, len, load_monitor_extract, None)


class diag_variant_buffer_head(ctypes.Structure):
    """Header that precedes every record in the variant buffer."""

    _fields_ = [
        ("magic", ctypes.c_ulong),
        # Total record length in bytes, header included.
        ("len", ctypes.c_ulong),
    ]


DIAG_VARIANT_BUFFER_HEAD_MAGIC_SEALED = 197612031122


def extract_variant_buffer(buf, len, func, arg):
    """Walk the records in ``buf`` and call ``func`` for each payload.

    Iteration stops at the first header that does not fit, has the wrong
    magic, claims a length smaller than the header, or claims more bytes
    than remain in the buffer.

    Args:
        buf: Writable buffer (e.g. a ctypes char array) holding the dump.
        len: Number of valid bytes in ``buf``.
        func: Callback invoked as ``func(payload_bytes, payload_len, arg)``.
        arg: Opaque value forwarded to ``func``.
    """
    head_size = ctypes.sizeof(diag_variant_buffer_head)
    saved_cwd = os.getcwd()
    pos = 0
    try:
        while pos < len:
            # Check the bounds before mapping the header: from_buffer raises
            # ValueError when fewer than head_size bytes remain in ``buf``.
            if pos + head_size >= len:
                break
            head = diag_variant_buffer_head.from_buffer(buf, pos)
            if head.magic != DIAG_VARIANT_BUFFER_HEAD_MAGIC_SEALED:
                break
            if head.len < head_size:
                break
            rec_end = pos + head.len
            if rec_end > len:
                # Truncated record: slicing would silently yield a short
                # payload that the callback then casts to a larger struct.
                break
            func(buf[pos + head_size : rec_end], head.len - head_size, arg)
            pos = rec_end
    finally:
        # Restore the working directory even if a callback failed.
        os.chdir(saved_cwd)


def parse_file(file_path):
    """Map text-symbol addresses to names from a kallsyms-style file.

    Only lines with exactly three fields whose type is ``t`` or ``T`` are
    used.

    Returns:
        dict mapping integer address to symbol name.
    """
    result = {}
    with open(file_path, "r") as file:
        for line in file:
            parts = line.split()
            if len(parts) == 3 and parts[1] in ("t", "T"):
                result[int(parts[0], 16)] = parts[2]
    return result


def parse_kallsyms(filename):
    """Read text symbols (type ``t``/``T``) from a kallsyms-style file.

    Returns:
        List of ``(address, name)`` tuples sorted by address.
    """
    symbols = []
    with open(filename, "r") as f:
        for line in f:
            parts = line.split()
            if len(parts) < 3:
                continue
            if parts[1] in ("t", "T"):  # only consider text symbols
                symbols.append((int(parts[0], 16), parts[2]))
    symbols.sort()
    return symbols


def print_stack_trace(symbols, address):
    """Resolve ``address`` to the nearest preceding symbol.

    Despite its name this function prints nothing; it only returns the match.

    Args:
        symbols: ``(address, name)`` tuples sorted by address, as produced by
            ``parse_kallsyms``.
        address: Integer address to resolve.

    Returns:
        ``(name, offset)`` for the last symbol whose address is <= ``address``,
        or ``(None, None)`` when no such symbol exists.
    """
    # A 1-tuple sorts before every 2-tuple sharing its first element, so
    # (address + 1,) lands right after the last entry with addr <= address.
    # This bisects the tuples directly instead of copying all addresses into
    # a fresh list on every lookup.
    idx = bisect.bisect_right(symbols, (address + 1,))
    if not idx:
        return (None, None)
    nearest_addr, nearest_symbol = symbols[idx - 1]
    return (nearest_symbol, address - nearest_addr)


# Kernel text symbols, loaded once at import time.
symbols = parse_kallsyms("/proc/kallsyms")


def diag_printf_kern_stack(kern_stack: kern_stack_detail):
    """Print the kernel stack, outermost frame first, with symbol names."""
    print(" 内核态堆栈:")
    for i in range(BACKTRACE_DEPTH - 1, -1, -1):
        addr = kern_stack.stack[i]
        # Compare against the integer value: an int never equals a
        # ctypes.c_ulong instance, so comparing to c_ulong(-1) skips nothing.
        if addr == INVALID_KERN_ADDR or addr == 0:
            continue
        nearest_symbol, _offset = print_stack_trace(symbols, addr)
        if nearest_symbol is not None:
            print("#@ 0x%lx %s ([kernel.kallsyms])" % (addr, nearest_symbol))
        else:
            print("#@ 0x%lx %s" % (addr, "UNKNOWN"))


def print_structure(structure, indent=0, struct_name=""):
    """Recursively print every field of a ctypes structure.

    Args:
        structure: Object exposing ``_fields_``.
        indent: Number of spaces to prefix each line with.
        struct_name: Label printed before each field name.
    """
    indent_spaces = " " * indent
    for field_name, _field_type in structure._fields_:
        value = getattr(structure, field_name)
        if hasattr(value, "_fields_"):
            # Nested structure: print a heading, then recurse one level deeper.
            print(f"{indent_spaces}{struct_name} {field_name}:")
            print_structure(value, indent + 4, field_name)
        elif isinstance(value, ctypes.Array):
            if field_name == "chains":
                # Each element is decoded as text, ignoring undecodable bytes.
                for item in value:
                    chains_str = bytes(item).decode(errors="ignore")
                    print(f"{indent_spaces}{struct_name} {field_name}: {chains_str}")
            else:
                print(f"{indent_spaces}{struct_name} {field_name}: {list(value)}")
        else:
            print(f"{indent_spaces}{struct_name} {field_name}: {value}")


def diag_printf_user_stack(pid: int, user_stack: user_stack_detail):
    """Print the user stack of ``pid``, outermost frame first.

    Addresses are resolved through a ``ProcMapsParser`` for the process,
    created lazily on the first valid frame.
    """
    p = None
    print(" 用户态堆栈:")
    for i in range(BACKTRACE_DEPTH - 1, -1, -1):
        frame = user_stack.stack[i]
        if frame == INVALID_USER_ADDR or frame == 0:
            continue
        if not p:
            p = ProcMapsParser(pid)
        if not p:
            print("#~ 0x%lx NO /proc/pid/maps" % (frame))
            continue
        path, addr = p.lookup(frame)
        if path is None:
            print("#~ 0x%lx UNKNOWN" % (frame))
        elif path == "kernel":
            nearest_symbol, _offset = print_stack_trace(symbols, addr)
            print("#~ 0x%lx %s ([kernel.kallsyms])" % (frame, nearest_symbol))
        else:
            symbol = addr_to_symbol(path, addr)
            print("#~ 0x%lx %s ([symbol])" % (frame, symbol))


# Load the native unwinder shared object.
lib = ctypes.cdll.LoadLibrary(
    "/root/variable_monitor/source/ucli_py/libunwind/libunwind.so"
)

# Declare the ctypes prototype of the native function.
lib.diag_printf_raw_stack.argtypes = [
    ctypes.c_int,
    ctypes.c_int,
    ctypes.c_char_p,
    ctypes.POINTER(raw_stack_detail),
    ctypes.c_int,
]
lib.diag_printf_raw_stack.restype = None


def diag_printf_raw_stack(task_info: variable_monitor_task):
    """Hand the task's raw stack to the native ``diag_printf_raw_stack``."""
    lib.diag_printf_raw_stack(
        task_info.task.tgid,
        task_info.task.container_tgid,
        task_info.task.comm,
        ctypes.byref(task_info.raw_stack),
        ctypes.c_int(1),
    )


# Running number of task samples printed so far; kept at module level so it
# survives across callback invocations instead of restarting for each record.
_sample_seq = 0


def load_monitor_extract(buf, len, _):
    """Decode and print one record payload from the variant buffer.

    The first ``int`` of the payload selects the record type: 0 is a
    threshold record (``variable_monitor_record``), 1 is a sampled task
    (``variable_monitor_task``).  Other types and payloads too short for
    their structure are ignored.

    Args:
        buf: Payload bytes.
        len: Payload length in bytes.
        _: Unused callback argument.

    Returns:
        Always 0.
    """
    global _sample_seq

    # Need at least the type tag before it can be read safely.
    if len < ctypes.sizeof(ctypes.c_int):
        return 0
    et_type = ctypes.cast(buf, ctypes.POINTER(ctypes.c_int)).contents.value

    if et_type == 0:
        if len < ctypes.sizeof(variable_monitor_record):
            return 0
        vm_record = ctypes.cast(
            buf, ctypes.POINTER(variable_monitor_record)
        ).contents
        print(f"超出阈值:{vm_record.tv}")
        for i in range(vm_record.threshold_num):
            rec = vm_record.threshold_record[i]
            print(
                f"\t: pid: {rec.task_id}, name: {rec.name.decode()}, ptr: {rec.ptr}, threshold:{rec.threshold}"
            )
    elif et_type == 1:
        if len < ctypes.sizeof(variable_monitor_task):
            return 0
        tsk_info = ctypes.cast(buf, ctypes.POINTER(variable_monitor_task)).contents
        _sample_seq += 1
        print(
            f"##CGROUP:[{tsk_info.task.cgroup_buf.decode()}] {tsk_info.task.pid} [{_sample_seq:03d}] 采样命中[{'R' if tsk_info.task.state == 0 else 'D'}]"
        )
        printk_task_brief(tsk_info.task)
        diag_printf_raw_stack(tsk_info)
        # NOTE(review): placeholder frame separating user and kernel stacks;
        # presumably stands in for the user-stack end marker - confirm.
        print(f"#* 0xffffffffffffff {tsk_info.task.comm.decode()} (UNKNOWN)")
        diag_printf_kern_stack(tsk_info.kern_stack)
        diag_printf_proc_chains(tsk_info.proc_chains)
        print("##")
    return 0


if __name__ == "__main__":
    # Replay a dump previously saved by do_dump().  For a live capture call
    # open_device(), do_dump(None) and close_device() instead.
    # SECURITY: pickle.load executes arbitrary code from the file; only load
    # "buff"/"len" files that this tool produced.
    with open("buff", "rb") as f:
        buf_bytes = pickle.load(f)
    buf2 = ctypes.create_string_buffer(len(buf_bytes))
    buf2.raw = buf_bytes
    with open("len", "rb") as f:
        len2 = pickle.load(f)
    do_extract(buf2, len2.value)