# aws boto3 下载文件 (AWS boto3: download files)
import os, json, urllib, base64
import time, re
from datetime import datetime
from playwright.sync_api import Playwright, sync_playwright, expect
from bs4 import BeautifulSoup
from functools import wraps
# Proxy URL shared by both schemes below.
# NOTE(review): the credentials are embedded in the URL in source; consider
# loading them from the environment instead.
proxy = 'http://username:password@192.192.14.32:3128'
# Scheme -> proxy URL mapping; both schemes point at the same proxy.
proxies = dict.fromkeys(('http', 'https'), proxy)

# Directory holding the cached responses (one JSON file per URL).
CACHE_DIR = r'D:\code\aws_s3\cache'
# Make sure the cache directory exists before anything reads or writes it.
os.makedirs(CACHE_DIR, exist_ok=True)
def timethis(func):
    """Decorator that prints how long each call to ``func`` takes.

    Two lines are printed per call: the elapsed time in seconds (a float) and
    the elapsed wall-clock interval as a ``datetime.timedelta``. The report is
    printed even when ``func`` raises; the exception then propagates unchanged.

    :param func: the callable to time.
    :return: a wrapper that forwards all arguments to ``func`` and returns
        its result.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        s1 = datetime.now()
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed (or go negative) when the system clock is adjusted mid-call.
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            consume = time.perf_counter() - start
            consume2 = datetime.now() - s1
            func_name = func.__name__
            print(f'{func_name} consume time is ---> {consume}')
            print(f'{func_name} consume minutes is ---> {consume2}')
    return wrapper
def handle_route(route):
    """Resolve an intercepted request from the on-disk cache when possible.

    The cache file name is derived from the request URL (scheme stripped,
    ``/`` and ``:`` replaced by ``_``, ``.json`` appended) and looked up under
    ``CACHE_DIR``. A readable entry is replayed with ``route.fulfill``; a
    missing or unreadable entry lets the request continue to the network.

    :param route: the intercepted route; must expose ``request.url``,
        ``abort()``, ``continue_()`` and ``fulfill()``.
    """
    url = route.request.url

    # URL substrings whose requests are aborted outright. Currently empty,
    # so nothing is blocked; add substrings here to enable blocking.
    block_list = []
    if any(marker in url for marker in block_list):
        route.abort()
        return

    # Build a flat file name from the URL.
    file_name = url.replace("https://", "").replace("http://", "").replace("/", "_").replace(":", "_") + ".json"
    cache_file = os.path.join(CACHE_DIR, file_name)

    if not os.path.exists(cache_file):
        # Nothing cached for this URL: let the request go out normally.
        route.continue_()
        return

    try:
        with open(cache_file, 'r') as f:
            cached_response = json.load(f)
        status = cached_response['status']
        headers = cached_response['headers']
        body = base64.b64decode(cached_response['body'])  # body is stored base64-encoded
    except (OSError, ValueError, KeyError, TypeError):
        # Unreadable or corrupt cache entry. The route must still be resolved
        # one way or another, so fall back to the network instead of leaving
        # the request pending.
        route.continue_()
        return

    try:
        # Replay the cached response.
        route.fulfill(status=status, headers=headers, body=body)
    except Exception as exc:
        # Best effort: a failed replay must not crash the caller, but it
        # should not disappear silently either.
        print(f'cache fulfill failed: {url} ({exc})')
def log_response(response):
    """Cache a successful script/stylesheet/image response on disk.

    Only responses whose request resource type is ``script``, ``stylesheet``
    or ``image`` and whose status is 200 are stored. The entry is a JSON file
    under ``CACHE_DIR`` holding the status, the headers and the base64-encoded
    body. Caching is best effort: any failure is swallowed and no cache entry
    is created or replaced.

    :param response: the response object; must expose ``url``, ``status``,
        ``headers``, ``body()`` and ``request.resource_type``.
    """
    url = response.url
    resource_type = response.request.resource_type

    # Only CSS, JS and image resources are cached.
    if resource_type not in ('script', 'stylesheet', 'image'):
        return
    # Only successful responses are cached.
    if response.status != 200:
        return

    file_name = url.replace("https://", "").replace("http://", "").replace("/", "_").replace(":", "_") + ".json"
    cache_file = os.path.join(CACHE_DIR, file_name)
    # Write to a sibling temp file first and rename it into place, so a
    # failed or interrupted write never leaves a truncated JSON cache entry.
    tmp_file = cache_file + '.tmp'
    try:
        response_body = {
            'status': response.status,
            'headers': dict(response.headers),
            # body() returns bytes; base64 makes them JSON-serialisable.
            'body': base64.b64encode(response.body()).decode('utf-8'),
        }
        with open(tmp_file, 'w') as f:
            json.dump(response_body, f)
        os.replace(tmp_file, cache_file)
    except Exception:
        # Deliberately best effort: a resource that cannot be cached is
        # simply fetched from the network again next time.
        try:
            os.remove(tmp_file)
        except OSError:
            pass
# Per-URL request bookkeeping, filled in by log_request().
requests_info = {}


def log_request(request):
    """Record the current time as the start time of ``request``.

    The entry is keyed by ``request.url``; a later request for the same URL
    overwrites the earlier entry.

    :param request: object exposing a ``url`` attribute.
    """
    started = time.time()
    requests_info[request.url] = dict(start_time=started)
def on_response(response, response_data):
    """Collect the JSON payload of a successful ``s3/tb/creds`` response.

    Responses whose URL does not contain ``s3/tb/creds`` or whose status is
    not 200 are ignored. Otherwise the body is parsed as JSON once, printed,
    and appended to ``response_data``.

    :param response: object exposing ``url``, ``status`` and ``json()``.
    :param response_data: list that receives the parsed payload.
    """
    if 's3/tb/creds' not in response.url or response.status != 200:
        return
    # Parse the body a single time and reuse the result for both the log
    # line and the collected data.
    payload = response.json()
    print('boto3', payload)
    response_data.append(payload)
@timethis
def get_boto3_token():
    """Log in to the AWS console in headless Chromium and capture S3 credentials.

    The function opens the S3 console through the configured proxy, fills in
    the account / user name / password form, then opens a bucket listing page
    while listening for responses. The first JSON payload collected by
    ``on_response`` (a 200 response whose URL contains ``s3/tb/creds``) is
    returned.

    NOTE(review): account id, user name, password and proxy credentials are
    hard-coded below; consider loading them from configuration.

    :return: the first captured credentials payload.
    :raises IndexError: if no matching response was captured.
    :raises AttributeError: if the page has no ``<meta name="tb-data">`` tag.
    """
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(
            headless=True,
            proxy={
                'server': 'http://username:password@192.192.14.32:3128',
                "username": "username",
                "password": "password"
            }
        )
        # A fresh context is created; no saved storage state is loaded.
        context = browser.new_context(
        )
        page = context.new_page()
        should_abort = False
        # Payloads collected by on_response().
        response_data = []

        def handle_route(route):
            # Abort further requests once a credentials payload was captured
            # (or the abort flag is set); otherwise let the request through.
            nonlocal should_abort
            if should_abort or response_data:
                print("检测到 'open',停止加载其他内容。")
                route.abort()
            else:
                route.continue_()

        # First navigation: the S3 console URL, which leads to the sign-in form.
        url = 'https://us-west-2.console.aws.amazon.com/s3/buckets/bs?prefix=RESPONSE/'
        # Cache static resources as they arrive.
        page.on("response", log_response)
        # NOTE(review): in Playwright globs a single "*" presumably does not
        # match "/", so this pattern may never match a full URL - confirm
        # whether "**/*" was intended.
        page.route("*", handle_route)
        page.goto(url, timeout=30000 * 3)

        # A previous variant that selected the IAM-user radio button before
        # this step was disabled; the author's note says the flow works with
        # it disabled.
        account_input = page.locator("input[id=\"account\"]")
        # NOTE(review): a Locator object is presumably always truthy, so this
        # check does not test whether the element exists - confirm intent.
        if account_input:
            print('find')
            page.locator("input[id=\"account\"]").click()
            page.locator("input[id=\"account\"]").fill("1111111")
        print('input username')

        # Retry the sign-in form until it can be filled in and submitted.
        while True:
            try:
                page.locator("input[name=\"username\"]").fill("username")
                page.locator("input[name=\"password\"]").fill("password")
                page.locator("#signin_button").click()
                print('break-->')
                break
            except Exception:
                # Not a bare ``except``: Ctrl-C / SystemExit must still be
                # able to stop this otherwise unbounded retry loop.
                print(datetime.now(), 'error-->')
                time.sleep(2)

        # NOTE(review): the message says 6 seconds but the sleep is 2.
        print('wait 6 senconds')
        time.sleep(2)
        cookies = page.context.cookies()
        print('cookie', cookies)

        # Second navigation: the bucket listing; credentials responses seen
        # from here on are collected into response_data.
        url = 'https://us-west-2.console.aws.amazon.com/s3/buckets/bs-tai?region=us-west-2&bucketType=general&prefix=RESPONSE/2023/&showversions=false'
        page.on("response", lambda response: on_response(response, response_data))
        page.goto(url, timeout=30000 * 3)
        print('page on response')

        # Wait until the context answers a cookies query; the result itself
        # is not used.
        while True:
            try:
                page.context.cookies()
                break
            except Exception:
                time.sleep(2)
                print('sleep 2 seconds')

        # The page embeds a JSON blob in <meta name="tb-data" content="...">.
        soup = BeautifulSoup(page.content(), 'lxml')
        meta_tag = soup.find('meta', {'name': 'tb-data'})
        tb_data = meta_tag.get('content')
        tb_data_dict = json.loads(tb_data)
        # The CSRF token is only logged, not returned.
        xsrf_token = tb_data_dict['csrfToken']
        print('xsrf token', xsrf_token)
        print('response_data',response_data)
        return response_data[0]
# Script entry point: fetch the credentials when the file is run directly.
if __name__ == '__main__':
    get_boto3_token()