1.环境准备
(1).安装python3.8
(2).安装pip
python3.8 -m easy_install pip
(3).安装mongodb驱动
pip install pymongo==3.4
2.准备测试数据
use wang
db.test1.insert({name:"关云长",age:30})
db.test1.insert({name:"张翼德",age:29})
db.test1.insert({name:"刘玄德",age:32})
3.脱敏脚本
#! coding:utf8
"""
安装依赖的库 pip install pymongo==3.4
"""
import pymongo
from urllib.parse import quote_plus
from bson import ObjectId
def connect(user='admin', password='123456', host='10.2.6.31', dbname='wang', port=27017):
try:
# 对连接进行urlecode编码,全是ASCII字符时不需要
url = "mongodb://%s:%s@%s:%s" % (quote_plus(user), quote_plus(password), host, port)
print('mongodb连接信息:%s' % url)
# 实例化mongo客户端
db_client = pymongo.MongoClient(url)[dbname]
return db_client
except Exception as e:
raise e
def cover_user_name(user_name):
if len(user_name) > 2:
user_name = user_name[0] + "*" + "".join(user_name[2:])
elif len(user_name) == 2:
user_name = user_name[0] + "*"
return user_name
def main():
db_client = connect()
# 切换集合
collection = db_client["test1"]
# 查询所有文档
for document in collection.find({}):
_id = document.get("_id")
userName = document.get('name')
userName = cover_user_name(userName)
print("replace ->%s user_name->%s" % (_id, userName))
# 根据文档id 更新 userName
collection.replace_one({'_id': ObjectId(_id)}, {'name': userName})
if __name__ == "__main__":
main()
执行python脚本
python3.8 mongo_replace.py
此时查看数据是否已经修改
4.准备测试数据
db.test2.insert({info:{name:"关云长",age:30}})
db.test2.insert({info:{name:"张翼德",age:29}})
db.test2.insert({info:{name:"刘玄德",age:32}})
5.脱敏脚本
#! coding:utf8
"""
安装依赖的库 pip install pymongo==3.4
"""
import pymongo
from urllib.parse import quote_plus
from bson import ObjectId
def connect(user='admin', password='Lachesis-mh_1024', host='10.2.6.31', dbname='wang', port=27017):
try:
# 对连接进行urlecode编码,全是ASCII字符时不需要
url = "mongodb://%s:%s@%s:%s" % (quote_plus(user), quote_plus(password), host, port)
print('mongodb连接信息:%s' % url)
# 实例化mongo客户端
db_client = pymongo.MongoClient(url)[dbname]
return db_client
except Exception as e:
raise e
def cover_user_name(user_name):
if len(user_name) > 2:
user_name = user_name[0] + "*" + "".join(user_name[2:])
elif len(user_name) == 2:
user_name = user_name[0] + "*"
return user_name
def main():
db_client = connect()
# 切换集合
collection = db_client["test2"]
# 查询所有文档
for document in collection.find({}):
_id = document.get("_id")
info = document.get("info")
names= info.get("name")
userName = cover_user_name(names)
print("replace ->%s user_name->%s" % (_id, userName))
# 根据文档id 更新 userName
collection.update_one({'_id': ObjectId(_id)},{'$set': {'info.name': userName}})
if __name__ == "__main__":
main()
执行完脚本查看是否修改
可以看到已经成功替换。