cfg = #用Minio获取s3服务器client,查看桶(存放文件的文件夹cfg['bucket'])是否存在。 client = Minio(cfg['endpoint'], access_key=cfg["access_key"], secret_key=cfg["secret_key"]) found = client.bucket_exists(cfg['bucket'])
if found:
#用boto3创建获取s3服务器的反应(参数为地址和密码,参考S3 — Boto3 Docs 1.23.3 documentation)
s3 = (
's3',
endpoint_url=f'{"https" if cfg["ssl"] else "http"}://{cfg["endpoint"]}',
aws_access_key_id=cfg["access_key"],
aws_secret_access_key=cfg["secret_key"]
)
self.s3 = s3
#获取桶,获取客户端
self.my_bucket = (cfg['bucket'])
self.boto_client = ('s3',endpoint_url=f'{"https" if cfg["ssl"] else "http"}://{cfg["endpoint"]}',
aws_access_key_id=cfg["access_key"],
aws_secret_access_key=cfg["secret_key"])
#过滤是cfg['file_dictory'] 文件夹中的文件,每个文件是一个对象
#cfg['file_dictory'] = ‘file’
#object_summary.key是文件对象的路径 例如:file/334413350/NWP/SFAF06_20220510_0000_NWP.WPD
for object_summary in self.my_bucket.(Prefix=cfg['file_dictory']):
if '.' in str(object_summary.key):
self.files_path_list.append(object_summary.key)
#读取文件中内容 obj = self.(['bucket'], file_path_name) body = ()['Body'].read() f_charInfo = (body) # 获取文本编码信息 #'GB2312' o_data = (f_charInfo['encoding'])
######移动文件。没有找到可以直接移动文件的接口,只能执行复制再删除。
#复制文件到另一个地方
old_source = {'Bucket':['bucket'], 'Key': self.file_path_name} #被复制的文件地址信息
new_key = self.file_path_name.replace(['file_dictory'], ['file_dictory_uploaded'])#移动到的地址路径
self.my_bucket.copy(old_source, new_key)
#删除原来文件
self.boto_client.delete_object(Bucket=['bucket'], Key=self.file_path_name)