【数据结构与算法】系列三十九 - 布隆过滤器

如果要经常判断1个元素是否存在,你会怎么做?

很容易想到使用哈希表(HashSet、HashMap),将元素作为key去查找,时间复杂度是$O(1)$,但是空间利用率不高,需要占用比较多的内存资源。

如果需要编写一个网络爬虫去爬10亿个网站数据,为了避免爬到重复的网站,如何判断某个网站是否爬过?很显然,HashSet、HashMap并不是非常好的选择。是否存在时间复杂度低、占用内存较少的方案?布隆过滤器

一、布隆过滤器

布隆过滤器(Bloom Filter),1970年由布隆提出,它是一个空间效率高的概率型数据结构,可以用来告诉你:一个元素一定不存在或者可能存在

优点:空间效率和查询时间都远远超过一般的算法。

缺点:有一定的误判率(误判率可以通过代码调整到尽可能低,但绝对不可能调整到零),并且删除非常困难(也因此布隆过滤器没有删除功能)。

它实质上是一个很长的二进制向量和一系列随机映射函数(Hash函数)。

常见应用:网页黑名单系统、垃圾邮件过滤系统、爬虫的网址判重系统、解决缓存穿透问题等。

1.1. 原理

假设布隆过滤器由20位二进制,3个哈希函数组成,每个元素经过哈希函数处理都能生成一个索引位置。

添加元素:将每一个哈希函数生成的索引位置都设为1。

查询元素是否存在

  • 如果有一个哈希函数生成的索引位置不为1,就代表不存在(100%准确)
  • 如果每一个哈希函数生成的索引位置都为1,就代表存在(存在一定的误判率)

如下图,创建一个20位的二进制向量(存放二进制的数组,初始默认都是0),添加元素A和B到二进制向量中大概需要以下步骤:

  1. 元素A经过三个哈希函数计算出在二进制向量中3个不同的索引,分别是3,7,8;
  2. 然后把对应索引位置的值设为1;
  3. 元素B经过三个哈希函数计算出在二进制向量中3个不同的索引,分别是2,7,15;
  4. 然后把对应索引位置的值设为1。但是索引为7的值之前就是1,所以从这也可以看出,判断某个元素是否存在,返回true时是可能存在误判的,因为相同索引的值可能代表的是其他元素存在,同样的问题导致删除元素也会变得非常困难。

添加、查询的时间复杂度都是$O(k)$,k是哈希函数的个数。空间复杂度是$O(m)$,m是二进制位的个数。

因此只要需求满足以下三个条件,就可以考虑使用布隆过滤器

  1. 经常要判断某个元素是否存在;
  2. 元素数量巨大,希望用比较少的内存空间;
  3. 允许有一定的误判率。

1.2. 误判率

误判率 $p$ 受3个因素影响:二进制位的个数 $m$,哈希函数的个数 $k$,数据规模 $n$。经过科学家的实验结果可以得出以下公式:

$$
p = \Big(1-e^{-\frac{k(n+0.5)}{m-1}}\Big)^k \qquad \underrightarrow {忽略常数} \qquad \Big(1-e^{-\frac{k(n+0.5)}{m-1}}\Big)^k
$$

已知误判率 $p$,数据规模 $n$,求二进制位的个数 $m$,哈希函数的个数:

$$
m = -\frac{nlnp}{(ln2)^2} \qquad \underrightarrow {根据m求出k} \qquad
k = \frac{m}{n}ln2
$$

知识补充:$ln$是自然数对数,即 $ln{a} = log_e{a}$。

1.3. 实现

有了上面的公式,代码实现起来就会简单很多,但仍然有很多细节需要注意。

1.3.1. 计算

根据已知条件,误判率 $p$,数据规模 $n$,计算出二进制位的个数(int bitSize)和哈希函数的个数(int hashSize),并且根据bitSize计算出二进制向量long[] bits的容量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class BloomFilter<T> {
/**
* 二进制向量的长度(二进制位的个数)
*/
private int bitSize;
/**
* 二进制向量
*/
private long[] bits;
/**
* 哈希函数的个数
*/
private int hashSize;

/**
* @param n 数据规模
* @param p 误判率, 取值范围(0, 1)
*/
public BloomFilter(int n, double p) {
if (n <= 0 || p <= 0 || p >= 1) {
throw new IllegalArgumentException("wrong n or p");
}
// 求ln2的值(在java中,Math有提供对应的函数)
double ln2 = Math.log(2);
// 求出二进制向量的长度
bitSize = (int) (- (n * Math.log(p)) / (ln2 * ln2));
// 求出哈希函数的个数
hashSize = (int) (bitSize * ln2 / n);
// 计算bits数组的长度
// 下面代码也可以使用ceil函数进行向上取整,但没有加减法效率高
bits = new long[(bitSize + Long.SIZE - 1) / Long.SIZE];
}
}

1.3.2. 添加元素

添加元素首先要计算哈希值,根据哈希值计算其在二进制向量中的索引。

为了计算方便(主要是位运算),位的索引在long内存中使用倒序存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 添加元素
*/
public boolean put(T value) {
// 非空检查
if (value == null) {
throw new IllegalArgumentException("Value must not be null.");
}
// 利用value生成2个整数
int hash1 = value.hashCode();
int hash2 = hash1 >>> 16;

boolean result = false;
for (int i = 1; i <= hashSize; i++) {
int combinedHash = hash1 + (i * hash2);
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
// 生成索引
int index = combinedHash % bitSize;
// 设置index位置的二进位为1,如果是未添加过就返回true
if (set(index)) result = true;
}
return result;
}

/**
* 设置index位置的二进位为1
*/
private boolean set(int index) {
// 计算二进制向量中的long的索引(二进制向量由有多个long组成的数组)(index的粗略位置)
int indexOfLong = index / Long.SIZE;
// 获取对应位置的long值
long valueOfLong = bits[indexOfLong];
// 计算出index在long中的二进制位索引(index的精确位置)
int indexOfBit = index % Long.SIZE;
// 向左移动indexOfBit位
long bitValue = 1 << indexOfBit;

/*
比如:
index == 130
indexOfLong == 2
valueOfLong == 00000000
indexOfBit == 2
bitValue == 1 << 2 => 00000100
valueOfLong | bitValue == 00000100 | 00000100 => 00000100
*/
// 更新指定位的值(按位或:遇到1就是1,遇到0保持不动)
long updatedValue = valueOfLong | bitValue;
// 覆盖indexOfLong位置的数据
bits[indexOfLong] = updatedValue;

// 如果原来对应位置存储的是0,意味着本次添加数据导致对应bit发生了改变(0变为1),返回true
return (valueOfLong & bitValue) == 0;
}

1.3.3. 元素是否存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 判断一个元素是否存在
*/
public boolean contains(T value) {
// 非空检查
if (value == null) {
throw new IllegalArgumentException("Value must not be null.");
}
// 利用value生成2个整数
int hash1 = value.hashCode();
int hash2 = hash1 >>> 16;

for (int i = 1; i <= hashSize; i++) {
int combinedHash = hash1 + (i * hash2);
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
// 生成索引
int index = combinedHash % bitSize;
// 查询index位置的二进位是否为0
if (!get(index)) return false;
}
return true;
}

/**
* 查看index位置的二进位的值
* @return true代表1, false代表0
*/
private boolean get(int index) {
// 计算二进制向量中的long的索引(二进制向量由有多个long组成的数组)(index的粗略位置)
int indexOfLong = index / Long.SIZE;
// 获取对应位置的long值
long valueOfLong = bits[indexOfLong];
// 计算出index在long中的二进制位索引(index的精确位置)
int indexOfBit = index % Long.SIZE;
// 向左移动indexOfBit位
long bitValue = 1 << indexOfBit;
/*
按位与:两者同1就是1,否则就是0
比如:
00010100
&
00000100
-----------
00000100
*/
// 如果对应位置是0,返回false(一定不存在);否则返回true(可能存在)
return (valueOfLong & bitValue) != 0;
}

1.3.4. 使用

假设要爬虫10亿个网站,误判率为1%。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
// 数组
String[] urls = {};
BloomFilter<String> bf = new BloomFilter<String>(10_0000_0000, 0.01);
for (String url : urls) {
// 此处误判指的是新的URL经过布隆过滤器后发现已经被爬过(真实情况是这个URL没有爬过),即应该爬的网站没有爬
/* 如果put方法没有返回值,可以使用contains方法判断是否爬过,但是需要手动把URL放进BloomFilter中
for (String url : urls) {
if (bf.contains(url)) continue;
// 爬这个url
// ......

// 放进BloomFilter中
bf.put(url);
}
*/
// 使用put方法的返回值判断是否爬过,使用这种方式可以直接把URL放到BloomFilter中
if (bf.put(url) == false) continue;
// 爬这个url
// ......
}
}

Google开源的布隆过滤器:https://mvnrepository.com/artifact/com.google.guava/guava