67 lines
2.2 KiB
Python
67 lines
2.2 KiB
Python
import numpy as np
|
||
from sklearn.metrics import f1_score, precision_score, recall_score
|
||
|
||
|
||
def adjust_predicts(score, label,
                    threshold=None,
                    pred=None,
                    calc_latency=False):
    """
    Apply the point-adjust strategy to point-wise anomaly predictions.

    Based on the official point-adjust source code. Within every contiguous
    run of ground-truth anomalies, as soon as one point is predicted
    anomalous, the points of that run before it (back to the start of the
    run, index 0 included) and after it (up to the end of the run) are
    marked as anomalous as well.

    Args:
        score: The anomaly score. Must have the same length as `label`.
            Only its length is used when `pred` is given.
        label: The ground-truth label. Entries greater than 0.1 are treated
            as anomalous.
        threshold (float): The threshold of anomaly score. A point is
            labeled as "anomaly" if its score is lower than the threshold.
            Used only when `pred` is None.
        pred: If not None, adjust `pred` and ignore the values of `score`
            and `threshold`. NOTE: `pred` is modified in place and is the
            object that gets returned.
        calc_latency (bool): If True, also return the detection latency.

    Returns:
        The adjusted predict labels (a boolean np.ndarray when computed from
        `score`/`threshold`, otherwise the `pred` object itself). When
        `calc_latency` is True, a tuple `(predict, latency)` where `latency`
        is the number of points filled in backwards divided by
        `(number of detected anomaly segments + 1e-4)`.

    Raises:
        ValueError: If `score` and `label` differ in length.
    """
    if len(score) != len(label):
        raise ValueError("score and label must have the same length")
    score = np.asarray(score)
    label = np.asarray(label)
    latency = 0
    if pred is None:
        predict = score < threshold
    else:
        predict = pred
    actual = label > 0.1
    # True while we are inside a ground-truth segment that has been detected.
    anomaly_state = False
    anomaly_count = 0
    for i in range(len(score)):
        if actual[i] and predict[i] and not anomaly_state:
            # First hit inside this ground-truth segment.
            anomaly_state = True
            anomaly_count += 1
            # Walk back to the start of the segment. The stop value is -1 so
            # that index 0 is visited too; stopping at 0 would leave the
            # first sample unadjusted when a segment begins at the very
            # start of the series.
            for j in range(i, -1, -1):
                if not actual[j]:
                    break
                if not predict[j]:
                    predict[j] = True
                    latency += 1
        elif not actual[i]:
            # Left the ground-truth segment.
            anomaly_state = False
        if anomaly_state:
            # Forward fill the remainder of the detected segment.
            predict[i] = True
    if calc_latency:
        # The small constant avoids division by zero when nothing was detected.
        return predict, latency / (anomaly_count + 1e-4)
    else:
        return predict
|
||
|
||
|
||
def evaluate(y_true: list, y_pred: list) -> float:
    """
    F1-PA evaluation: point-adjust the detected labels, then score with F1.

    :param y_true: ground-truth labels
    :param y_pred: detected labels
    :return: the F1 score of `y_true` against the point-adjusted `y_pred`
        (only F1 is returned; recall and precision are not computed here)
    """
    # Work on copies: adjust_predicts writes into the `pred` it is given.
    truth = y_true.copy()
    detected = y_pred.copy()
    # With `pred` supplied, adjust_predicts only needs `score` for its
    # length, so a zero-filled placeholder is passed.
    placeholder_score = np.array([0] * len(truth))
    adjusted = adjust_predicts(score=placeholder_score, label=truth, pred=detected)
    return f1_score(truth, adjusted)
|
||
|