'''
Methods and classes to calculate entropies.
'''
from scipy import stats
import numpy as np
import pandas as pd
from IPython.display import display  # `display` is needed by AnonimitySetCounts.__str__
from util import *


def frequency(length_group):
    """Normalize group sizes into frequencies (they sum to 1)."""
    return length_group / float(length_group.sum())


def weight(class_label, p=0.5):
    """Geometric prior weight p ** (r + 1) for a class with label r."""
    r = float(class_label)
    return p ** (r + 1)


def weighted_sizes(group):
    """Size of a (lengths, class_label) group weighted by its class prior."""
    label = group.class_label.iloc[0]
    p_w = weight(label)
    m_s = len(group)
    return m_s * p_w
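

# Illustrative sketch (ours, not part of the original module): with the
# default p=0.5, the geometric prior assigns weight 0.5 to class "0",
# 0.25 to class "1", 0.125 to class "2", and so on, so lower-numbered
# classes dominate the weighted group sizes used for the 'geometric' prior.
def _example_geometric_weights():
    """Hedged example of `weight`; this helper name is ours, not the author's."""
    assert weight('0') == 0.5    # 0.5 ** (0 + 1)
    assert weight('1') == 0.25   # 0.5 ** (1 + 1)
    assert weight('2') == 0.125  # 0.5 ** (2 + 1)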
def entropy(x):
    """Shannon entropy (in bits) of a vector of probabilities."""
    return -np.nansum(x * np.log2(x))
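

# Illustrative sketch (ours, not from the original source): a uniform
# distribution over two outcomes carries exactly 1 bit, four equally likely
# outcomes carry 2 bits, and a point mass carries 0 bits.
def _example_entropy_values():
    """Hedged sanity checks for `entropy`; this helper name is ours."""
    assert np.isclose(entropy(np.array([0.5, 0.5])), 1.0)
    assert np.isclose(entropy(np.array([0.25] * 4)), 2.0)
    assert np.isclose(entropy(np.array([1.0])), 0.0)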
"""
Methods to calculate the conditional entropy H(W | S).
Start lookink at the method `cond_entropy`, that's the main entry point.
IMPORTANT: for all these methods we assume:
i) the dataset contains the same number of instances for each class
ii) the priors are uniform.
If these assumptions are met, computing the entropy can be done by counting
frequencies in the anonymity sets.
"""
def slice_as_tuples(df, position=None):
    """Get sequences up to a certain packet number, as hashable tuples."""
    tmp_df = df.copy()

    def _slice_tuples(lengths_list):
        list_slice = lengths_list[:position]
        lengths_tuple = tuple(list_slice)
        return lengths_tuple

    tmp_df['lengths'] = tmp_df.lengths.apply(_slice_tuples)
    return tmp_df


def sliding_window(df, max_position=-1):
    """Conditional entropies for every prefix length up to `max_position`."""
    return [con_entropies(slice_as_tuples(df, p))
            for p in range(max_position)]
class ProbabilityModel(object):

    def __init__(self, df, l=None):
        self.df = slice_as_tuples(df, l)
        self.anon_sets = self.anonymity_sets(self.df)
        self.posterior = self.estimate_posterior()

    @staticmethod
    def anonymity_sets(df):
        """Map each length sequence to the set of class labels that produce it."""
        length_groups = df.groupby('lengths')
        return length_groups.class_label.apply(set)

    def rank(self):
        anon_sets = self.anon_sets.reset_index()
        anon_sets['class_label'] = anon_sets.class_label.apply(tuple)
        return AnonimitySetCounts(anon_sets)

    def estimate_posterior(self, priors='uniform'):
        """Return dataframe of probabilities as estimated by frequencies."""
        # group rows that have the same length sequence and class label
        grouped = self.df.groupby(['lengths', 'class_label'], as_index=False)
        # compute frequencies TODO: parallelize these applies
        if priors == 'uniform':
            # compute sizes of those groups
            group_sizes = grouped.size()
        elif priors == 'geometric':
            # weight the sizes of those groups by the geometric prior
            group_sizes = grouped.apply(weighted_sizes)
        # normalize the sizes within groups that share the same length sequence
        # TODO: parallelize this one?
        posterior = group_sizes.groupby(level=0).apply(frequency)
        return posterior

    def entropy(self):
        # compute entropies from probabilities for specific sequences,
        # that is H(W | S=s_1), ..., H(W | S=s_n)
        return self.posterior.groupby(level=0).apply(entropy)

    def cond_entropy(self):
        """Compute the conditional entropy H(W | S) for the whole dataset."""
        # get the probabilities of length sequences
        prob_length = self.df.groupby('lengths').size() / len(self.df)
        # weight the per-sequence entropies by those probabilities
        return sum(prob_length * self.entropy())
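

# Hedged usage sketch (ours, not the author's): `traces_df` stands for any
# DataFrame in the format this module expects, with a `class_label` column
# and a `lengths` column holding per-trace packet-length sequences.
def _example_probability_model_usage(traces_df, first_n_packets=5):
    """Build a model on length-sequence prefixes and read off H(W | S)."""
    model = ProbabilityModel(traces_df, l=first_n_packets)
    return model.cond_entropy()  # conditional entropy in bits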
class AnonimitySetCounts(object):

    def __init__(self, anon_sets):
        self.counts = self.get(anon_sets)

    @staticmethod
    def get(anon_sets):
        """Count, for each set of class labels, how many length sequences map to it."""
        counts = anon_sets.groupby('class_label').count()
        counts['list_lengths'] = anon_sets.groupby('class_label').lengths.apply(list)
        counts = counts.reset_index()
        counts['num_sites'] = counts.class_label.apply(len)
        return counts

    def filter(self, min_counts=0):
        self.counts = self.counts[self.counts.num_sites > min_counts]
        return self

    def sort(self):
        self.counts = self.counts.sort_values('lengths', ascending=False)
        return self

    def __str__(self):
        to_display = self.counts.copy()
        to_display['class_label'] = to_display.class_label.apply(
            lambda x: [url(int(i)) for i in x])
        to_display['site_name_length'] = to_display.class_label.apply(
            lambda x: [len(name) for name in x])
        with pd.option_context('max_colwidth', None):
            display(to_display)
        return 'Displayed'
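

# Hedged usage sketch (ours): rank the anonymity sets of a model, keep only
# the sets covering more than one site, and show the sets with the most
# length sequences first. `traces_df` is a hypothetical DataFrame in the
# module's expected format.
def _example_rank_anonymity_sets(traces_df):
    """Inspect which sites share anonymity sets."""
    ranked = ProbabilityModel(traces_df).rank().filter(min_counts=1).sort()
    print(ranked)          # __str__ displays the counts table via IPython
    return ranked.counts   # DataFrame with class_label, list_lengths, num_sites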
# Unit tests for the functions above:
def test_is_P_is_prob_distrib():
    """Rows of P should add up to 1."""
    P = get_prob_matrix(W, uS)
    # The probabilities in each row of P should sum to one (approx.);
    # the probabilities of a row of P are: P(w) * P(size_j | w)
    # for the website w in that row and all j in the range of sizes.
    assert P.shape == (nclasses, len(uS))
    ones = np.ones(P.shape[0]).astype(float)
    second_dec = ones / 100
    assert (np.abs(np.sum(P, axis=1) - ones) < second_dec).all()


def test_is_B_is_prob_distrib():
    """Columns of B should add up to 1."""
    P = get_prob_matrix(W, uS)
    B = np.apply_along_axis(lambda x: bayes(x, nclasses), 0, P)
    # The probabilities in each column of B should sum to one (approx.);
    # the probabilities of a column of B are: P(w_i | size)
    # for the size in that column and all i in the range of websites.
    assert B.shape == (nclasses, len(uS))
    ones = np.ones(P.shape[1]).astype(float)
    second_dec = ones / 100
    assert (np.abs(np.sum(B, axis=0) - ones) < second_dec).all()


def test_bayes_method():
    p_col = np.array([4, 6, 10])
    # Bayes on this vector should do:
    # numerator = 1 / 2 * [4, 6, 10] = [2, 3, 5]
    # denominator = sum([2, 3, 5]) = 10
    # B = [2, 3, 5] / 10 = [0.2, 0.3, 0.5]
    B = bayes(p_col, nclasses)
    ones = np.ones(p_col.shape[0]).astype(float)
    second_dec = ones / 100
    assert (np.abs(B - np.array([0.2, 0.3, 0.5])) < second_dec).all()
    # --- test matrix
    P = np.array([[0.3, 0.2, 0.5], [0.7, 0.1, 0.2]])
    B = np.apply_along_axis(lambda x: bayes(x, nclasses), 0, P)
    ones = np.ones(nclasses).astype(float)
    second_dec = ones / 100
    # Bayes on this matrix should do for the first column:
    # numerator = 1 / 2 * [0.3, 0.7] = [0.15, 0.35]
    # denominator = sum([0.15, 0.35]) = 0.5
    # B = [0.15, 0.35] / 0.5 = [0.3, 0.7]
    assert (np.abs(B[:, 0] - np.array([0.3, 0.7])) < second_dec).all()
    # Bayes on this matrix should do for the second column:
    # numerator = 1 / 2 * [0.2, 0.1] = [0.1, 0.05]
    # denominator = sum([0.1, 0.05]) = 0.15
    # B = [0.1, 0.05] / 0.15 = [0.66667, 0.33333]
    assert (np.abs(B[:, 1] - np.array([0.66667, 0.33333])) < second_dec).all()
    # Bayes on this matrix should do for the third column:
    # numerator = 1 / 2 * [0.5, 0.2] = [0.25, 0.1]
    # denominator = sum([0.25, 0.1]) = 0.35
    # B = [0.25, 0.1] / 0.35 = [0.7142857, 0.285714]
    assert (np.abs(B[:, 2] - np.array([0.7142857, 0.285714])) < second_dec).all()
def test_all_traces_the_same():
    trace = [50, 100]
    test_df = pd.DataFrame({'class_label': [str(i) for i in range(nclasses)],
                            'lengths': [trace] * nclasses})
    test_df['num_pkts'] = test_df.lengths.apply(len)
    W, uS, cS = process_sizes(position, test_df, 2, 1)
    assert (W == np.array([[trace], [trace]])).all()
    v = get_size_count_vector(0, W, uS)
    # only one trace per site, counts should be one:
    assert (v == np.array([1])).all()
    P = get_prob_matrix(W, uS)
    # ...thus, the probability of that size given that website
    # should also be one
    assert (P == np.array([[1], [1]])).all()
    B = np.apply_along_axis(lambda x: bayes(x, nclasses), 0, P)
    # Now, the probability of that website given that size
    # should be 0.5.
    assert (B == np.array([[0.5], [0.5]])).all()
    # Conditional entropy for 2 sites with identical traces
    # should be 1.0.
    cH = cond_entropy_from_bayes(B, cS)
    assert (cH == 1.0)


def test_all_traces_different():
    test_df = pd.DataFrame({'class_label': [str(i) for i in range(nclasses)],
                            'lengths': [[50, 100], [50, -50]]})
    test_df['num_pkts'] = test_df.lengths.apply(len)
    W, uS, cS = process_sizes(position, test_df, 2, 1)
    assert (W == np.array([[[50, 100]], [[50, -50]]])).all()
    v = get_size_count_vector(0, W, uS)
    # one size should have zero counts, and the other 1 count
    assert (v == np.array([0, 1])).all()
    P = get_prob_matrix(W, uS)
    # ...each website should have probability one for one size
    # and zero for the other.
    assert (P == np.array([[0, 1], [1, 0]])).all()
    B = np.apply_along_axis(lambda x: bayes(x, nclasses), 0, P)
    # Now, the probability of that website given that size
    # should be 1.
    assert (B == np.array([[0, 1.], [1., 0]])).all()
    # Conditional entropy for 2 sites with all traces
    # different should be 0 (no uncertainty).
    cH = cond_entropy_from_bayes(B, cS)
    assert (cH == 0.)


def test_traces_different_length():
    test_df = pd.DataFrame({'class_label': [str(i) for i in range(nclasses)],
                            'lengths': [[50], [50, -50]]})
    test_df['num_pkts'] = test_df.lengths.apply(len)
    W, uS, cS = process_sizes(position, test_df, 2, 1)
    # the shorter trace is padded with np.inf up to the requested position
    assert (W == np.array([[[50, np.inf]], [[50, -50]]])).all()
    v = get_size_count_vector(0, W, uS)
    # one size should have zero counts, and the other 1 count
    assert (v == np.array([0, 1])).all()
    P = get_prob_matrix(W, uS)
    # ...each website should have probability one for one size
    # and zero for the other.
    assert (P == np.array([[0, 1], [1, 0]])).all()
    B = np.apply_along_axis(lambda x: bayes(x, nclasses), 0, P)
    # Now, the probability of that website given that size
    # should be 1.
    assert (B == np.array([[0, 1.], [1., 0]])).all()
    # Conditional entropy for 2 sites with all traces
    # different should be 0 (no uncertainty).
    cH = cond_entropy_from_bayes(B, cS)
    assert (cH == 0.)
def test_two_real_sites():
    # Facebook (2) and New York Times (126)
    test_df = df[df.class_label.isin(['2', '126'])].copy()
    # take only the first 5 lengths
    position = 5
    test_df['lengths'] = test_df.lengths.apply(lambda x: x[:position])
    test_df['num_pkts'] = test_df.lengths.apply(len)
    # verify that the intersection is empty
    fb = set(test_df[test_df.class_label == '2'].lengths.apply(tuple).tolist())
    ny = set(test_df[test_df.class_label == '126'].lengths.apply(tuple).tolist())
    assert len(fb.intersection(ny)) == 0
    # since the intersection is empty, the entropy should be zero
    H = cond_entropy(test_df)
    assert H == 0


def run_tests():
    test_bayes_method()
    test_is_P_is_prob_distrib()
    test_is_B_is_prob_distrib()
    test_all_traces_the_same()
    test_all_traces_different()
    test_traces_different_length()
    test_two_real_sites()


if __name__ == "__main__":
    run_tests()