aws boto3 下载文件

时间:2024-10-30 09:40:34
import base64
import json
import os
import re
import time
import urllib
from datetime import datetime
from functools import wraps

from bs4 import BeautifulSoup
from playwright.sync_api import Playwright, sync_playwright, expect

# NOTE(review): proxy and account credentials are hard-coded in this file.
# They look like placeholders; real values should come from the environment
# or a secrets store rather than from source control.
proxy = 'http://username:password@192.192.14.32:3128'
proxies = {
    'http': proxy,
    'https': proxy
}

# Directory where intercepted static resources are cached as JSON files.
CACHE_DIR = (r'D:\code\aws_s3\cache')
# Make sure the cache directory exists before any handler touches it.
os.makedirs(CACHE_DIR, exist_ok=True)

# URL fragments whose requests are aborted by ``handle_route``.
# NOTE(review): the original list also carried commented-out entries
# ('telemetry', 'svg', 'module'); because the source had lost its line
# breaks, the exact extent of those comments is an assumption -- confirm.
_BLOCKED_URL_FRAGMENTS = (
    "browserCreds",
    'module-utils.js',
    'gif',
    'image',
    'panoramaroute',
    'log',
    'tele',
    'index',
    'util',
    'css',
)


def timethis(func):
    """Decorator that prints how long each call to *func* took.

    Two lines are printed after every call: the elapsed time as a float
    (difference of ``time.time()`` values) and as a ``timedelta``
    (difference of ``datetime.now()`` values).

    :param func: the callable to wrap
    :return: a wrapper that forwards all arguments and returns func's result
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        started_at = datetime.now()
        result = func(*args, **kwargs)
        end = time.time()
        finished_at = datetime.now()
        func_name = func.__name__
        consume = end - start
        consume2 = finished_at - started_at
        print(f'{func_name} consume time is ---> {consume}')
        print(f'{func_name} consume minutes is ---> {consume2}')
        return result
    return wrapper


def _cache_path(url):
    """Return the cache file path for *url*.

    The scheme prefix is stripped and ``/`` and ``:`` are replaced by ``_``;
    the result gets a ``.json`` suffix and is placed under ``CACHE_DIR``.
    Shared by ``handle_route`` (read side) and ``log_response`` (write side)
    so both always agree on the file name.
    """
    file_name = url.replace("https://", "").replace("http://", "").replace("/", "_").replace(":", "_") + ".json"
    return os.path.join(CACHE_DIR, file_name)


def handle_route(route):
    """Route handler: abort blocked URLs, serve cached ones, pass the rest.

    Every call ends in exactly one of ``route.abort()``, ``route.fulfill()``
    or ``route.continue_()``.

    :param route: the Playwright route object for the intercepted request
    """
    url = route.request.url

    if any(fragment in url for fragment in _BLOCKED_URL_FRAGMENTS):
        route.abort()
        return

    cache_file = _cache_path(url)
    try:
        with open(cache_file, 'r', encoding='utf-8') as f:
            cached_response = json.load(f)
        status = cached_response['status']
        headers = cached_response['headers']
        body = base64.b64decode(cached_response['body'])
    except (OSError, ValueError, KeyError, TypeError):
        # No cache entry, or an unreadable/corrupt one. The original code
        # swallowed this case and left the route unresolved, so the request
        # would hang; fall back to the network instead.
        route.continue_()
        return

    # Replay the cached response.
    route.fulfill(status=status, headers=headers, body=body)


def log_response(response):
    """Response listener: cache successful script/stylesheet/image bodies.

    The response status, headers and base64-encoded body are written as JSON
    to the file returned by ``_cache_path``. Caching is best effort: any
    failure is ignored so it cannot break page loading.

    :param response: the Playwright response object
    """
    url = response.url
    resource_type = response.request.resource_type

    # Only cache CSS, JS and image resources, and only successful responses.
    if resource_type not in ['script', 'stylesheet', 'image']:
        return
    if response.status != 200:
        return

    cache_file = _cache_path(url)
    try:
        response_body = {
            'status': response.status,
            'headers': dict(response.headers),
            # body() returns bytes; base64 makes them JSON-serialisable.
            'body': base64.b64encode(response.body()).decode('utf-8')
        }
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(response_body, f)
    except Exception:
        # Deliberately best effort (the body may be unavailable, or the URL
        # may not map to a valid file name): skip caching this resource.
        pass


# Start time of each request seen by ``log_request``, keyed by URL.
requests_info = {}


def log_request(request):
    """Request listener: record the time at which *request* was observed.

    :param request: the Playwright request object
    """
    requests_info[request.url] = {
        'start_time': time.time()
    }


def on_response(response, response_data):
    """Response listener: collect the JSON payload of the credentials call.

    When the response URL contains ``s3/tb/creds`` and the status is 200,
    the parsed JSON body is printed and appended to *response_data*.

    :param response: the Playwright response object
    :param response_data: list that receives the parsed payload
    """
    if 's3/tb/creds' in response.url and response.status == 200:
        # Parse once and reuse the result for both the print and the append.
        creds = response.json()
        print('boto3', creds)
        response_data.append(creds)


def _extract_csrf_token(html):
    """Return the ``csrfToken`` embedded in the page, or ``None``.

    The token is read from the JSON stored in the ``content`` attribute of
    ``<meta name="tb-data">``. It is only used for diagnostic output, so a
    missing tag, attribute or key (or malformed JSON) yields ``None``
    instead of raising.
    """
    soup = BeautifulSoup(html, 'lxml')
    meta_tag = soup.find('meta', {'name': 'tb-data'})
    if meta_tag is None:
        return None
    tb_data = meta_tag.get('content')
    if not tb_data:
        return None
    try:
        tb_data_dict = json.loads(tb_data)
    except ValueError:
        return None
    if not isinstance(tb_data_dict, dict):
        return None
    return tb_data_dict.get('csrfToken')


@timethis
def get_boto3_token():
    """Sign in to the AWS console in a headless browser and capture creds.

    Opens the S3 console through the configured proxy, fills in the account
    and user sign-in forms, navigates to a bucket listing and returns the
    JSON payload of the first ``s3/tb/creds`` response that was observed.

    :return: the parsed JSON of the first captured credentials response
    :raises IndexError: if no credentials response was captured
    """
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(
            headless=True,
            proxy={
                'server': 'http://username:password@192.192.14.32:3128',
                "username": "username",
                "password": "password"
            }
        )
        context = browser.new_context()
        page = context.new_page()

        should_abort = False
        # Payloads collected by ``on_response``.
        response_data = []

        def abort_after_token(route):
            # Stop loading further resources once credentials were captured.
            if should_abort or response_data:
                print("检测到 'open',停止加载其他内容。")
                route.abort()
            else:
                route.continue_()

        url = 'https://us-west-2.console.aws.amazon.com/s3/buckets/bs?prefix=RESPONSE/'

        page.on("response", log_response)
        # NOTE(review): in Playwright glob syntax a single "*" does not match
        # "/", so this pattern probably matches no real URL and the handler
        # may never run. Left unchanged to preserve behaviour -- confirm
        # whether "**/*" was intended.
        page.route("*", abort_after_token)
        page.goto(url, timeout=30000 * 3)

        # An alternative sign-in layout (root/IAM radio buttons,
        # "resolving_input", "next_button") used to be handled here; the
        # original author noted that disabling it made the flow work.

        # The original guarded these steps with ``if page.locator(...)``.
        # A Locator object is presumably always truthy, so the guard never
        # skipped anything; the steps are written unconditionally.
        print('find')
        account_input = page.locator("input[id=\"account\"]")
        account_input.click()
        account_input.fill("1111111")

        print('input username')
        while True:
            try:
                page.locator("input[name=\"username\"]").fill("username")
                page.locator("input[name=\"password\"]").fill("password")
                page.locator("#signin_button").click()
                print('break-->')
                break
            except Exception:
                # ``Exception`` rather than a bare except, so Ctrl-C can
                # still interrupt the retry loop.
                print(datetime.now(), 'error-->')
                time.sleep(2)

        # NOTE(review): the message says 6 seconds but the sleep is 2.
        print('wait 6 senconds')
        time.sleep(2)
        cookies = page.context.cookies()
        print('cookie', cookies)

        url = 'https://us-west-2.console.aws.amazon.com/s3/buckets/bs-tai?region=us-west-2&bucketType=general&prefix=RESPONSE/2023/&showversions=false'
        # Capture the credentials response triggered by this navigation.
        page.on("response", lambda response: on_response(response, response_data))
        page.goto(url, timeout=30000 * 3)
        print('page on response')

        # Retry until the context can report its cookies.
        while True:
            try:
                page.context.cookies()
                break
            except Exception:
                time.sleep(2)
                print('sleep 2 seconds')

        xsrf_token = _extract_csrf_token(page.content())
        print('xsrf token', xsrf_token)
        print('response_data', response_data)

        if not response_data:
            # Same exception type as the original ``response_data[0]``, but
            # with a message that says what went wrong.
            raise IndexError("no 's3/tb/creds' response was captured")
        # Leaving the ``with`` block stops Playwright.
        return response_data[0]


if __name__ == '__main__':
    get_boto3_token()