使用库pybloom_live
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from pybloom_live import ScalableBloomFilter,BloomFilter
# 可自动伸缩的布隆过滤器
bloom = ScalableBloomFilter(initial_capacity = 100 ,error_rate = 0.001 )
# 添加内容
bloom.add( 'daqi' )
print ( 'daqi' in bloom)
# 定长的布隆过滤器
bloom1 = BloomFilter(capacity = 10000 )
bloom1.add( 'daqi' )
print ( 'daqi' in bloom1)
|
手动实现一个简单的布隆过滤器
使用bitarray实现,将初始数组置为0,根据hash计算出节点置为1,同时写了一个生成随机码的函数用于测试。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
import random
import mmh3
from bitarray import bitarray
import os.path
import re
# bitarray长度
BIT_SIZE = 50000
class BloomFilter():
def __init__( self ):
bit_array = bitarray(BIT_SIZE)
bit_array.setall( 0 )
self .bit_array = bit_array
self .bit_size = self .length()
def get_points( self , url):
"""
生成需要插入的位置
:param url:
:return:节点的列表
"""
point_list = []
for i in range ( 7 ):
point = mmh3. hash (url, 30 + i) % self .bit_size
point_list.append(point)
return point_list
def add( self , url):
"""
添加url到bitarray中
:param url:
:return:
"""
res = self .bitarray_expand()
points = self .get_points(url)
try :
for point in points:
self .bit_array[point] = 1
return '注册完成!'
except Exception as e:
return e
def contains( self ,url):
"""
验证url是否存在
:param url:
:return:True or False
"""
points = self .get_points(url)
# 在bitarray中查找对应的点,如果有一个点值为0就说明该url不存在
for p in points:
if self .bit_array[p] = = 0 :
return False
return True
def count( self ):
"""
获取bitarrray中使用的节点数
:return: bitarray长度
"""
return self .bit_array.count()
def length( self ):
"""
获取bitarray的长度
:return:bitarray的长度
"""
return len ( self .bit_array)
def bitarray_expand( self ):
"""
扩充bitarray长度
:return:bitarray的长度或使用率,布隆过滤器的bitarray的使用最好不要超过50%,这样误判率低一些
"""
isusespace = round ( int ( self .count()) / int ( self .length()), 4 )
if 0.50 < isusespace:
# 新建bitarray
expand_bitarray = bitarray(BIT_SIZE)
expand_bitarray.setall( 0 )
# 增加新建的bitarray
self .bit_array = self .bit_array + expand_bitarray
self .bit_size = self .length()
return self .bit_size
else :
return f '长度尚可,{round(isusespace * 100,2)}%'
def get_captcha():
"""
生成用于测试的随机码
:return:
"""
seed = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
captcha = ""
for i in range ( 10 ):
captcha + = random.choice(seed)
print (captcha)
return captcha
if __name__ = = '__main__' :
bloom = BloomFilter()
for i in range ( 100000 ):
bloom.add(f 'www.{get_captcha()}.com' )
print (bloom.length())
print (bloom.count())
print (bloom.count())
|
到此这篇关于python使用布隆过滤器的实现示例的文章就介绍到这了,更多相关python 布隆过滤器内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/qq_41387609/article/details/108069894