319 lines
14 KiB
Python
319 lines
14 KiB
Python
import os
|
||
import numpy
|
||
import torch.nn as nn
|
||
import torch
|
||
import torch.utils.data as tud
|
||
import preprocess
|
||
import evaluation
|
||
from loguru import logger
|
||
import time
|
||
from tqdm import tqdm
|
||
|
||
|
||
def create_dataloader(dataset_name: str, input_size: int = 1, output_size: int = 1, step: int = 1, batch_size: int = 1,
                      time_index: bool = True, del_column_name: bool = True,
                      preprocess_name: str = "standardization") -> (tud.DataLoader, tud.DataLoader):
    """
    Build the training and test DataLoaders for a single dataset.

    The dataset is read from ``./dataset/<dataset_name>/train.csv`` and
    ``./dataset/<dataset_name>/test.csv`` through the ``MyDataset`` class of
    the chosen ``preprocess`` sub-module.

    NOTE: both returned loaders wrap the *same* dataset object, whose ``mode``
    attribute is left set to ``"test"`` when this function returns.

    :param dataset_name: name of the dataset (sub-path below ``./dataset``)
    :param input_size: length of the input window
    :param output_size: length of the output window
    :param step: stride used when sliding the window over the data
    :param batch_size: batch size of both loaders
    :param time_index: True when the first column is a timestamp, False otherwise
    :param del_column_name: use True when the first row of the file holds column names
    :param preprocess_name: name of the preprocessing module under ``preprocess``
    :return: (shuffled training loader, unshuffled test loader)
    :raises AttributeError: if ``preprocess_name`` does not name a module that
        exposes ``MyDataset``
    """
    # Resolve preprocess.<preprocess_name>.MyDataset by attribute lookup instead
    # of eval(), so that a configuration string can never execute arbitrary code.
    target = preprocess
    for part in preprocess_name.split("."):
        target = getattr(target, part)
    dataset_cls = target.MyDataset

    ds = dataset_cls(name=dataset_name.replace("/", "-"),
                     train_path=f"./dataset/{dataset_name}/train.csv",
                     test_path=f"./dataset/{dataset_name}/test.csv",
                     input_size=input_size, output_size=output_size,
                     step=step, time_index=time_index,
                     del_column_name=del_column_name)
    normal_dl = tud.DataLoader(dataset=ds, batch_size=batch_size, shuffle=True)
    ds.mode = "test"
    attack_dl = tud.DataLoader(dataset=ds, batch_size=batch_size, shuffle=False)
    return normal_dl, attack_dl
|
||
|
||
|
||
def create_all_dataloader(datasets: [str], input_size: int = 1, output_size: int = 1, step: int = 1,
                          batch_size: int = 1, time_index: bool = True, del_column_name: bool = True,
                          preprocess_name: str = "standardization") -> [{}]:
    """
    Build the DataLoaders for every dataset in ``datasets``.

    A dataset directory that directly contains ``train.csv`` yields a
    one-element list; otherwise every entry of the directory is treated as a
    sub-dataset and yields one element each.

    :param datasets: list of dataset names (directories below ``./dataset``)
    :param input_size: length of the input window
    :param output_size: length of the output window
    :param step: stride used when sliding the window over the data
    :param batch_size: batch size of the loaders
    :param time_index: True when the first column is a timestamp, False otherwise
    :param del_column_name: use True when the first row of the file holds column names
    :param preprocess_name: name of the preprocessing method
    :return: one list per dataset; each list holds dicts with the keys
        ``'dataset_name'``, ``'normal'`` (training loader) and ``'attack'`` (test loader)
    """
    all_dataloader = []
    for dataset_name in datasets:
        logger.info(f'开始建立 dataloader {dataset_name}')
        # List the directory once and reuse the result for both branches.
        entries = os.listdir(f"./dataset/{dataset_name}")
        if "train.csv" in entries:
            normal_dl, attack_dl = create_dataloader(dataset_name=dataset_name, input_size=input_size,
                                                     output_size=output_size, step=step, batch_size=batch_size,
                                                     time_index=time_index, del_column_name=del_column_name,
                                                     preprocess_name=preprocess_name)
            all_dataloader.append([{
                'dataset_name': dataset_name,
                'normal': normal_dl,
                'attack': attack_dl
            }])
        else:
            all_sub_dataloader = []
            for sub_dataset_dir in entries:
                sub_dataset_name = f"{dataset_name}/{sub_dataset_dir}"
                # Forward exactly the same options as the single-dataset branch.
                # (This call used to pass an undefined `del_time` name and
                # silently dropped `time_index` / `preprocess_name`.)
                normal_dl, attack_dl = create_dataloader(dataset_name=sub_dataset_name, input_size=input_size,
                                                         output_size=output_size, step=step, batch_size=batch_size,
                                                         time_index=time_index, del_column_name=del_column_name,
                                                         preprocess_name=preprocess_name)
                all_sub_dataloader.append({
                    'dataset_name': sub_dataset_name.replace("/", "-"),
                    'normal': normal_dl,
                    'attack': attack_dl
                })
            all_dataloader.append(all_sub_dataloader)

        logger.info(f'完成建立 dataloader {dataset_name}')
    return all_dataloader
|
||
|
||
|
||
class EvaluationScore:
    """
    Collects per-batch detection results, picks thresholds automatically and
    scores the outcome with every configured evaluation method.

    Two usage modes exist, selected by what is passed to :meth:`add_data`:

    * loss mode - only ``y_loss`` is given; :meth:`auto_threshold` searches a
      threshold per evaluation method and derives the predicted labels;
    * label mode - ``pred_label`` is given; the supplied labels are scored as-is.
    """

    def __init__(self, evaluations: [str], attack=1):
        """
        :param evaluations: names of the evaluation methods to use (each must be
            reachable as ``evaluation.<name>.evaluate``)
        :param attack: label value that marks an anomaly, 0 or 1
        """
        self.time = 0
        self.f1 = 0
        self.f1_pa = 0
        self.f_tad = 0
        self.attack = attack
        self.normal = 1 - attack
        self._total_y_loss = None
        self._total_label = None
        self._total_pred_label = None
        self.true_pred_df = None
        self.true_pred_dict = None
        self.evaluations = evaluations
        self.scores = {}

    @staticmethod
    def _resolve_evaluate(func_name):
        """Return ``evaluation.<func_name>.evaluate`` via attribute lookup (no eval())."""
        target = evaluation
        for part in func_name.split("."):
            target = getattr(target, part)
        return target.evaluate

    def add_data(self, y_loss, true_label, pred_label=None):
        """
        Append the results of one batch.

        :param y_loss: deviation (anomaly score) tensor; ignored when ``pred_label`` is given
        :param true_label: ground-truth label tensor
        :param pred_label: predicted label tensor, or None to use ``y_loss``
        """
        if pred_label is not None:
            # Label mode: keep everything as tensors and concatenate along dim 0.
            if self._total_label is None and self._total_pred_label is None:
                self._total_label = true_label
                self._total_pred_label = pred_label
            else:
                self._total_label = torch.cat([self._total_label, true_label], dim=0)
                self._total_pred_label = torch.cat([self._total_pred_label, pred_label], dim=0)
            return

        # Loss mode: flatten and move to numpy before accumulating.
        y_loss = y_loss.view(-1).cpu().detach().numpy()
        true_label = true_label.view(-1).cpu().detach().numpy()

        if self._total_y_loss is None and self._total_label is None:
            self._total_y_loss = y_loss
            self._total_label = true_label
            return
        self._total_y_loss = numpy.concatenate((self._total_y_loss, y_loss), axis=0)
        self._total_label = numpy.concatenate((self._total_label, true_label), axis=0)

    def best_threshold(self, true_label: list, y_loss: list) -> dict:
        """
        Search, for every evaluation method, the loss threshold with the best score.

        The interval ``[0, max(y_loss)]`` is sampled at 11 evenly spaced
        thresholds; the interval is then narrowed to the neighbours of the best
        sample and the procedure is repeated 5 times in total.

        :param true_label: flat list of ground-truth labels
        :param y_loss: flat list of losses
        :return: mapping ``evaluation name -> best threshold``
        """
        ret = {}
        if not self.evaluations:
            return ret
        # Convert once; the array is loop-invariant.
        loss_array = numpy.array(y_loss)
        loss_max = max(y_loss)
        for func_name in self.evaluations:
            evaluate = self._resolve_evaluate(func_name)
            threshold_max = loss_max
            threshold_min = 0
            best_threshold = 0
            for _ in range(5):
                # Candidates run from threshold_max (index 0) down to threshold_min (index 10).
                threshold_list = [threshold_max - i * (threshold_max - threshold_min) / 10 for i in range(11)]
                f1_list = []
                for threshold_one in threshold_list:
                    prediction_loss = numpy.where(loss_array > threshold_one, self.attack, self.normal)
                    f1_list.append(evaluate(y_true=true_label, y_pred=prediction_loss.tolist()))
                # First maximum wins on ties.
                ind = f1_list.index(max(f1_list))
                best_threshold = threshold_list[ind]
                # Narrow the interval to the neighbours of the best candidate,
                # clamping at both ends of the candidate list.
                threshold_max = threshold_list[max(ind - 1, 0)]
                threshold_min = threshold_list[min(ind + 1, len(threshold_list) - 1)]
            ret[func_name] = best_threshold
        return ret

    def auto_threshold(self):
        """
        Finalise the evaluation: fill ``true_pred_dict`` and ``scores``.

        In label mode the supplied predictions are scored directly. In loss
        mode NaN losses are replaced by 0, the losses are scaled by their
        maximum, and a threshold is searched per evaluation method.

        :raises ValueError: if no data was added beforehand
        """
        if self._total_pred_label is not None:
            # Label mode. This branch used to return without scoring anything,
            # which left `scores` empty and `true_pred_dict` as None.
            pred_list = self._total_pred_label.reshape(-1).tolist()
            self.true_pred_dict = {
                'true': self._total_label.reshape(-1).tolist()
            }
            for func_name in self.evaluations:
                self.true_pred_dict[func_name] = pred_list
                self.scores[func_name] = self.get_score(func_name)
            return

        if self._total_y_loss is None or self._total_label is None:
            raise ValueError("auto_threshold() called before any data was added")

        self._total_y_loss[numpy.isnan(self._total_y_loss)] = 0
        loss_max = self._total_y_loss.max()
        if loss_max != 0:
            # Skip the scaling when every loss is 0: 0 / 0 would turn all losses into NaN.
            self._total_y_loss = self._total_y_loss / loss_max
        thresholds = self.best_threshold(
            self._total_label.reshape(-1).tolist(), self._total_y_loss.reshape(-1).tolist())
        # reshape(-1) (rather than squeeze()) keeps a list even for a single sample.
        self.true_pred_dict = {
            'true': self._total_label.reshape(-1).tolist()
        }
        for func_name in thresholds:
            self.true_pred_dict[func_name] = \
                numpy.where(self._total_y_loss > thresholds[func_name], self.attack, self.normal).reshape(-1).tolist()
        for func_name in self.true_pred_dict:
            if func_name == "true":
                continue
            self.scores[func_name] = self.get_score(func_name)

    def get_score(self, func_name):
        """
        Score the accumulated labels with one evaluation method.

        :param func_name: name of the evaluation method
        :return: whatever ``evaluation.<func_name>.evaluate`` returns
        """
        evaluate = self._resolve_evaluate(func_name)
        if self._total_pred_label is not None:
            return evaluate(self._total_label.reshape(-1).tolist(),
                            self._total_pred_label.reshape(-1).tolist())
        return evaluate(self._total_label.reshape(-1).tolist(),
                        self.true_pred_dict[f"{func_name}"])

    def __str__(self):
        """Return the scores as ``name=value`` pairs (3 decimals) separated by single spaces."""
        return " ".join(f"{func_name}={score:.3f}" for func_name, score in self.scores.items())
|
||
|
||
|
||
def train_model(epoch: int, optimizer: torch.optim.Optimizer, dataloader: tud.DataLoader, model: nn.Module,
                device: str = "cpu") -> (nn.Module, str):
    """
    Train the model for one epoch.

    :param epoch: index of the current epoch
    :param optimizer: optimizer
    :param dataloader: training data; its dataset is switched to ``mode = "train"``
    :param model: model; must provide ``loss(x=..., y_true=..., epoch=..., device=...)``
    :param device: device to train on, cpu or gpu
    :return: the trained model; the message to log for this epoch
    """
    model.train()
    dataloader.dataset.mode = "train"
    # Running total instead of re-summing a growing list on every batch.
    loss_total = 0
    batch_count = 0
    start_time = time.time()
    with tqdm(total=len(dataloader), ncols=150) as _tqdm:
        _tqdm.set_description(f'(进度条部分不会写进本地日志)epoch:{epoch},训练进度')
        for data in dataloader:
            x = data[0].to(device)
            y_true = data[2].to(device)
            optimizer.zero_grad()
            # NOTE(review): no backward() is called here, so model.loss is
            # presumably expected to run the backward pass itself - confirm.
            loss = model.loss(x=x, y_true=y_true, epoch=epoch, device=device)
            loss_total = loss_total + loss
            batch_count += 1
            optimizer.step()
            _tqdm.set_postfix(loss='{:.6f}'.format(loss_total / batch_count))
            _tqdm.update(1)
    end_time = time.time()
    # An empty dataloader has no average loss; report NaN instead of dividing by zero.
    mean_loss = loss_total / batch_count if batch_count else float("nan")
    info = f"epoch={epoch}, average loss={'{:.6f}'.format(mean_loss)}, " \
           f"train time={'{:.1f}'.format(end_time-start_time)}s"
    return model, info
|
||
|
||
|
||
def test_model(dataloader: tud.DataLoader, model: nn.Module, evaluations: [str], device: str = "cpu") -> \
        (EvaluationScore, str):
    """
    Run the model over the test data and evaluate the result.

    :param dataloader: test data; its dataset is switched to ``mode = "test"``
    :param model: model; must provide ``detection(x=..., y_true=..., device=...)``
        returning ``(y_loss, label_pred)``
    :param evaluations: names of the evaluation methods to apply
    :param device: device to run on, cpu or gpu
    :return: the evaluation scores; the message to log for this test run
    """
    model.eval()
    dataloader.dataset.mode = "test"
    scorer = EvaluationScore(evaluations)
    started = time.time()
    with tqdm(total=len(dataloader), ncols=150) as progress, torch.no_grad():
        progress.set_description('(进度条部分不会写进本地日志)测试进度')
        for batch in dataloader:
            inputs = batch[0].to(device)
            targets = batch[2].to(device)
            labels = batch[1].int().to(device)
            y_loss, label_pred = model.detection(x=inputs, y_true=targets, device=device)
            # When the model already predicts labels the loss is not recorded.
            scorer.add_data(y_loss=None if label_pred is not None else y_loss,
                            true_label=labels, pred_label=label_pred)
            progress.update(1)
    elapsed = time.time() - started
    scorer.auto_threshold()
    summary = str(scorer).replace(" ", ", ")
    return scorer, f"{summary}, test time={elapsed:.1f}s"
|
||
|
||
|
||
def _record_best(es, best_score: dict, best_detection: dict) -> None:
    """Copy every score of ``es`` that beats the stored best, plus its detection labels, into the two dicts."""
    # true_pred_dict may be None when the evaluation produced no label table.
    detections = es.true_pred_dict or {}
    for func_name in es.scores:
        if func_name not in best_score or es.scores[func_name] > best_score[func_name]:
            best_score[func_name] = es.scores[func_name]
            best_detection[func_name] = detections.get(func_name)
    if "true" in detections:
        best_detection["true"] = detections["true"]


def train_and_test_model(start_time: str, epochs: int, normal_dataloader: tud.DataLoader, attack_dataloader: tud.DataLoader,
                         model: nn.Module, evaluations: [str], device: str = "cpu", lr: float = 1e-4,
                         model_path: str = None, train: bool = True) -> (dict, dict):
    """
    Train and test a model, or only test it.

    When training, the model is tested after every epoch and a checkpoint is
    written to ``./records/<start_time>/model/``.

    :param start_time: start time of the experiment; used to locate the output directory
    :param epochs: total number of training epochs
    :param normal_dataloader: training data
    :param attack_dataloader: test data
    :param model: model
    :param evaluations: names of the evaluation methods
    :param device: device
    :param lr: learning rate
    :param model_path: path of a checkpoint file to load first (optional)
    :param train: whether to train; if False only the test is run
    :return: best score per evaluation method; detection labels of the best
        case per evaluation method (plus the ground truth under ``"true"``)
    """
    dataset_name = normal_dataloader.dataset.name
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    if model_path:
        try:
            # map_location lets a checkpoint saved on another device load on `device`.
            checkpoint = torch.load(model_path, map_location=device)
            model.load_state_dict(checkpoint['model_state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            logger.info(f"模型参数文件{model_path}加载成功")
        except Exception as exc:
            # Loading is best effort, but do not swallow KeyboardInterrupt /
            # SystemExit and do not hide the reason for the failure.
            logger.warning(f"模型参数文件{model_path}加载失败,将训练新模型")
            logger.warning(f"checkpoint load error: {exc!r}")
    logger.info(f"模型:{model.name},数据集:{dataset_name},设备:{device},训练开始")
    best_score = {}
    best_detection = {}
    if train:
        logger.info(f"模式:训练并测试")
        # Make sure the checkpoint directory exists before the first save.
        os.makedirs(f'./records/{start_time}/model', exist_ok=True)
        for epoch in range(1, epochs+1):
            model, train_info = train_model(epoch=epoch, optimizer=optimizer, dataloader=normal_dataloader, model=model,
                                            device=device)
            es, test_info = test_model(dataloader=attack_dataloader, model=model, evaluations=evaluations,
                                       device=device)
            logger.info(f"{train_info}, {test_info}")
            es_score = str(es).replace(" ", "_")
            torch.save({
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict()
            }, f'./records/{start_time}/model/model={model.name}_dataset={dataset_name}_epoch={epoch}_{es_score}.pth')
            _record_best(es, best_score, best_detection)
    else:
        logger.info(f"模式:仅进行测试")
        es, test_info = test_model(dataloader=attack_dataloader, model=model, evaluations=evaluations, device=device)
        logger.info(test_info)
        # best_score is empty here, so every score of this single run is recorded.
        _record_best(es, best_score, best_detection)
    return best_score, best_detection
|
||
|
||
|
||
|
||
|