使用Python访问AWS S3
AWS配置
访问S3需要aws_access_key_id和aws_secret_access_key。
1. 在boto3提供的API中,可以指定这两个值。
2. 但是把安全相关的这些信息放到代码中并不是一个好的选择。
所以,也可通过awscli配置,将其存放到~/.aws/credentials。
boto3的API在访问S3时,会自动从该文件中读取相关信息。
awscli可以通过pip install awscli进行安装。
import os
import boto3
from loguru import logger
BUCKET_NAME = "your-bucket-name"  # target bucket name

# aws_access_key_id and aws_secret_access_key.
# NOTE: better kept out of source in ~/.aws/credentials (see the intro above);
# boto3 reads that file automatically when no explicit keys are given.
CN_S3_AKI = 'your_aws_access_key_id'
CN_S3_SAK = 'your_aws_secret_access_key'
CN_REGION_NAME = 'your-cn-region-name'  # region

# Module-level S3 client shared by all helpers below.
# (The extracted text had lost the `boto3.client` call target.)
s3 = boto3.client(
    's3',
    region_name=CN_REGION_NAME,
    aws_access_key_id=CN_S3_AKI,
    aws_secret_access_key=CN_S3_SAK,
)
def upload_files(path_local, path_s3):
    """Upload a file to S3 (re-uploading overwrites the same-named object).

    :param path_local: local file path
    :param path_s3: destination key in the bucket
    """
    logger.info(f'Start upload files.')
    if not upload_single_file(path_local, path_s3):
        logger.error(f'Upload files failed.')
        # Bail out so we do not also log success after a failure.
        return
    logger.info(f'Upload files successful.')
def upload_single_file(src_local_path, dest_s3_path):
    """Upload a single local file to the bucket.

    :param src_local_path: local source file path
    :param dest_s3_path: destination key in the bucket
    :return: True on success, False on any failure
    """
    try:
        # `with` guarantees the file handle is closed even if the upload raises.
        with open(src_local_path, 'rb') as f:
            s3.upload_fileobj(f, BUCKET_NAME, dest_s3_path)
    except Exception as e:
        logger.error(f'Upload data failed. | src: {src_local_path} | dest: {dest_s3_path} | Exception: {e}')
        return False
    logger.info(f'Uploading file successful. | src: {src_local_path} | dest: {dest_s3_path}')
    return True
def download_zip(path_s3, path_local):
    """Download an object from S3, retrying up to 3 times on failure.

    :param path_s3: source key in the bucket
    :param path_local: local destination path
    """
    retry = 0
    while retry < 3:  # retry up to 3 times on download errors
        logger.info(f'Start downloading files. | path_s3: {path_s3} | path_local: {path_local}')
        try:
            s3.download_file(BUCKET_NAME, path_s3, path_local)
            file_size = os.path.getsize(path_local)
            logger.info(f'Downloading completed. | size: {round(file_size / 1048576, 2)} MB')
            break  # download succeeded — stop retrying
        except Exception as e:
            logger.error(f'Download zip failed. | Exception: {e}')
            retry += 1
    if retry >= 3:
        logger.error(f'Download zip failed after max retry.')
def delete_s3_zip(path_s3, file_name=''):
    """Delete a single object from the bucket.

    :param path_s3: key of the object to delete
    :param file_name: only used by the (commented-out) copy-before-delete step
    """
    try:
        # Optional copy-before-delete, kept for reference:
        # copy_source = {'Bucket': BUCKET_NAME, 'Key': path_s3}
        # s3.copy_object(CopySource=copy_source, Bucket=BUCKET_NAME, Key='is-zips-cache/' + file_name)
        s3.delete_object(Bucket=BUCKET_NAME, Key=path_s3)
    except Exception as e:
        logger.error(f'Delete s3 file failed. | Exception: {e}')
        # Do not log success after a failure.
        return
    logger.info(f'Delete s3 file Successful. | path_s3 = {path_s3}')
def batch_delete_s3(delete_key_list):
    """Batch-delete multiple objects in one request.

    :param delete_key_list: list of key dicts, e.g. [
        {'Key': "test-01/虎式03的副本.jpeg"},
        {'Key': "test-01/"},
    ]
    """
    try:
        s3.delete_objects(
            Bucket=BUCKET_NAME,
            Delete={'Objects': delete_key_list}
        )
    except Exception as e:
        # Fixed typo in the original message ("Excepthon").
        logger.error(f"Batch delete file failed. | Exception: {e}")
        return
    logger.info(f"Batch delete file success. ")
def get_files_list(Prefix=None):
    """List object keys in the bucket, optionally filtered by a key prefix.

    :param Prefix: key prefix to filter on; None lists the whole bucket
        (name kept capitalized for backward compatibility with callers
        that pass it as a keyword)
    :return: list of keys, or None on error / empty result
    """
    logger.info(f'Start getting files from s3.')
    try:
        if Prefix is not None:
            all_obj = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=Prefix)
            # To inspect a single object's metadata instead:
            # obj = s3.head_object(Bucket=BUCKET_NAME, Key=Prefix)
        else:
            all_obj = s3.list_objects_v2(Bucket=BUCKET_NAME)
    except Exception as e:
        logger.error(f'Get files list failed. | Exception: {e}')
        return
    # 'Contents' is absent when the listing matched nothing.
    contents = all_obj.get('Contents')
    logger.info(f"--- contents = {contents}")
    if not contents:
        return
    file_name_list = [zip_obj['Key'] for zip_obj in contents]
    logger.info(f'Get file list successful.')
    return file_name_list
# TODO 检测是否存在 check_exists,可用get_files_list时遍历路径下是否存在要检测的文件名称来判断
# def check_exists(bucket_name, path_name):
#     s3 = boto3.resource('s3')
#     bucket = s3.Bucket(bucket_name)
#     counter = 0
#     for _ in bucket.objects.filter(Prefix=path_name):
#         counter = counter + 1
#     if counter > 0:
#         return True
#     return False
if __name__ == "__main__":
pass
# TODO test 查询 / 上传 / 下载 /删除
# 查询
# file_name_list = get_files_list(Prefix='test-01/')
file_name_list = get_files_list()
(f"file_name_list = {file_name_list}")
# 上传
# path_local = '/Users/Desktop/test/local_file_path/虎式02的副本.jpeg'
# path_s3 = 'test-02/虎式02的副本.jpeg' # s3路径不存在则自动创建
# upload_files(path_local, path_s3)
# 下载
# path_s3 = 'test-01/虎式02的副本.jpeg'
# path_local = '/Users/Desktop/test/download_from_s3/虎式02的副本.jpeg'
# download_zip(path_s3, path_local)
# 删除
# path_s3 = 'test-01/虎式02的副本.jpeg'
# delete_s3_zip(path_s3)
# 批量删除 - 桶内多个文件
# delete_key_list = [
# {'Key': "test-01/虎式"},
# {'Key': "test-01/虎式"},
# {'Key': "test-01/"},
# {'Key': "test-01/"},
# {'Key': "test-01/"},
# {'Key': "test-01/"},
# {'Key': "test-01/"},
# ]
# batch_delete_s3(delete_key_list)