python3 操作阿里云OSS

时间:2024-03-09 21:21:02
# -*- coding: utf-8 -*
import oss2
import os



access_key_id = os.getenv(\'OSS_TEST_ACCESS_KEY_ID\', \'输入你的key\')
access_key_secret = os.getenv(\'OSS_TEST_ACCESS_KEY_SECRET\', \'输入你的密码\')
endpoint = os.getenv(\'OSS_TEST_ENDPOINT\', \'http://oss-cn-beijing.aliyuncs.com\')
path = os.path.abspath(\'.\')
dir_path = os.path.abspath(\'dir\')
dirs = os.listdir(path)


def showBucket():
    print("**********   获取bucket信息  *******")
    service = oss2.Service(oss2.Auth(access_key_id, access_key_secret), endpoint)
    print("*****************************")
    print("     现有bucket有:      ")
    print(\'\n\'.join(info.name for info in oss2.BucketIterator(service)))
    print("*****************************")


def createBucket():
    print("**********   创建  *******")
    bucket_input = input("请输入想创建的bucket名:   ")
    # 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行
    bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name=bucket_input)

    # 带权限与存储类型创建bucket
    bucket.create_bucket(permission=oss2.BUCKET_ACL_PRIVATE,
                         input=oss2.models.BucketCreateConfig(oss2.BUCKET_STORAGE_CLASS_STANDARD))
    if oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name=bucket_input):
        print("     成功创建%s" %bucket_input)
        showBucket()
    print("***************************")

def bucketInfo():
    print("**********   获取bucket_info  *******")
    bucket_input = input(\'请输入bucket名:   \')
    # 获取bucket相关信息
    bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name=bucket_input)
    bucket_info = bucket.get_bucket_info()
    print("     bucket_info:")
    print(\' name: \' + bucket_info.name)
    print(\' storage class: \' + bucket_info.storage_class)
    print(\' creation date: \' + bucket_info.creation_date)
    print("*******************************")

    print("*******************************")
    print("     bucket_stat:")
    bucket_stat = bucket.get_bucket_stat()
    print(\' storage: \' + str(bucket_stat.storage_size_in_bytes))
    print(\' object count: \' + str(bucket_stat.object_count))
    print(\' multi part upload count: \' + str(bucket_stat.multi_part_upload_count))
    print("********************************")



def upload():
    print("**********   上传  *******")
    bucket_input = input(\'请输入要传入的bucket名:   \')
    print("**************************")
    print("     当前目录下所有文件:")
    for file in dirs:
        print(file)
    print("***************************")

    filename = input(\'请输入要上传的文件名: \')
    cloud_name = input(\'请输入云端文件名:   \')
    bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name=bucket_input)
    with open(oss2.to_unicode(filename), \'rb\') as f:
        bucket.put_object(cloud_name, f)
    meta = bucket.get_object_meta(cloud_name)
    if meta:
        print("     上传成功")
        print("     云端所有文件:")
        for i in oss2.ObjectIterator(bucket):
            print(i.key)

    else:
        print("     上传失败")





def download():
    print("**********   下载  *******")
    bucket_input = input(\'请输入bucket名:\')
    bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name=bucket_input)
    print("     %s下有如下文件:" %bucket_input)
    for i in oss2.ObjectIterator(bucket):
        print(i.key)
    print("***************************")
    cloud_name = input(\'请输入要下载的文件名:\')
    file_name = input(\'请输入保存至本地文件名:\')
    bucket.get_object_to_file(cloud_name, file_name)
    print(file_name[4:])
    if file_name[4:] in os.listdir(dir_path):
        print("     成功下载%s" %cloud_name)
    print("**************************")
    print("     当前目录下所有文件:")
    for file in os.listdir(dir_path):
        print(file)
    print("***************************")



def remove():
    print("**********   删除  *******")
    bucket_input = input(\'请输入bucket名:\'  )
    bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name=bucket_input)
    print("     %s下有如下文件(删除前):" % bucket_input)
    for i in oss2.ObjectIterator(bucket):
        print(i.key)
    print("***************************")
    file_name = input(\'请输入要删除的文件名:\'  )

    # 删除名为motto.txt的Object
    bucket.delete_object(file_name)
    print("     成功删除%s" %file_name)
    print("     %s下有如下文件(删除后):" % bucket_input)
    for i in oss2.ObjectIterator(bucket):
        print(i.key)

def main():
    print("************************")
    print("     「指令数字」:")
    print("0. 获取bucket信息")
    print("1. 查看具体bucket信息")
    print("2. 创建bucket")
    print("3. 上传文件")
    print("4. 下载文件")
    print("5. 删除文件")
    print("6. 退出")
    print("************************")
    while 1:
        num = int(input(\'请输入指令数:\'))
        if num == 0:
            showBucket()
        elif num == 1:
            bucketInfo()
        elif num == 2:
            createBucket()
        elif num == 3:
            upload()
        elif num == 4:
            download()
        elif num == 5:
            remove()
        else:
            break



if __name__ == \'__main__\':
    main()