tushare is an open, free financial data platform. It covers Shanghai/Shenzhen stock data, index data, fund data, futures data, options data, bond data, foreign-exchange data, Hong Kong stock data, industry data, macroeconomic data, news flashes, and other specialty data sets. The Shanghai/Shenzhen stock data is the richest part and covers essentially all of the commonly used stock data.
tushare currently offers four ways to access its data: HTTP, the Python SDK, the Matlab SDK, and the R SDK.
This post shows how to use the Python SDK to fetch the daily indicator data for stocks.
(1) Register a tushare account and get a token
The registration page is https://tushare.pro/register?reg=369571
After registering, your token is shown in the API TOKEN section of your personal homepage.
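The token only needs to be set once in code before creating the API client. A minimal sketch (the token string below is a placeholder; use your own):

import tushare as ts

# Placeholder token: replace with the one from your tushare.pro profile page
ts.set_token('your_token_here')
# pro_api() returns the client object used for all subsequent queries
pro = ts.pro_api()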
(2) Install tushare
The IDE I use for Python development is PyCharm.
pip install tushare -i https://pypi.tuna.tsinghua.edu.cn/simple
tushare depends on numpy, pandas, and a few other libraries; after installing it you may need to install whichever dependency the error messages complain about.
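A quick way to confirm that tushare and its dependencies are installed correctly is to import them and print their versions (just a sanity-check sketch; any ImportError tells you which package still needs to be installed):

# Sanity check after installation: import the main packages and print versions
import numpy
import pandas
import tushare

print("numpy:", numpy.__version__)
print("pandas:", pandas.__version__)
print("tushare:", tushare.__version__)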
(3) Install the elasticsearch Python client
pip3 install elasticsearch -i https://pypi.tuna.tsinghua.edu.cn/simple
(4) Call tushare
The functions that call tushare are all wrapped in a single file (it is imported later as stock_parser); the code is as follows:
import time
import tushare as ts

ts.set_token('b15148f5ca285bd0e85bbc3f659daefff549ade3bba06fae6a037f03')
pro = ts.pro_api()


# Stock list
def get_all_stock():
    stocks = pro.stock_basic(exchange='', list_status='L',
                             fields='ts_code,symbol,name,fullname,area,industry,list_date')
    return stocks


# Daily indicators
def get_daily_basic(share_code, start_date, end_date):
    while 1:
        try:
            df = pro.daily_basic(ts_code=share_code, start_date=start_date,
                                 end_date=end_date, timeout=60)
            return df
        except Exception:
            print("get_daily_basic failed, params:", share_code, start_date, end_date)
            time.sleep(0.5)
The stock_basic interface returns the stock list; its documentation is at https://tushare.pro/document/2?doc_id=25
The daily_basic interface returns the daily indicators; documentation: https://tushare.pro/document/2?doc_id=32. The call is wrapped in a retry loop because tushare limits how many calls you can make per minute (which is also why I save the data locally). Once the limit is exceeded the call raises an error, so the exception is caught and the call is retried after a 0.5 s sleep.
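For reference, the two wrapper functions are used roughly like this, assuming the file above is saved as stock_parser.py (which matches the import in the next section); the stock code and date range are only examples:

import stock_parser as parser

# All listed stocks as a pandas DataFrame (ts_code, symbol, name, industry, ...)
stocks = parser.get_all_stock()
print(stocks.head())

# Daily indicators for one stock over an example date range
df = parser.get_daily_basic('000001.SZ', '20190101', '20190131')
print(df[['ts_code', 'trade_date', 'close', 'pe', 'pb']].head())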
(5) Save to Elasticsearch
Before saving anything, an Elasticsearch instance obviously has to be running on the local machine.
For installing and configuring Elasticsearch, see my other post: https://www.cnblogs.com/betterwgo/p/11240821.html
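Before running the import script, it is worth checking that the local node actually responds; a small sketch using the same elasticsearch client as the code below:

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

# ping() returns True if the node at 127.0.0.1:9200 answers, False otherwise
if es.ping():
    print("Elasticsearch is up")
else:
    print("Cannot reach Elasticsearch; start it first")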
The Python code that calls tushare and saves the data to Elasticsearch is as follows:
# Daily indicators
import configparser
import logging

import numpy as np
from elasticsearch import Elasticsearch
from elasticsearch import helpers

import stock_parser as parser

# Error log goes to log_daily_basic.txt
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("log_daily_basic.txt")
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info("Start print log")

# Resume point: the last ts_code that was written to Elasticsearch
config = configparser.ConfigParser()
config.read("config.ini")
latest_daily_basic_tscode = config.get("daily", "latest_daily_basic_tscode")

es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

# ts_code          str    TS stock code
# trade_date       str    trade date
# close            float  closing price of the day
# turnover_rate    float  turnover rate (%)
# turnover_rate_f  float  turnover rate (free float)
# volume_ratio     float  volume ratio
# pe               float  P/E (total market cap / net profit)
# pe_ttm           float  P/E (TTM)
# pb               float  P/B (total market cap / net assets)
# ps               float  P/S
# ps_ttm           float  P/S (TTM)
# total_share      float  total shares (10k shares)
# float_share      float  float shares (10k shares)
# free_share       float  free-float shares (10k)
# total_mv         float  total market cap (10k CNY)
# circ_mv          float  float market cap (10k CNY)
body = {
    "mappings": {
        "properties": {
            "ts_code": {"type": "keyword"},
            "trade_date": {"type": "integer"},
            "close": {"type": "float"},
            "turnover_rate": {"type": "float"},
            "turnover_rate_f": {"type": "float"},
            "volume_ratio": {"type": "float"},
            "pe": {"type": "float"},
            "pe_ttm": {"type": "float"},
            "pb": {"type": "float"},
            "ps": {"type": "float"},
            "ps_ttm": {"type": "float"},
            "total_share": {"type": "float"},
            "float_share": {"type": "float"},
            "free_share": {"type": "float"},
            "total_mv": {"type": "float"},
            "circ_mv": {"type": "float"}
        }
    }
}

index = 'index_daily_basic'
# ignore=400: creating an index that already exists returns 400, which is ignored
es.indices.create(index=index, body=body, ignore=400)


def check_float(item, x_name):
    # Replace None/NaN with 0.0 so the document can be indexed, and log the fact
    x = item[x_name]
    if x is None or np.isnan(x):
        x = 0.0
        logger.info("%s %s %s is None or nan" % (item['ts_code'], item['trade_date'], x_name))
    return x


def es_insert_daily_basic(df):
    actions = []
    for i in range(len(df)):
        df_item = df.iloc[i]
        ts_code = df_item['ts_code']
        trade_date = int(df_item['trade_date'])
        # Document id: market + symbol + trade date, e.g. SZ00000120190101
        x = ts_code.split('.', 1)
        col_name = x[1] + x[0]
        _id = col_name + df_item['trade_date']
        close = check_float(df_item, 'close')
        turnover_rate = check_float(df_item, 'turnover_rate')
        turnover_rate_f = check_float(df_item, 'turnover_rate_f')
        volume_ratio = check_float(df_item, 'volume_ratio')
        pe = check_float(df_item, 'pe')
        pe_ttm = check_float(df_item, 'pe_ttm')
        pb = check_float(df_item, 'pb')
        ps = check_float(df_item, 'ps')
        ps_ttm = check_float(df_item, 'ps_ttm')
        total_share = check_float(df_item, 'total_share')
        float_share = check_float(df_item, 'float_share')
        free_share = check_float(df_item, 'free_share')
        total_mv = check_float(df_item, 'total_mv')
        circ_mv = check_float(df_item, 'circ_mv')
        action = {
            "_index": index,
            "_type": "_doc",
            "_id": _id,
            "_source": {
                "ts_code": ts_code,
                "trade_date": trade_date,
                "close": close,
                "turnover_rate": turnover_rate,
                "turnover_rate_f": turnover_rate_f,
                "volume_ratio": volume_ratio,
                "pe": pe,
                "pe_ttm": pe_ttm,
                "pb": pb,
                "ps": ps,
                "ps_ttm": ps_ttm,
                "total_share": total_share,
                "float_share": float_share,
                "free_share": free_share,
                "total_mv": total_mv,
                "circ_mv": circ_mv
            }
        }
        # Collect actions and flush them to Elasticsearch in batches
        actions.append(action)
        if i % 1000 == 0 or i == (len(df) - 1):
            helpers.bulk(client=es, actions=actions)
            actions.clear()
    actions.clear()


def update_latest_daily_basic_tscode(tscode):
    # Persist the last processed ts_code so a restart can resume from it
    config.set("daily", "latest_daily_basic_tscode", tscode)
    with open("config.ini", "w+") as f:
        config.write(f)


# Update a single stock
def update_daily_basic(tscode, start_date, end_date):
    df = parser.get_daily_basic(tscode, start_date, end_date)
    es_insert_daily_basic(df)
    return len(df)


if __name__ == "__main__":
    # Get the codes of all listed stocks
    stocks = parser.get_all_stock()
    bIn = True
    for i in range(len(stocks)):
        stock = stocks.iloc[i]
        ts_code = stock['ts_code']
        if latest_daily_basic_tscode == ts_code:
            bIn = False
        if not bIn:
            count = update_daily_basic(ts_code, '20000101', '')
            print(i, ts_code, count)
            update_latest_daily_basic_tscode(ts_code)
        else:
            print(i, ts_code)
Logging uses the standard logging module; I did not study it in depth and just copied a working setup, since all I need is to write error messages to a log file.
A configparser instance is used to parse an ini configuration file; config.ini contains the following:
[daily]
latest_daily_basic_tscode = 000001.SZ
The point of the config file is that when the program is interrupted and restarted, it does not have to start over from the first stock; it resumes from the code stored in the config. The first entry returned by the stock-list interface is 000001.SZ, so that is the initial value. This part could certainly be optimized.
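The resume logic itself is just a configparser read at startup and a write after each stock; a minimal standalone sketch of the same idea (the ts_code values are examples):

import configparser

config = configparser.ConfigParser()
config.read("config.ini")

# Read the last processed code; fall back to the first stock if the key is missing
last = config.get("daily", "latest_daily_basic_tscode", fallback="000001.SZ")
print("resume from:", last)

# After finishing a stock, record it so a restart continues from there
config.set("daily", "latest_daily_basic_tscode", "000002.SZ")
with open("config.ini", "w") as f:
    config.write(f)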
The data is written to Elasticsearch with the bulk function from elasticsearch.helpers, which performs batch indexing.
Let's take a look at what was saved:
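To check what was written, you can query the index directly from Python; the index name index_daily_basic comes from the code above, and the stock code in the query is only an example:

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

# Fetch a few stored documents for one stock, newest first (example query)
resp = es.search(
    index='index_daily_basic',
    body={
        "query": {"term": {"ts_code": "000001.SZ"}},
        "sort": [{"trade_date": {"order": "desc"}}],
        "size": 5
    }
)
for hit in resp['hits']['hits']:
    src = hit['_source']
    print(src['trade_date'], src['close'], src['pe'], src['pb'])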
tushare registration: https://tushare.pro/register?reg=369571