# Name: fang xiaoyu
# Time: 2023/3/10 23:53
"""Extract flow features from pcap files with Tranalyzer2 and fit an SVM.

Pipeline: find every ``*.pcap`` file in ``PCAP_FOLDER``, run the ``t2``
command-line tool on each one, load the resulting feature table, z-score
the numeric columns, and evaluate an ``sklearn.svm.SVC`` with 5-fold
cross-validation plus a held-out test split.
"""

import glob
import os
import subprocess

import numpy as np
import pandas as pd

# Folder that is searched for ``*.pcap`` captures.
PCAP_FOLDER = "wcx-抓包-用于模型复现"

# File the feature table is read from after each ``t2`` run.
FEATURE_FILE = "features.csv"

# Names given to the columns that remain once the leading columns are dropped.
FEATURE_COLUMNS = [
    "duration", "protocol", "src_ip", "src_port", "dst_ip", "dst_port",
    "packets", "bytes", "flows", "flags", "tos", "class",
]


def build_t2_command(pcap_file):
    """Return the ``t2`` argument list for *pcap_file*.

    The command is built as a list (not a shell string) so that paths
    containing spaces or non-ASCII characters are passed through verbatim.
    """
    pcap_path = os.fspath(pcap_file)
    # NOTE(review): the flags are kept exactly as in the original command;
    # confirm against the installed Tranalyzer2 version that ``-f`` really
    # names the feature file that is read back afterwards.
    return ["t2", "-r", pcap_path, "-w", f"{pcap_path}.csv", "-f", FEATURE_FILE]


def load_feature_table(path):
    """Load a ``;``-separated feature table and return the usable columns.

    The first six lines of the file are skipped, the first column becomes
    the index, the next five columns are discarded, the remaining twelve
    are renamed to ``FEATURE_COLUMNS`` and the ``class`` column is dropped
    because it is not used downstream.
    """
    table = pd.read_csv(path, skiprows=6, header=None, delimiter=";", index_col=0)
    # With header=None the columns are labelled by integer position.
    table = table.drop(columns=[1, 2, 3, 4, 5])
    table.columns = FEATURE_COLUMNS
    return table.drop(columns=["class"])


def extract_features(pcap_file):
    """Run ``t2`` on *pcap_file* and return its features as a DataFrame.

    Raises:
        subprocess.CalledProcessError: if ``t2`` exits with a non-zero
            status. Failing here prevents silently re-reading a stale
            ``features.csv`` left behind by a previous capture.
        FileNotFoundError: if the ``t2`` executable is not on ``PATH``.
    """
    subprocess.run(build_t2_command(pcap_file), check=True)
    return load_feature_table(FEATURE_FILE)


def find_pcap_files(folder):
    """Return the ``*.pcap`` files directly inside *folder*, sorted by path.

    Sorting makes the row order of the combined feature table (and hence
    the seeded train/test split) independent of directory listing order.
    """
    return sorted(glob.glob(os.path.join(folder, "*.pcap")))


def normalize_features(features):
    """Return the z-scored numeric columns of *features*.

    Non-numeric columns are left out because they have no mean or standard
    deviation. Columns whose standard deviation is zero or undefined
    (constant column, single row) are divided by 1 instead, so they become
    all zeros rather than NaN/inf.
    """
    numeric = features.select_dtypes(include="number")
    mean = numeric.mean()
    std = numeric.std()
    safe_std = std.where(std > 0, 1.0)
    return (numeric - mean) / safe_std


def validate_labels(y):
    """Raise ``ValueError`` unless *y* contains at least two classes."""
    if np.unique(y).size < 2:
        raise ValueError(
            "SVC needs samples from at least two classes; "
            "provide real labels instead of a constant placeholder."
        )


def train_and_evaluate(X, y):
    """Cross-validate an ``SVC`` on an 80/20 split and print the scores."""
    validate_labels(y)

    # Imported here so the feature-extraction helpers above stay importable
    # on machines that do not have scikit-learn installed.
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import cross_val_score, train_test_split
    from sklearn.svm import SVC

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    clf = SVC()

    # Estimate performance with cross-validation on the training split.
    scores = cross_val_score(clf, X_train, y_train, cv=5)
    print(f"Cross Validation Scores: {scores}")
    print(f"Mean Score: {np.mean(scores)}")
    print(f"Std Score: {np.std(scores)}")

    # Fit on the full training split and score on the held-out test split.
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy: {accuracy}")


def main():
    """Extract features from every capture in ``PCAP_FOLDER`` and train."""
    pcap_files = find_pcap_files(PCAP_FOLDER)
    if not pcap_files:
        raise SystemExit(f"No .pcap files found in {PCAP_FOLDER!r}")

    all_features = pd.concat([extract_features(path) for path in pcap_files])
    normalized_features = normalize_features(all_features)

    # NOTE(review): every sample is labelled 0, exactly as in the original
    # script. This is a placeholder; training cannot succeed until real
    # per-flow labels are supplied here.
    labels = np.zeros(len(normalized_features))
    train_and_evaluate(normalized_features.values, labels)


if __name__ == "__main__":
    main()