Add dablooms

This commit is contained in:
luwenpeng
2023-12-29 11:39:49 +08:00
parent b353c9c824
commit d47efe6aad
9 changed files with 1321 additions and 1 deletions

View File

@@ -2,4 +2,5 @@ add_subdirectory(timestamp)
add_subdirectory(tuple)
add_subdirectory(packet)
add_subdirectory(session)
add_subdirectory(stellar)
add_subdirectory(stellar)
add_subdirectory(dablooms)

View File

@@ -0,0 +1,9 @@
###############################################################################
# dablooms — scaling, counting bloom filter static library
###############################################################################
add_library(dablooms dablooms.cpp murmur.cpp)
# PROJECT_SOURCE_DIR (not CMAKE_SOURCE_DIR) keeps the path correct if this
# project is ever embedded as a subproject of a larger build.
target_include_directories(dablooms PUBLIC ${PROJECT_SOURCE_DIR}/src/dablooms)
# The original `target_link_libraries(dablooms)` call listed no libraries and
# was a no-op, so it has been removed.
add_subdirectory(test)

259
src/dablooms/README.md Normal file
View File

@@ -0,0 +1,259 @@
Dablooms: A Scalable, Counting, Bloom Filter
----------------------------------
_Note_: this project has been mostly unmaintained for a while.
### Overview
This project aims to demonstrate a novel Bloom filter implementation that can
scale, and provide not only the addition of new members, but reliable removal
of existing members.
Bloom filters are a probabilistic data structure that provide space-efficient
storage of elements at the cost of possible false positive on membership
queries.
**dablooms** implements such a structure that takes additional metadata to classify
elements in order to make an intelligent decision as to which Bloom filter an element
should belong.
### Features
**dablooms**, in addition to the above, has several features.
* Implemented as a static C library
* Memory mapped
* 4 bit counters
* Sequence counters for clean/dirty checks
* Python wrapper
For performance, the low-level operations are implemented in C. It is also
memory mapped which provides async flushing and persistence at low cost.
In an effort to maintain memory efficiency, rather than using integers, or
even whole bytes as counters, we use only four bit counters. These four bit
counters allow up to 15 items to share a counter in the map. If more than a
small handful are sharing said counter, the Bloom filter would be overloaded
(resulting in excessive false positives anyway) at any sane error rate, so
there is no benefit in supporting larger counters.
The Bloom filter also employs change sequence numbers to track operations performed
on the Bloom filter. These allow the application to determine if a write might have
only partially completed (perhaps due to a crash), leaving the filter in an
inconsistent state. The application can thus determine if a filter is ok or needs
to be recreated. The sequence number can be used to determine what a consistent but
out-of-date filter missed, and bring it up-to-date.
There are two sequence numbers (and helper functions to get them): "mem_seqnum" and
"disk_seqnum". The "mem" variant is useful if the user is sure the OS didn't crash,
and the "disk" variant is useful if the OS might have crashed since the Bloom filter
was last changed. Both values could be "0", meaning the filter is possibly
inconsistent from their point of view, or a non-zero sequence number that the filter
is consistent with. The "mem" variant is often non-zero, but the "disk" variant only
becomes non-zero right after a (manual) flush. This can be expensive (it's an fsync),
so the value can be ignored if not relevant for the application. For example, if the
Bloom file exists in a directory which is cleared at boot (like `/tmp`), then the
application can safely assume that any existing file was not affected by an OS crash,
and never bother to flush or check disk_seqnum. Schemes involving batching up changes
are also possible.
The dablooms library is not inherently thread safe; this is the client's responsibility.
Bindings are also not thread safe, unless they state otherwise.
### Installing
Clone the repo, or download and extract a tarball of a tagged version
[from github](https://github.com/bitly/dablooms/tags).
In the source tree, type `make`, `make install` (`sudo` may be needed).
This will only install static and dynamic versions of the C dablooms library "libdablooms".
To use a specific build directory, install prefix, or destination directory for packaging,
specify `BLDDIR`, `prefix`, or `DESTDIR` to make. For example:
`make install BLDDIR=/tmp/dablooms/bld DESTDIR=/tmp/dablooms/pkg prefix=/usr`
Look at the output of `make help` for more options and targets.
Also available are bindings for various other languages:
#### Python (pydablooms)
To install the Python bindings "pydablooms" (currently only compatible with Python 2.x)
run `make pydablooms`, `make install_pydablooms` (`sudo` may be needed).
To use and install for a specific version of Python installed on your system,
use the `PYTHON` option to make. For example: `make install_pydablooms PYTHON=python2.7`.
You can override the module install location with the `PY_MOD_DIR` option to make,
and the `BLDDIR` and `DESTDIR` options also affect pydablooms.
The Makefile attempts to determine the python module location `PY_MOD_DIR`
automatically. It prefers a location in `/usr/local`, but you can specify
`PY_MOD_DIR_ARG=--user` to try to use the location which `pip install --user`
would use in your HOME dir. You can instead specify `PY_MOD_DIR_ARG=--system`
to prefer the normal/central system python module dir.
See pydablooms/README.md for more info.
#### Go (godablooms)
The Go bindings "godablooms" are not integrated into the Makefile.
Install libdablooms first, then look at `godablooms/README.md`
### Contributing
If you make changes to C portions of dablooms which you would like merged into the
upstream repository, it would help to have your code match our C coding style. We use
[astyle](http://astyle.sourceforge.net/), svn rev 353 or later, on our code, with the
following options:
astyle --style=1tbs --lineend=linux --convert-tabs --preserve-date \
--fill-empty-lines --pad-header --indent-switches \
--align-pointer=name --align-reference=name --pad-oper -n <files>
### Testing
To run a quick and dirty test, type `make test`. This test uses a list of words
and defaults to `/usr/share/dict/words`. If your path differs, you can use the
`WORDS` flag to specify its location, such as `make test WORDS=/usr/dict/words`.
This will run a simple test that iterates through a word list and
adds each word to dablooms. It iterates again, removing every fifth
element. Lastly, it saves the file, opens a new filter, and iterates a third time
checking the existence of each word. It prints results of the true negatives,
false positives, true positives, and false negatives, and the false positive rate.
The false positive rate is calculated by "false positives / (false positives + true negatives)".
That is, what rate of real negatives are false positives. This is the interesting
statistic because the rate of false negatives should always be zero.
The test uses a maximum error rate of .05 (5%) and an initial capacity of 100k. If
the dictionary is near 500k, we should have created 4 new filters in order to scale to size.
A second test adds every other word in the list, and removes no words, causing each
used filter to stay at maximum capacity, which is a worse case for accuracy.
Check out the performance yourself, and checkout the size of the resulting file!
## Bloom Filter Basics
Bloom filters are probabilistic data structures that provide
space-efficient storage of elements at the cost of occasional false positives on
membership queries, i.e. a Bloom filter may state true on query when it in fact does
not contain said element. A Bloom filter is traditionally implemented as an array of
`M` bits, where `M` is the size of the Bloom filter. On initialization all bits are
set to zero. A filter is also parameterized by a constant `k` that defines the number
of hash functions used to set and test bits in the filter. Each hash function should
output one index in `M`. When inserting an element `x` into the filter, the bits
in the `k` indices `h1(x), h2(x), ..., hk(X)` are set.
In order to query a Bloom filter, say for element `x`, it suffices to verify if
all bits in indices `h1(x), h2(x), ..., hk(x)` are set. If one or more of these
bits is not set then the queried element is definitely not present in the
filter. However, if all these bits are set, then the element is considered to
be in the filter. Given this procedure, an error probability exists for positive
matches, since the tested indices might have been set by the insertion of other
elements.
### Counting Bloom Filters: Solving Removals
The same property that results in false positives *also* makes it
difficult to remove an element from the filter as there is no
easy means of discerning if another element is hashed to the same bit.
Unsetting a bit that is hashed by multiple elements can cause **false
negatives**. Using a counter, instead of a bit, can circumvent this issue.
The bit can be incremented when an element is hashed to a
given location, and decremented upon removal. Membership queries rely on whether a
given counter is greater than zero. This reduces the exceptional
space-efficiency provided by the standard Bloom filter.
### Scalable Bloom Filters: Solving Scale
Another important property of a Bloom filter is its linear relationship between size
and storage capacity. If the maximum allowable error probability and the number of elements to store
are both known, it is relatively straightforward to dimension an appropriate
filter. However, it is not always possible to know how many elements
will need to be stored a priori. There is a trade off between over-dimensioning filters or
suffering from a ballooning error probability as it fills.
Almeida, Baquero, Preguiça, Hutchison published a paper in 2006, on
[Scalable Bloom Filters](http://www.sciencedirect.com/science/article/pii/S0020019006003127),
which suggested a means of scalable Bloom filters by creating essentially
a list of Bloom filters that act as one large Bloom filter. When greater
capacity is desired, a new filter is added to the list.
Membership queries are conducted on each filter with the positives
evaluated if the element is found in any one of the filters. Naively, this
leads to an increasing compounding error probability since the probability
of the given structure evaluates to:
1 - 𝚺(1 - P)
It is possible to bound this error probability by adding a reducing tightening
ratio, `r`. As a result, the bounded error probability is represented as:
1 - 𝚺(1 - P0 * r^i) where r is chosen as 0 < r < 1
Since size is simply a function of an error probability and capacity, any
array of growth functions can be applied to scale the size of the Bloom filter
as necessary. We found it sufficient to pick .9 for `r`.
## Problems with Mixing Scalable and Counting Bloom Filters
Scalable Bloom filters do not allow for the removal of elements from the filter.
In addition, simply converting each Bloom filter in a scalable Bloom filter into
a counting filter also poses problems. Since an element can be in any filter, and
Bloom filters inherently allow for false positives, a given element may appear to
be in two or more filters. If an element is inadvertently removed from a filter
which did not contain it, it would introduce the possibility of **false negatives**.
If however, an element can be removed from the correct filter, it maintains
the integrity of said filter, i.e. prevents the possibility of false negatives. Thus,
a scaling, counting, Bloom filter is possible if upon additions and deletions
one can correctly decide which Bloom filter contains the element.
There are several advantages to using a Bloom filter. A Bloom filter gives the
application cheap, memory efficient set operations, with no actual data stored
about the given element. Rather, Bloom filters allow the application to test,
with some given error probability, the membership of an item. This leads to the
conclusion that the majority of operations performed on Bloom filters are the
queries of membership, rather than the addition and removal of elements. Thus,
for a scaling, counting, Bloom filter, we can optimize for membership queries at
the expense of additions and removals. This expense comes not in performance,
but in the addition of more metadata concerning an element and its relation to
the Bloom filter. With the addition of some sort of identification of an
element, which does not need to be unique as long as it is fairly distributed, it
is possible to correctly determine which filter an element belongs to, thereby able
to maintain the integrity of a given Bloom filter with accurate additions
and removals.
## Enter dablooms
dablooms is one such implementation of a scaling, counting, Bloom filter that takes
additional metadata during additions and deletions in the form of a (generally)
monotonically increasing integer to classify elements (possibly a timestamp).
This is used during additions/removals to easily determine the correct Bloom filter
for an element (each filter is assigned a range). Checking an item against the Bloom
filter, which is assumed to be the dominant activity, does not use the id (it works
like a normal scaling Bloom filter).
dablooms is designed to scale itself using these identifiers and the given capacity.
When a Bloom filter is at capacity, dablooms will create a new Bloom filter which
starts at the next id after the greatest id of the previous Bloom filter. Given the
fact that the identifiers monotonically increase, new elements will be added to the
newest Bloom filter. Note, in theory and as implemented, nothing prevents one from
adding an element to any "older" filter. You just run the increasing risk of the
error probability growing beyond the bound as it becomes "overfilled".
You can then remove any element from any Bloom filter using the identifier to intelligently
pick which Bloom filter to remove from. Consequently, as you continue to remove elements
from Bloom filters that you are not continuing to add to, these Bloom filters will become
more accurate.
The "id" of an element does not need to be known to check the Bloom filter, but does need
to be known when the element is removed (and the same as when it was added). This might
be convenient if the item already has an appropriate id (almost always increasing for new
items) associated with it.
### Example use case
There is a database with a collection of entries. There is a series of items, each of which
you want to look up in the database; most will have no entry in the database, but some
will. Perhaps it's a database of spam links. If you use dablooms in front of the database,
you can avoid needing to check the database for almost all items which won't be found in
it anyway, and save a lot of time and effort. It's also much easier to distribute the
Bloom filter than the entire database. But to make it work, you need to determine an "id"
whenever you add to or remove from the Bloom filter. You could store the timestamp when
you add the item to the database as another column in the database, and give it to
`scaling_bloom_add()` as well. When you remove the item, you look it up in the database
first and pass the timestamp stored there to `scaling_bloom_remove()`. The timestamps for
new items will be equal or greater, and definitely greater over time. Instead of
timestamps, you could also use an auto-incrementing index. Checks against the Bloom
don't need to know the id and should be quick. If a check comes back negative, you can be
sure the item isn't in the database, and skip that query completely. If a check comes
back positive, you have to query the database, because there's a slight chance that the
item isn't actually in there.

705
src/dablooms/dablooms.cpp Normal file
View File

@@ -0,0 +1,705 @@
/* Copyright @2012 by Justin Hines at Bitly under a very liberal license. See LICENSE in the source distribution. */
#include <sys/stat.h>
#include <stdint.h>
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <fcntl.h>
#include <math.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include "murmur.h"
#include "dablooms.h"
#define DABLOOMS_VERSION "0.9.1"
#define ERROR_TIGHTENING_RATIO 0.5
#define SALT_CONSTANT 0x97c29b3a
/* Report the library version string baked in at compile time. */
const char *dablooms_version(void)
{
    static const char *const version = DABLOOMS_VERSION;
    return version;
}
/* Release a bitmap and its backing counter storage.
 * The mmap-based teardown is compiled out (#if 0); this build keeps the
 * counters in plain heap memory, so freeing the array suffices.
 * NOTE(review): not NULL-safe — callers must pass a valid bitmap. */
void free_bitmap(bitmap_t *bitmap)
{
#if 0
    if ((munmap(bitmap->array, bitmap->bytes)) < 0) {
        perror("Error, unmapping memory");
    }
#else
    free(bitmap->array);
#endif
    free(bitmap);
}
/* Grow (or shrink) the bitmap's byte array to new_size bytes.
 * The original mmap/mremap implementation is disabled (#if 0); this build
 * uses realloc/malloc. Newly added bytes are zeroed so fresh counters
 * start at 0.
 * On allocation failure the whole bitmap is freed and NULL is returned,
 * so a caller must not reuse its bitmap pointer after a NULL result. */
bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size)
{
#if 0
    /* resize if mmap exists and possible on this os, else new mmap */
    if (bitmap->array != NULL) {
#if __linux
        bitmap->array = mremap(bitmap->array, old_size, new_size, MREMAP_MAYMOVE);
        if (bitmap->array == MAP_FAILED) {
            perror("Error resizing mmap");
            free_bitmap(bitmap);
            return NULL;
        }
#else
        if (munmap(bitmap->array, bitmap->bytes) < 0) {
            perror("Error unmapping memory");
            free_bitmap(bitmap);
            return NULL;
        }
        bitmap->array = NULL;
#endif
    }
    if (bitmap->array == NULL) {
        bitmap->array = mmap(NULL, new_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
        if (bitmap->array == MAP_FAILED) {
            perror("Error init mmap");
            free_bitmap(bitmap);
            return NULL;
        }
    }
#else
    if (bitmap->array != NULL)
    {
        /* Use a temporary: the original assigned the realloc result straight
         * back, which leaked the old block when realloc failed (the pointer
         * was overwritten with NULL before free_bitmap ran). */
        char *resized = (char *)realloc(bitmap->array, new_size);
        if (resized == NULL)
        {
            perror("Error resizing memory");
            free_bitmap(bitmap);
            return NULL;
        }
        bitmap->array = resized;
        /* Only zero when growing; the original memset underflowed
         * (new_size - old_size wraps as size_t) if asked to shrink. */
        if (new_size > old_size)
        {
            memset(bitmap->array + old_size, 0, new_size - old_size);
        }
    }
    else
    {
        bitmap->array = (char *)malloc(new_size);
        if (bitmap->array == NULL)
        {
            perror("Error init memory");
            free_bitmap(bitmap);
            return NULL;
        }
        memset(bitmap->array, 0, new_size);
    }
#endif
    bitmap->bytes = new_size;
    return bitmap;
}
/* Create a new bitmap, not full featured — just enough to let us interact
 * with the 4-bit counters. Allocates `bytes` zeroed bytes of storage via
 * bitmap_resize. Returns NULL on allocation failure (bitmap_resize frees
 * the partially-built bitmap itself in that case). */
bitmap_t *new_bitmap(size_t bytes)
{
    bitmap_t *bitmap = (bitmap_t *)malloc(sizeof(bitmap_t));
    if (bitmap == NULL)
    {
        return NULL;
    }
    bitmap->bytes = bytes;
    bitmap->array = NULL;
    /* bitmap_resize returns either the (possibly moved) bitmap or NULL */
    return bitmap_resize(bitmap, 0, bytes);
}
/* Increment the 4-bit counter at `index` (two counters per byte: odd
 * indices use the low nibble, even indices the high nibble).
 * `offset` is the byte offset of this filter's counter region.
 * Returns 0 on success, -1 without writing if the counter is already
 * saturated at 15. */
int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset)
{
    long access = index / 2 + offset;
    uint8_t temp;
    /* NOTE(review): prefetching immediately before the load is unlikely to
     * help — presumably tuned for a hot call site; confirm with profiling. */
    __builtin_prefetch(&(bitmap->array[access]), 0, 1);
    uint8_t n = bitmap->array[access];
    if (index % 2 != 0)
    {
        temp = (n & 0x0f);
        n = (n & 0xf0) + ((n & 0x0f) + 0x01);
    }
    else
    {
        temp = (n & 0xf0) >> 4;
        n = (n & 0x0f) + ((n & 0xf0) + 0x10);
    }
    /* refuse to wrap a saturated nibble; nothing has been written yet */
    if (temp == 0x0f)
    {
        // fprintf(stderr, "Error, 4 bit int Overflow\n");
        return -1;
    }
    __builtin_prefetch(&(bitmap->array[access]), 1, 1);
    bitmap->array[access] = n;
    return 0;
}
/* Decrement the 4-bit counter at `index` (the original comment here said
 * "increments" — it described the previous function).
 * Returns 0 on success, -1 without writing if the counter is already zero,
 * which would otherwise wrap the nibble to 15. */
int bitmap_decrement(bitmap_t *bitmap, unsigned int index, long offset)
{
    long access = index / 2 + offset;
    uint8_t temp;
    uint8_t n = bitmap->array[access];
    if (index % 2 != 0)
    {
        temp = (n & 0x0f);
        n = (n & 0xf0) + ((n & 0x0f) - 0x01);
    }
    else
    {
        temp = (n & 0xf0) >> 4;
        n = (n & 0x0f) + ((n & 0xf0) - 0x10);
    }
    /* removing an element that was never added: leave the byte untouched */
    if (temp == 0x00)
    {
        // fprintf(stderr, "Error, Decrementing zero\n");
        return -1;
    }
    bitmap->array[access] = n;
    return 0;
}
/* Read the 4-bit counter at `index`; non-zero means "set" (the original
 * comment here said "decrements" — it described the previous function).
 * Note the even-index case returns the high nibble unshifted: callers
 * only test truthiness, never the numeric value. */
int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset)
{
    long access = index / 2 + offset;
    if (index % 2 != 0)
    {
        return bitmap->array[access] & 0x0f;
    }
    else
    {
        return bitmap->array[access] & 0xf0;
    }
}
/* Flush the bitmap to stable storage. The msync-based implementation is
 * compiled out (#if 0): this build keeps counters in plain heap memory,
 * so flushing is a no-op that always reports success (0). */
int bitmap_flush(bitmap_t *bitmap)
{
#if 0
    if ((msync(bitmap->array, bitmap->bytes, MS_SYNC) < 0)) {
        perror("Error, flushing bitmap to disk");
        return -1;
    } else {
        return 0;
    }
#else
    return 0;
#endif
}
/*
 * Derive all of the filter's bucket indices for `key` from one Murmur hash.
 *
 * A single 128-bit MurmurHash3 yields two 32-bit seeds (h1, h2); each of
 * the nfuncs "hash functions" is then the double-hashing combination
 * h1 + i*h2, reduced modulo counts_per_func.
 *
 * See paper by Kirsch, Mitzenmacher [2006]
 * http://www.eecs.harvard.edu/~michaelm/postscripts/rsa2008.pdf
 */
void hash_func(counting_bloom_t *bloom, const char *key, size_t key_len, uint32_t *hashes)
{
    uint32_t checksum[4];
    MurmurHash3_x64_128(key, key_len, SALT_CONSTANT, checksum);
    const uint32_t h1 = checksum[0];
    const uint32_t h2 = checksum[1];
    size_t i = 0;
    while (i < bloom->nfuncs)
    {
        hashes[i] = (h1 + i * h2) % bloom->counts_per_func;
        i++;
    }
}
/* Free a standalone counting bloom filter together with its private
 * bitmap. Safe to call with NULL. Always returns 0.
 * (The original also assigned NULL to the local `bloom` after free() —
 * a dead store with no effect on the caller's pointer; removed.) */
int free_counting_bloom(counting_bloom_t *bloom)
{
    if (bloom != NULL)
    {
        free(bloom->hashes);
        bloom->hashes = NULL;
        free_bitmap(bloom->bitmap);
        free(bloom);
    }
    return 0;
}
/* Allocate and dimension a counting bloom filter; no bitmap is attached
 * yet. `offset` is where this filter's region begins inside a (possibly
 * shared) bitmap: the per-filter header sits there and the counters
 * follow it. Returns NULL on allocation failure. */
counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate, long offset)
{
    counting_bloom_t *bloom;
    if ((bloom = (counting_bloom_t *)malloc(sizeof(counting_bloom_t))) == NULL)
    {
        fprintf(stderr, "Error, could not realloc a new bloom filter\n");
        return NULL;
    }
    bloom->bitmap = NULL;
    bloom->capacity = capacity;
    bloom->error_rate = error_rate;
    /* counters start just past the per-filter header */
    bloom->offset = offset + sizeof(counting_bloom_header_t);
    /* classic dimensioning: k = ceil(ln(1/err)/ln 2) hash functions, each
     * owning counts_per_func counters */
    bloom->nfuncs = (int)ceil(log(1 / error_rate) / log(2));
    bloom->counts_per_func = (int)ceil(capacity * fabs(log(error_rate)) / (bloom->nfuncs * pow(log(2), 2)));
    bloom->size = bloom->nfuncs * bloom->counts_per_func;
    /* rounding-up integer divide by 2 of bloom->size: two 4-bit counters
     * per byte, plus the header */
    bloom->num_bytes = ((bloom->size + 1) / 2) + sizeof(counting_bloom_header_t);
    bloom->hashes = (uint32_t *)calloc(bloom->nfuncs, sizeof(uint32_t));
    if (bloom->hashes == NULL)
    {
        /* previously a failed calloc was silently ignored and crashed
         * later inside hash_func(); fail fast instead */
        fprintf(stderr, "Error, could not alloc hash scratch buffer\n");
        free(bloom);
        return NULL;
    }
    return bloom;
}
/* Create a standalone counting bloom filter with its own private bitmap.
 * Returns NULL on allocation failure. (The original dereferenced the
 * results of counting_bloom_init() and new_bitmap() without checking.) */
counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate)
{
    counting_bloom_t *cur_bloom = counting_bloom_init(capacity, error_rate, 0);
    if (cur_bloom == NULL)
    {
        return NULL;
    }
    cur_bloom->bitmap = new_bitmap(cur_bloom->num_bytes);
    if (cur_bloom->bitmap == NULL)
    {
        free(cur_bloom->hashes);
        free(cur_bloom);
        return NULL;
    }
    /* the per-filter header lives at the start of this private bitmap */
    cur_bloom->header = (counting_bloom_header_t *)(cur_bloom->bitmap->array);
    return cur_bloom;
}
/* Insert `s` (length `len`) into the filter: bump one 4-bit counter per
 * hash function, then bump the filter's element count. Always returns 0. */
int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len)
{
    uint32_t *hashes = bloom->hashes;
    hash_func(bloom, s, len, hashes);
    for (size_t i = 0; i < bloom->nfuncs; i++)
    {
        /* each hash function owns its own slice of counts_per_func counters */
        unsigned int index = hashes[i] + i * bloom->counts_per_func;
        bitmap_increment(bloom->bitmap, index, bloom->offset);
    }
    bloom->header->count++;
    return 0;
}
/* Remove `s` (length `len`) from the filter: decrement one 4-bit counter
 * per hash function, then the filter's element count. Always returns 0. */
int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len)
{
    uint32_t *hashes = bloom->hashes;
    hash_func(bloom, s, len, hashes);
    for (size_t i = 0; i < bloom->nfuncs; i++)
    {
        /* mirror of counting_bloom_add: same slice-per-function layout */
        unsigned int index = hashes[i] + i * bloom->counts_per_func;
        bitmap_decrement(bloom->bitmap, index, bloom->offset);
    }
    bloom->header->count--;
    return 0;
}
/* Membership test for `s`: present only if every counter selected by the
 * hash functions is non-zero. Returns 1 for (probably) present, 0 for
 * definitely absent. */
int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len)
{
    uint32_t *hashes = bloom->hashes;
    hash_func(bloom, s, len, hashes);
    size_t i;
    for (i = 0; i < bloom->nfuncs; i++)
    {
        unsigned int index = hashes[i] + i * bloom->counts_per_func;
        if (bitmap_check(bloom->bitmap, index, bloom->offset) == 0)
        {
            /* one clear counter is proof of absence */
            return 0;
        }
    }
    return 1;
}
/* Free a scaling bloom filter: every nested counting bloom's scratch
 * buffer and struct, the filter array, and the shared bitmap.
 * The nested filters all point at the scaling filter's single bitmap, so
 * they are freed field-by-field here rather than via free_counting_bloom()
 * (which would free the shared bitmap once per filter). Always returns 0. */
int free_scaling_bloom(scaling_bloom_t *bloom)
{
    int i;
    for (i = bloom->num_blooms - 1; i >= 0; i--)
    {
        free(bloom->blooms[i]->hashes);
        bloom->blooms[i]->hashes = NULL;
        free(bloom->blooms[i]);
        bloom->blooms[i] = NULL;
    }
    free(bloom->blooms);
    free_bitmap(bloom->bitmap);
    free(bloom);
    return 0;
}
/* Creates a new counting bloom filter from a given scaling bloom filter:
 * appends one more filter (with a tightened error rate) to the scaling
 * filter, growing the shared bitmap to make room.
 * Because the resize can move the underlying array, every filter's header
 * pointer — including the scaling filter's own — is re-anchored afterwards.
 * NOTE(review): the counting_bloom_init() and bitmap_resize() results are
 * not checked here; an allocation failure dereferences NULL — confirm
 * whether callers beyond new_scaling_bloom() can tolerate that. */
counting_bloom_t *new_counting_bloom_from_scale(scaling_bloom_t *bloom)
{
    long offset;
    double error_rate;
    counting_bloom_t *cur_bloom;
    /* each successive filter tightens the error bound so the compound
     * error stays within the configured rate */
    error_rate = bloom->error_rate * (pow(ERROR_TIGHTENING_RATIO, bloom->num_blooms + 1));
    if ((bloom->blooms = (counting_bloom_t **)realloc(bloom->blooms, (bloom->num_blooms + 1) * sizeof(counting_bloom_t *))) == NULL)
    {
        fprintf(stderr, "Error, could not realloc a new bloom filter\n");
        return NULL;
    }
    /* the new filter's region starts at the current end of the bitmap */
    cur_bloom = counting_bloom_init(bloom->capacity, error_rate, bloom->num_bytes);
    bloom->blooms[bloom->num_blooms] = cur_bloom;
    bloom->bitmap = bitmap_resize(bloom->bitmap, bloom->num_bytes, bloom->num_bytes + cur_bloom->num_bytes);
    /* reset header pointer, as the array may have moved */
    bloom->header = (scaling_bloom_header_t *)bloom->bitmap->array;
    /* Set the pointers for these header structs to the right location since the array may have moved */
    bloom->num_blooms++;
    for (unsigned int i = 0; i < bloom->num_blooms; i++)
    {
        offset = bloom->blooms[i]->offset - sizeof(counting_bloom_header_t);
        bloom->blooms[i]->header = (counting_bloom_header_t *)(bloom->bitmap->array + offset);
    }
    bloom->num_bytes += cur_bloom->num_bytes;
    /* all filters share the scaling filter's single bitmap */
    cur_bloom->bitmap = bloom->bitmap;
    return cur_bloom;
}
/* Begin a modification: zero both sequence numbers so that a crash while
 * the filter is mid-update is detectable, and return the previous
 * mem_seqnum so the caller can republish seqnum + 1 once its change is
 * complete. disk_seqnum is cleared (and flushed) first so the on-disk
 * copy is marked inconsistent before any other bytes change. */
uint64_t scaling_bloom_clear_seqnums(scaling_bloom_t *bloom)
{
    uint64_t seqnum;
    if (bloom->header->disk_seqnum != 0)
    {
        // disk_seqnum cleared on disk before any other changes
        bloom->header->disk_seqnum = 0;
        bitmap_flush(bloom->bitmap);
    }
    seqnum = bloom->header->mem_seqnum;
    bloom->header->mem_seqnum = 0;
    return seqnum;
}
/* Add `s` under classification id `id`. Filters are scanned newest-to-
 * oldest for the first one whose starting id covers `id`; if `id` is newer
 * than anything seen and the chosen filter is at capacity, a fresh filter
 * is appended and receives the element. Always returns 1.
 * NOTE(review): assumes num_blooms >= 1 (new_scaling_bloom guarantees one
 * initial filter); the new_counting_bloom_from_scale() result is not
 * checked for NULL, so an allocation failure during scale-up crashes. */
int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id)
{
    int i;
    uint64_t seqnum;
    counting_bloom_t *cur_bloom = NULL;
    /* newest-to-oldest: stop at the first filter whose start id covers `id` */
    for (i = bloom->num_blooms - 1; i >= 0; i--)
    {
        cur_bloom = bloom->blooms[i];
        if (id >= cur_bloom->header->id)
        {
            break;
        }
    }
    /* mark the filter dirty (both seqnums zeroed) while we mutate it */
    seqnum = scaling_bloom_clear_seqnums(bloom);
    /* a brand-new id landing in a full newest filter triggers a scale-up */
    if ((id > bloom->header->max_id) && (cur_bloom->header->count >= cur_bloom->capacity - 1))
    {
        cur_bloom = new_counting_bloom_from_scale(bloom);
        cur_bloom->header->count = 0;
        cur_bloom->header->id = bloom->header->max_id + 1;
    }
    if (bloom->header->max_id < id)
    {
        bloom->header->max_id = id;
    }
    counting_bloom_add(cur_bloom, s, len);
    /* publish consistency again */
    bloom->header->mem_seqnum = seqnum + 1;
    return 1;
}
/* Remove `s` from the filter responsible for `id` (newest-to-oldest
 * scan, same ordering as scaling_bloom_add). Returns 1 if a removal was
 * performed, 0 if no filter's id range covers `id`. */
int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id)
{
    int i = bloom->num_blooms - 1;
    while (i >= 0)
    {
        counting_bloom_t *cur_bloom = bloom->blooms[i];
        if (id >= cur_bloom->header->id)
        {
            /* dirty -> mutate -> republish, as in scaling_bloom_add */
            uint64_t seqnum = scaling_bloom_clear_seqnums(bloom);
            counting_bloom_remove(cur_bloom, s, len);
            bloom->header->mem_seqnum = seqnum + 1;
            return 1;
        }
        i--;
    }
    return 0;
}
/* Check membership of `s` across every nested filter, newest first.
 * Returns 1 on the first hit, 0 if absent from all filters. */
int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len)
{
    int i = bloom->num_blooms;
    while (i-- > 0)
    {
        if (counting_bloom_check(bloom->blooms[i], s, len))
        {
            return 1;
        }
    }
    return 0;
}
/* Flush the bitmap, then publish disk_seqnum. The ordering matters for
 * crash consistency: every counter change must reach disk before
 * disk_seqnum becomes non-zero (hence the second flush). With
 * bitmap_flush() compiled to a no-op in this build, only the seqnum
 * bookkeeping remains. Returns 0 on success, -1 on flush failure. */
int scaling_bloom_flush(scaling_bloom_t *bloom)
{
    if (bitmap_flush(bloom->bitmap) != 0)
    {
        return -1;
    }
    // all changes written to disk before disk_seqnum set
    if (bloom->header->disk_seqnum == 0)
    {
        bloom->header->disk_seqnum = bloom->header->mem_seqnum;
        return bitmap_flush(bloom->bitmap);
    }
    return 0;
}
/* Sequence number the in-memory filter is consistent with; 0 means an
 * update is (or was) in flight. */
uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom)
{
    const scaling_bloom_header_t *header = bloom->header;
    return header->mem_seqnum;
}
/* Sequence number the on-disk copy is consistent with; 0 until the next
 * successful scaling_bloom_flush(). */
uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom)
{
    const scaling_bloom_header_t *header = bloom->header;
    return header->disk_seqnum;
}
/* Allocate a scaling bloom filter containing only its global header (no
 * counting blooms yet). Returns NULL on allocation failure. */
scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate)
{
    scaling_bloom_t *bloom;
    if ((bloom = (scaling_bloom_t *)malloc(sizeof(scaling_bloom_t))) == NULL)
    {
        return NULL;
    }
    /* Initialize every field before the fallible new_bitmap() call.  The
     * original called free_scaling_bloom() on failure while num_blooms and
     * blooms still held uninitialized malloc garbage, iterating over and
     * freeing wild pointers. */
    bloom->header = NULL;
    bloom->capacity = capacity;
    bloom->error_rate = error_rate;
    bloom->num_blooms = 0;
    bloom->num_bytes = sizeof(scaling_bloom_header_t);
    bloom->blooms = NULL;
    bloom->bitmap = NULL;
    if ((bloom->bitmap = new_bitmap(sizeof(scaling_bloom_header_t))) == NULL)
    {
        fprintf(stderr, "Error, Could not create bitmap with file\n");
        /* free_scaling_bloom() would dereference the NULL bitmap inside
         * free_bitmap(); release the struct directly instead */
        free(bloom);
        return NULL;
    }
    bloom->header = (scaling_bloom_header_t *)bloom->bitmap->array;
    return bloom;
}
/* Create a scaling bloom filter seeded with one counting bloom whose id
 * range starts at 0. Returns NULL on allocation failure. */
scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate)
{
    scaling_bloom_t *bloom;
    counting_bloom_t *cur_bloom;
    bloom = scaling_bloom_init(capacity, error_rate);
    if (bloom == NULL)
    {
        /* the original passed a failed init straight into
         * new_counting_bloom_from_scale(), dereferencing NULL */
        return NULL;
    }
    if (!(cur_bloom = new_counting_bloom_from_scale(bloom)))
    {
        fprintf(stderr, "Error, Could not create counting bloom\n");
        free_scaling_bloom(bloom);
        return NULL;
    }
    cur_bloom->header->count = 0;
    cur_bloom->header->id = 0;
    /* mem_seqnum starts at 1: non-zero means "consistent" */
    bloom->header->mem_seqnum = 1;
    return bloom;
}
/* State for the time-expiring wrapper: an active filter serving all
 * operations, plus an optionally pre-warmed successor that is built
 * during the overlap window (see expiry_dablooms_add) and promoted when
 * the active filter expires (see bloom_expired_check). */
struct expiry_dablooms_handle
{
    scaling_bloom_t *cur_bloom;   /* filter serving adds/lookups now */
    scaling_bloom_t *next_bloom;  /* staged replacement, or NULL */
    time_t cur_bloom_start;       /* when cur_bloom began life */
    time_t next_bloom_start;      /* when next_bloom began life */
    time_t last_bloom_check;      /* memoized timestamp of the last expiry check */
    uint64_t cur_bloom_inc_id;    /* auto-incrementing id fed to scaling_bloom_add */
    uint64_t next_bloom_inc_id;   /* same, for the staged filter */
    unsigned int capacity;        /* capacity used when creating new filters */
    int expiry_time;              /* filter lifetime, in seconds */
    time_t cur_time;              /* most recent timestamp observed */
    double error_rate;            /* error rate used when creating new filters */
};
/* Map an expiry_dablooms_errno value to a short human-readable name for
 * logging; unknown values yield "unknown". */
char *expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno)
{
    if (_errno == EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL)
    {
        return (char *)"scaling_bloom_null";
    }
    if (_errno == EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL)
    {
        return (char *)"new_scaling_bloom_fail";
    }
    return (char *)"unknown";
}
/* Tear down an expiry handle, releasing both the active and (if present)
 * staged bloom filters, then the handle itself. NULL-safe. */
void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle)
{
    if (handle == NULL)
    {
        return;
    }
    if (handle->cur_bloom != NULL)
    {
        free_scaling_bloom(handle->cur_bloom);
    }
    if (handle->next_bloom != NULL)
    {
        free_scaling_bloom(handle->next_bloom);
    }
    /* frees the struct and NULLs the (local) pointer */
    FREE(&handle);
}
/* Create an expiry-capable dablooms handle whose active filter is rotated
 * every `expiry_time` seconds (see bloom_expired_check).
 * Returns NULL on allocation failure. */
struct expiry_dablooms_handle *expiry_dablooms_init(unsigned int capacity, double error_rate, time_t cur_time, int expiry_time)
{
    struct expiry_dablooms_handle *handle = ALLOC(struct expiry_dablooms_handle, 1);
    if (handle == NULL)
    {
        /* the original left the calloc result unchecked and dereferenced
         * NULL on the first field assignment */
        return NULL;
    }
    scaling_bloom_t *cur_bloom = new_scaling_bloom(capacity, error_rate);
    if (cur_bloom == NULL)
    {
        /* handle fields are zeroed by ALLOC/calloc, so destroy is safe */
        expiry_dablooms_destroy(handle);
        return NULL;
    }
    handle->cur_bloom = cur_bloom;
    handle->cur_bloom_inc_id = 0;
    handle->cur_bloom_start = cur_time;
    handle->capacity = capacity;
    handle->error_rate = error_rate;
    handle->expiry_time = expiry_time;
    handle->cur_time = cur_time;
    return handle;
}
/* Report how many elements have been added to the active filter (its
 * auto-incremented id). Writes *count and returns 0 on success, or
 * EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL if the handle/filter is missing. */
int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count)
{
    if (handle == NULL)
    {
        return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL;
    }
    if (handle->cur_bloom == NULL)
    {
        return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL;
    }
    *count = handle->cur_bloom_inc_id;
    return 0;
}
/* Rotate the active bloom filter if it has outlived expiry_time seconds.
 * If a pre-warmed next_bloom exists (built during the overlap window by
 * expiry_dablooms_add) it is promoted wholesale; otherwise the expired
 * filter's contents are dropped and a fresh empty filter takes its place.
 * last_bloom_check memoizes the most recent evaluated timestamp so
 * repeated calls with a non-advancing clock short-circuit.
 * Returns 0, or a negative expiry_dablooms_errno value on failure.
 * NOTE(review): on EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL the expired filter
 * has already been freed but handle->cur_bloom still points at it —
 * confirm callers treat the handle as unusable after that error. */
static int bloom_expired_check(struct expiry_dablooms_handle *handle, time_t cur_time)
{
    if (handle == NULL || handle->cur_bloom == NULL)
    {
        return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL;
    }
    /* clock has not advanced past the last check: nothing to do */
    if (cur_time <= handle->last_bloom_check)
    {
        return 0;
    }
    time_t delta_time = cur_time - handle->cur_bloom_start;
    handle->cur_time = cur_time;
    if (delta_time >= handle->expiry_time)
    {
        free_scaling_bloom(handle->cur_bloom);
        if (handle->next_bloom != NULL)
        {
            /* promote the staged filter built during the overlap window */
            handle->cur_bloom = handle->next_bloom;
            handle->cur_bloom_start = handle->next_bloom_start;
            handle->cur_bloom_inc_id = handle->next_bloom_inc_id;
            handle->next_bloom = NULL;
            handle->last_bloom_check = 0;
        }
        else
        {
            /* no staged filter: start over with an empty one */
            scaling_bloom_t *cur_bloom = new_scaling_bloom(handle->capacity, handle->error_rate);
            if (cur_bloom == NULL)
            {
                return EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL;
            }
            handle->cur_bloom = cur_bloom;
            handle->cur_bloom_inc_id = 0;
            handle->cur_bloom_start = cur_time;
            handle->last_bloom_check = 0;
        }
    }
    else
    {
        handle->last_bloom_check = cur_time;
    }
    return 0;
}
/* Add `key` to the active filter, rotating it first if expired.
 * Once the active filter's lifetime has elapsed (the overlap window),
 * each key is additionally mirrored into a staged next_bloom, which
 * bloom_expired_check() later promotes — so keys added near rotation are
 * not lost with the expiring filter.
 * Returns 0 on success, -1 on bad arguments, or a negative
 * expiry_dablooms_errno value on filter-creation failure. */
int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time)
{
    if (key == NULL || len == 0 || handle == NULL)
    {
        return -1;
    }
    int ret = bloom_expired_check(handle, cur_time);
    if (ret < 0)
    {
        return ret;
    }
    /* the monotonically increasing inc_id is the classification id that
     * scaling_bloom_add uses to pick/scale filters */
    scaling_bloom_add(handle->cur_bloom, key, len, handle->cur_bloom_inc_id);
    handle->cur_bloom_inc_id++;
    time_t delta_time = cur_time - handle->cur_bloom_start;
    handle->cur_time = cur_time;
    if (delta_time >= handle->expiry_time)
    {
        /* overlap window: lazily create and also populate the successor */
        if (handle->next_bloom == NULL)
        {
            scaling_bloom_t *next_bloom = new_scaling_bloom(handle->capacity, handle->error_rate);
            if (next_bloom == NULL)
            {
                return EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL;
            }
            handle->next_bloom = next_bloom;
            handle->next_bloom_inc_id = 0;
            handle->next_bloom_start = cur_time;
        }
        scaling_bloom_add(handle->next_bloom, key, len, handle->next_bloom_inc_id);
        handle->next_bloom_inc_id++;
    }
    return 0;
}
/* Look up `key` in the active filter, rotating it first if expired.
 * Returns 1 if probably present, 0 if definitely absent, -1 on bad
 * arguments, or a negative errno value on rotation failure. */
int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time)
{
    if (handle == NULL || key == NULL || len == 0)
    {
        return -1;
    }
    int ret = bloom_expired_check(handle, cur_time);
    if (ret < 0)
    {
        return ret;
    }
    return scaling_bloom_check(handle->cur_bloom, key, len);
}

100
src/dablooms/dablooms.h Normal file
View File

@@ -0,0 +1,100 @@
/* Copyright @2012 by Justin Hines at Bitly under a very liberal license. See LICENSE in the source distribution. */
#ifndef __BLOOM_H__
#define __BLOOM_H__
#include <stdint.h>
#include <stdlib.h>
/* Zero-initialized allocation of `number` objects of `type`.
 * calloc's signature is (nmemb, size); the original passed the arguments
 * swapped, which allocates the right total but defeats calloc's overflow
 * check on nmemb * size. */
#define ALLOC(type, number) ((type *)calloc(number, sizeof(type)))
/* Free through a pointer-to-pointer and NULL it out.  Wrapped in
 * do/while(0) so the macro acts as a single statement and is safe in an
 * unbraced if/else (the original's bare braces were not). */
#define FREE(p)         \
    do                  \
    {                   \
        free(*(p));     \
        *(p) = NULL;    \
    } while (0)
/* Version string of the dablooms library. */
const char *dablooms_version(void);

/* A byte array interpreted as a sequence of 4-bit counters
 * (two counters per byte). */
typedef struct
{
    size_t bytes;  /* size of `array` in bytes */
    char *array;   /* counter storage */
} bitmap_t;

bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size);
bitmap_t *new_bitmap(size_t bytes);
int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset);
int bitmap_decrement(bitmap_t *bitmap, unsigned int index, long offset);
int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset);
int bitmap_flush(bitmap_t *bitmap);
void free_bitmap(bitmap_t *bitmap);

/* Per-filter metadata stored inline at the start of the filter's region
 * within the (possibly shared) bitmap. */
typedef struct
{
    uint64_t id;     /* first classification id this filter covers */
    uint32_t count;  /* number of elements currently in this filter */
    uint32_t _pad;   /* alignment padding */
} counting_bloom_header_t;

typedef struct
{
    counting_bloom_header_t *header;  /* points into the bitmap */
    unsigned int capacity;            /* intended max elements */
    long offset;                      /* byte offset of the counter region */
    unsigned int counts_per_func;     /* counters owned by each hash function */
    uint32_t *hashes;                 /* scratch buffer for hash results */
    size_t nfuncs;                    /* number of hash functions (k) */
    size_t size;                      /* total counter count */
    size_t num_bytes;                 /* bitmap bytes used (incl. header) */
    double error_rate;                /* configured false-positive bound */
    bitmap_t *bitmap;                 /* backing storage (may be shared) */
} counting_bloom_t;

int free_counting_bloom(counting_bloom_t *bloom);
counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate);
int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len);
int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len);
int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len);

/* Global header stored at the start of a scaling filter's bitmap. */
typedef struct
{
    uint64_t max_id;       /* greatest id ever added */
    uint64_t mem_seqnum;   /* 0 while an in-memory update is in flight */
    uint64_t disk_seqnum;  /* 0 unless flushed since the last change */
} scaling_bloom_header_t;

typedef struct
{
    scaling_bloom_header_t *header;  /* points into the shared bitmap */
    unsigned int capacity;           /* per-filter capacity */
    unsigned int num_blooms;         /* filters currently in `blooms` */
    size_t num_bytes;                /* total bitmap bytes */
    double error_rate;               /* error bound of the first filter */
    counting_bloom_t **blooms;       /* newest filter is last */
    bitmap_t *bitmap;                /* storage shared by all filters */
} scaling_bloom_t;

scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate);
int free_scaling_bloom(scaling_bloom_t *bloom);
int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len);
int scaling_bloom_flush(scaling_bloom_t *bloom);
uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom);
uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom);

/* Opaque handle for the time-expiring wrapper (defined in dablooms.cpp). */
struct expiry_dablooms_handle;

enum expiry_dablooms_errno
{
    EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL = -1,      /* handle or filter was NULL */
    EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL = -2,  /* filter allocation failed */
};

/* Translate an expiry_dablooms_errno value to a printable name. */
char *expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno);
void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle);
struct expiry_dablooms_handle *expiry_dablooms_init(unsigned int capacity, double error_rate, time_t cur_time, int expiry_time);
int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count);
int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time);
int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time);
#endif

153
src/dablooms/murmur.cpp Normal file
View File

@@ -0,0 +1,153 @@
//-----------------------------------------------------------------------------
// MurmurHash3 was written by Austin Appleby, and is placed in the public
// domain. The author hereby disclaims copyright to this source code.
// Note - The x86 and x64 versions do _not_ produce the same results, as the
// algorithms are optimized for their respective platforms. You can still
// compile and run any of them on any platform, but your performance with the
// non-native version will be less than optimal.
#include "murmur.h"
#define FORCE_INLINE inline static

/* Rotate x left by r bits. Callers below use only fixed constants in 1..63;
 * r == 0 would make the (64 - r) shift undefined, but never occurs here. */
FORCE_INLINE uint64_t rotl64(uint64_t x, int8_t r)
{
    return (x << r) | (x >> (64 - r));
}
#define ROTL64(x, y) rotl64(x, y)
#define BIG_CONSTANT(x) (x##LLU)
/* Fetch 64-bit block i from the input pointer; the endian/alignment behavior
 * is whatever the platform's uint64_t load gives (reference-code behavior). */
#define getblock(x, i) (x[i])
//-----------------------------------------------------------------------------
// Finalization mix - force all bits of a hash block to avalanche
/* Canonical MurmurHash3 64-bit finalizer (xor-shift / multiply chain).
 * Kept byte-identical to the reference so digests stay stable. */
FORCE_INLINE uint64_t fmix64(uint64_t k)
{
    k ^= k >> 33;
    k *= BIG_CONSTANT(0xff51afd7ed558ccd);
    k ^= k >> 33;
    k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
    k ^= k >> 33;
    return k;
}
//-----------------------------------------------------------------------------
/*
 * MurmurHash3, x64 128-bit variant (Austin Appleby's reference code).
 *
 * key/len - input bytes; seed - initial state for both 64-bit lanes;
 * out     - receives 16 bytes of digest, written as two uint64_t values.
 *
 * Processes the input as 16-byte blocks over two lanes (h1/h2), mixes the
 * remaining 1..15 tail bytes, then finalizes. Kept byte-identical to the
 * canonical implementation: any change alters every digest, invalidating
 * data hashed by older builds.
 *
 * NOTE(review): the block loop reads input through a uint64_t*, so it
 * assumes the platform tolerates unaligned loads; output differs between
 * big- and little-endian machines -- standard for this reference code.
 */
void MurmurHash3_x64_128(const void *key, const int len, const uint32_t seed, void *out)
{
    const uint8_t *data = (const uint8_t *)key;
    const int nblocks = len / 16;
    uint64_t h1 = seed;
    uint64_t h2 = seed;
    uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
    uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);
    int i;
    //----------
    // body
    const uint64_t *blocks = (const uint64_t *)(data);
    for (i = 0; i < nblocks; i++)
    {
        uint64_t k1 = getblock(blocks, i * 2 + 0);
        uint64_t k2 = getblock(blocks, i * 2 + 1);
        k1 *= c1;
        k1 = ROTL64(k1, 31);
        k1 *= c2;
        h1 ^= k1;
        h1 = ROTL64(h1, 27);
        h1 += h2;
        h1 = h1 * 5 + 0x52dce729;
        k2 *= c2;
        k2 = ROTL64(k2, 33);
        k2 *= c1;
        h2 ^= k2;
        h2 = ROTL64(h2, 31);
        h2 += h1;
        h2 = h2 * 5 + 0x38495ab5;
    }
    //----------
    // tail
    // Every case below deliberately falls through: case N accumulates tail
    // byte N-1, then drops into the next lower case until the lane is full.
    const uint8_t *tail = (const uint8_t *)(data + nblocks * 16);
    uint64_t k1 = 0;
    uint64_t k2 = 0;
    switch (len & 15)
    {
    case 15:
        k2 ^= ((uint64_t)tail[14]) << 48; /* fall through */
    case 14:
        k2 ^= ((uint64_t)tail[13]) << 40; /* fall through */
    case 13:
        k2 ^= ((uint64_t)tail[12]) << 32; /* fall through */
    case 12:
        k2 ^= ((uint64_t)tail[11]) << 24; /* fall through */
    case 11:
        k2 ^= ((uint64_t)tail[10]) << 16; /* fall through */
    case 10:
        k2 ^= ((uint64_t)tail[9]) << 8; /* fall through */
    case 9:
        k2 ^= ((uint64_t)tail[8]) << 0;
        /* lane 2 fully loaded: mix it, then continue into lane 1 */
        k2 *= c2;
        k2 = ROTL64(k2, 33);
        k2 *= c1;
        h2 ^= k2; /* fall through */
    case 8:
        k1 ^= ((uint64_t)tail[7]) << 56; /* fall through */
    case 7:
        k1 ^= ((uint64_t)tail[6]) << 48; /* fall through */
    case 6:
        k1 ^= ((uint64_t)tail[5]) << 40; /* fall through */
    case 5:
        k1 ^= ((uint64_t)tail[4]) << 32; /* fall through */
    case 4:
        k1 ^= ((uint64_t)tail[3]) << 24; /* fall through */
    case 3:
        k1 ^= ((uint64_t)tail[2]) << 16; /* fall through */
    case 2:
        k1 ^= ((uint64_t)tail[1]) << 8; /* fall through */
    case 1:
        k1 ^= ((uint64_t)tail[0]) << 0;
        k1 *= c1;
        k1 = ROTL64(k1, 31);
        k1 *= c2;
        h1 ^= k1;
    }
    //----------
    // finalization
    h1 ^= len;
    h2 ^= len;
    h1 += h2;
    h2 += h1;
    h1 = fmix64(h1);
    h2 = fmix64(h2);
    h1 += h2;
    h2 += h1;
    ((uint64_t *)out)[0] = h1;
    ((uint64_t *)out)[1] = h2;
}
//-----------------------------------------------------------------------------

12
src/dablooms/murmur.h Normal file
View File

@@ -0,0 +1,12 @@
//-----------------------------------------------------------------------------
// MurmurHash3 was written by Austin Appleby, and is placed in the public
// domain. The author hereby disclaims copyright to this source code.
#ifndef _MURMURHASH3_H_
#define _MURMURHASH3_H_
#include <stdint.h>
/* Hash `len` bytes at `key` with `seed`; writes 16 bytes (two uint64_t) of
 * digest to `out`. This is the x64-optimized 128-bit variant; the x86
 * variants of MurmurHash3 produce different digests for the same input. */
void MurmurHash3_x64_128(const void *key, int len, uint32_t seed, void *out);
#endif // _MURMURHASH3_H_

View File

@@ -0,0 +1,10 @@
###############################################################################
# gtest
###############################################################################
add_executable(gtest_dablooms gtest_dablooms.cpp)

# Nothing links against a test executable, so its include dirs are PRIVATE
# (PUBLIC on an executable only pollutes consumers that can't exist).
target_include_directories(gtest_dablooms PRIVATE ${CMAKE_CURRENT_LIST_DIR})

# Always state visibility: the keyword-less target_link_libraries signature
# has legacy semantics and cannot be mixed with the keyword form later.
target_link_libraries(gtest_dablooms PRIVATE dablooms gtest)

# Enumerate test cases at test time; no reconfigure needed when tests change.
include(GoogleTest)
gtest_discover_tests(gtest_dablooms)

View File

@@ -0,0 +1,71 @@
#include <cstdio>   // printf
#include <ctime>    // time
#include <unistd.h> // sleep

#include <gtest/gtest.h>

#include "dablooms.h"
/* Flow/packet identity used as the bloom-filter key in this test.
 * packed + aligned(1) removes all padding so sizeof(packet_key) covers only
 * real field bytes: the struct is hashed as raw memory, and padding bytes
 * would make logically equal keys hash differently. */
struct packet_key
{
    unsigned int tcp_seq;
    unsigned int tcp_ack;
    unsigned short sport;
    unsigned short dport;
    unsigned short checksum;
    unsigned short ip_id;
    unsigned int ip_src;
    unsigned int ip_dst;
} __attribute__((packed, aligned(1)));
/* Test configuration mirroring the duplicate-packet-identify settings:
 * filter capacity, target false-positive rate, and the expiry window in
 * seconds that the TEST below walks past. */
struct duplicate_packet_idetify_config
{
    int enable;
    unsigned int capacity;
    double error_rate;
    int expiry_time;
} config = {
    .enable = 1,
    .capacity = 1000000,
    .error_rate = 0.00001,
    .expiry_time = 10,
};
/* One fixed, arbitrary packet identity inserted and probed by the TEST. */
struct packet_key key = {
    .tcp_seq = 2172673142,
    .tcp_ack = 2198097831,
    .sport = 46582,
    .dport = 443,
    .checksum = 0x2c4b,
    .ip_id = 65535,
    .ip_src = 123456,
    .ip_dst = 789000,
};
/* End-to-end check of the expiring filter: a key must be absent before
 * insertion, visible for config.expiry_time seconds after insertion, and
 * absent again once the expiry window has passed.
 * NOTE(review): this walks real wall-clock time with sleep(1), so the test
 * takes ~15s, and the iterations at the expiry boundary are timing-sensitive
 * on a loaded machine -- confirm acceptable for CI. */
TEST(DABLOOMS, ADD_SEARCH)
{
    struct expiry_dablooms_handle *handle = expiry_dablooms_init(config.capacity, config.error_rate, time(NULL), config.expiry_time);
    EXPECT_TRUE(handle != nullptr);
    EXPECT_TRUE(expiry_dablooms_search(handle, (const char *)&key, sizeof(key), time(NULL)) != 1); // no exist
    EXPECT_TRUE(expiry_dablooms_add(handle, (const char *)&key, sizeof(key), time(NULL)) == 0);    // add
    for (int i = 0; i < 15; i++)
    {
        // Before the window elapses the key must still hit; afterwards the
        // rotated filter must no longer report it.
        if (i < config.expiry_time)
        {
            EXPECT_TRUE(expiry_dablooms_search(handle, (const char *)&key, sizeof(key), time(NULL)) == 1); // exist
        }
        else
        {
            EXPECT_TRUE(expiry_dablooms_search(handle, (const char *)&key, sizeof(key), time(NULL)) != 1); // no exist
        }
        sleep(1);
        printf("sleep[%02d] 1s\n", i);
    }
    expiry_dablooms_destroy(handle);
}
int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}