Fetching daily key fundamental indicators for stocks with tushare and storing them in Elasticsearch

Time: 2024-03-06 20:45:59

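tushare is an open, free financial data platform. It covers Shanghai and Shenzhen stock data, index data, fund data, futures data, options data, bond data, foreign exchange data, Hong Kong stock data, industry economic data, macroeconomic data, and featured data such as news flashes. The Shanghai and Shenzhen stock data is the richest part and includes essentially all commonly used data for those markets.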


tushare currently offers four ways to get data: HTTP, the Python SDK, the Matlab SDK, and the R SDK.

This post shows how to fetch daily stock indicator data with the Python SDK.


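(1) Register a tushare account and get a token

    The registration page is https://tushare.pro/register?reg=369571

    After registering, you can find your token under 接口TOKEN (API token) on your personal home page.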

[Image: the 接口TOKEN section of the personal home page]
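
Rather than pasting the token into source code, one option is to read it from an environment variable. The following is a minimal sketch under assumptions: the variable name TUSHARE_TOKEN and the helper load_token are chosen here for illustration and are not defined by tushare.

```python
import os


def load_token(env_name="TUSHARE_TOKEN"):
    """Return the token stored in the given environment variable.

    Both the variable name and this helper are illustrative assumptions.
    """
    token = os.environ.get(env_name, "").strip()
    if not token:
        raise RuntimeError("Set the %s environment variable to your tushare token" % env_name)
    return token
```

The returned value could then be passed to ts.set_token(...) before calling ts.pro_api().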


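(2) Install tushare

I use PyCharm as my Python IDE.

pip install tushare -i https://pypi.tuna.tsinghua.edu.cn/simple

tushare depends on numpy, pandas, and a few other libraries. After installing it, you may need to install whichever libraries the error messages report as missing.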


(4) Install the Elasticsearch Python client

pip3 install elasticsearch -i https://pypi.tuna.tsinghua.edu.cn/simple

 

(5) Call tushare

All the functions that call tushare are wrapped in a single file (imported below as stock_parser). The code is as follows:

import time

import tushare as ts

# Replace the placeholder with your own token from the tushare personal home page
ts.set_token('YOUR_TUSHARE_TOKEN')
pro = ts.pro_api()


# Stock list
def get_all_stock():
    stocks = pro.stock_basic(exchange='', list_status='L', fields='ts_code,symbol,name,fullname,area,industry,list_date')
    return stocks


# Daily indicators
def get_daily_basic(share_code, start_date, end_date):
    # Retry until the call succeeds; tushare raises an error when the per-minute call limit is exceeded
    while True:
        try:
            df = pro.daily_basic(ts_code=share_code, start_date=start_date, end_date=end_date, timeout=60)
            return df
        except Exception as e:
            print("get_daily_basic failed, parameters:", share_code, start_date, end_date, e)
            time.sleep(0.5)

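The stock_basic interface returns the stock list. Its documentation is at https://tushare.pro/document/2?doc_id=25

The daily_basic interface returns the daily indicators. Its documentation is at https://tushare.pro/document/2?doc_id=32. The call is wrapped in a loop because tushare limits the number of calls per minute (which is also why I save the data locally). Exceeding the limit raises an error, so the except clause catches the exception, waits 0.5 s, and tries again.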
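
The loop above retries forever, which can hang if the failure is not a rate limit (for example, a bad token). As a variation, here is a minimal sketch of a bounded retry with exponential backoff. The helper call_with_retry and its parameters are hypothetical names introduced for illustration; they are not part of tushare.

```python
import time


def call_with_retry(func, *args, max_attempts=5, base_delay=0.5, sleep=time.sleep, **kwargs):
    """Call func(*args, **kwargs), retrying on any exception.

    Waits base_delay, 2*base_delay, 4*base_delay, ... between attempts and
    re-raises the last exception once max_attempts is reached.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception:
            if attempt == max_attempts:
                raise
            # Double the waiting time after every failed attempt
            sleep(base_delay * 2 ** (attempt - 1))
```

Under these assumptions, the daily_basic call could be written as call_with_retry(pro.daily_basic, ts_code=share_code, start_date=start_date, end_date=end_date). The sleep parameter exists only so the waiting can be replaced in tests.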


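(6) Save to Elasticsearch

Before saving anything, Elasticsearch must of course already be running on the local machine.

For installing and configuring Elasticsearch, see my other blog post: https://www.cnblogs.com/betterwgo/p/11240821.html

The Python code that calls tushare and saves the data to Elasticsearch is as follows: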

# Daily indicators
import configparser
import logging

import numpy as np
from elasticsearch import Elasticsearch
from elasticsearch import helpers

import stock_parser as parser

logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("log_daily_basic.txt")
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info("Start print log")

config = configparser.ConfigParser()
config.read("config.ini")
latest_daily_basic_tscode = config.get("daily", "latest_daily_basic_tscode")

es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

# ts_code            str      TS stock code
# trade_date         str      trade date
# close              float    closing price of the day
# turnover_rate      float    turnover rate (%)
# turnover_rate_f    float    turnover rate (free-float shares)
# volume_ratio       float    volume ratio
# pe                 float    P/E ratio (total market value / net profit)
# pe_ttm             float    P/E ratio (TTM)
# pb                 float    P/B ratio (total market value / net assets)
# ps                 float    P/S ratio
# ps_ttm             float    P/S ratio (TTM)
# total_share        float    total share capital (10,000 shares)
# float_share        float    circulating share capital (10,000 shares)
# free_share         float    free-float share capital (10,000 shares)
# total_mv           float    total market value (10,000 CNY)
# circ_mv            float    circulating market value (10,000 CNY)
body = {
    "mappings": {
        "properties": {
            "ts_code": {
                "type": "keyword"
            },
            "trade_date": {
                "type": "integer"
            },
            "close": {
                "type": "float"
            },
            "turnover_rate": {
                "type": "float"
            },
            "turnover_rate_f": {
                "type": "float"
            },
            "volume_ratio": {
                "type": "float"
            },
            "pe": {
                "type": "float"
            },
            "pe_ttm": {
                "type": "float"
            },
            "pb": {
                "type": "float"
            },
            "ps": {
                "type": "float"
            },
            "ps_ttm": {
                "type": "float"
            },
            "total_share": {
                "type": "float"
            },
            "float_share": {
                "type": "float"
            },
            "free_share": {
                "type": "float"
            },
            "total_mv": {
                "type": "float"
            },
            "circ_mv": {
                "type": "float"
            }
        }
    }
}
index = 'index_daily_basic'
# ignore=400 keeps the call from failing when the index already exists
es.indices.create(index=index, body=body, ignore=400)


def check_float(item, x_name):
    # Replace missing values (None or NaN) with 0.0 and log them
    x = item[x_name]
    if x is None or np.isnan(x):
        x = 0.0
        logger.info("%s %s %s is None or nan" % (item['ts_code'], item['trade_date'], x_name))
    return x


def es_insert_daily_basic(df):
    actions = []
    for i in range(len(df)):
        df_item = df.iloc[i]
        tscode = df_item['ts_code']
        trade_date = int(df_item['trade_date'])
        # Build the document id as exchange suffix + code + date, e.g. SZ00000120240306
        x = tscode.split('.', 1)
        col_name = x[1] + x[0]
        _id = col_name + df_item['trade_date']

        close = check_float(df_item, 'close')
        turnover_rate = check_float(df_item, 'turnover_rate')
        turnover_rate_f = check_float(df_item, 'turnover_rate_f')
        volume_ratio = check_float(df_item, 'volume_ratio')
        pe = check_float(df_item, 'pe')
        pe_ttm = check_float(df_item, 'pe_ttm')
        pb = check_float(df_item, 'pb')
        ps = check_float(df_item, 'ps')
        ps_ttm = check_float(df_item, 'ps_ttm')
        total_share = check_float(df_item, 'total_share')
        float_share = check_float(df_item, 'float_share')
        free_share = check_float(df_item, 'free_share')
        total_mv = check_float(df_item, 'total_mv')
        circ_mv = check_float(df_item, 'circ_mv')
        action = {
            "_index": index,
            "_type": "_doc",
            "_id": _id,
            "_source": {
                # Use the row's own code (the original referenced the global ts_code by mistake)
                "ts_code": tscode,
                "trade_date": trade_date,
                "close": close,
                "turnover_rate": turnover_rate,
                "turnover_rate_f": turnover_rate_f,
                "volume_ratio": volume_ratio,
                "pe": pe,
                "pe_ttm": pe_ttm,
                "pb": pb,
                "ps": ps,
                "ps_ttm": ps_ttm,
                "total_share": total_share,
                "float_share": float_share,
                "free_share": free_share,
                "total_mv": total_mv,
                "circ_mv": circ_mv
            }
        }
        actions.append(action)
        # Send a bulk request every 1000 documents and once more at the last row
        if len(actions) >= 1000 or i == (len(df) - 1):
            helpers.bulk(client=es, actions=actions)
            actions.clear()


def update_latest_daily_basic_tscode(tscode):
    config.set("daily", "latest_daily_basic_tscode", tscode)
    # write to file
    with open("config.ini", "w+") as f:
        config.write(f)


# Update a single stock
def update_daily_basic(tscode, start_date, end_date):
    df = parser.get_daily_basic(tscode, start_date, end_date)
    es_insert_daily_basic(df)
    return len(df)


if __name__ == "__main__":
    # Get the codes of all listed stocks
    stocks = parser.get_all_stock()
    # Skip stocks until the one recorded in config.ini is reached
    skipping = True
    for i in range(len(stocks)):
        stock = stocks.iloc[i]
        ts_code = stock['ts_code']
        if latest_daily_basic_tscode == ts_code:
            skipping = False
        if not skipping:
            count = update_daily_basic(ts_code, '20000101', '')
            print(i, ts_code, count)
            update_latest_daily_basic_tscode(ts_code)
        else:
            print(i, ts_code)

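Logging here uses the logging module. I did not look into it closely and simply copied the setup wholesale, since all I need is to write out an error log.

configparser is also used to parse an ini configuration file. config.ini contains the following: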

[daily]
latest_daily_basic_tscode = 000001.SZ

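The purpose of the configuration file is that, after the program is interrupted and restarted, it does not have to begin again from the first stock and can continue from the one recorded in the file. The first entry returned by the stock list interface is 000001.SZ, so that is the initial value here. This part could be optimized.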
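
One possible way to make the resume step more robust is to fall back to the start of the list when config.ini is missing or holds an unknown code. The following is only a sketch; read_latest_code and remaining_codes are hypothetical helper names, and the section and option names are the ones used in config.ini above.

```python
import configparser


def read_latest_code(path="config.ini"):
    """Return the recorded stock code, or None if the file or option is missing."""
    config = configparser.ConfigParser()
    config.read(path)  # a missing file is silently ignored
    return config.get("daily", "latest_daily_basic_tscode", fallback=None)


def remaining_codes(all_codes, latest_code=None):
    """Return the codes still to process, starting at latest_code.

    If latest_code is empty or not in the list, start from the beginning.
    """
    if latest_code and latest_code in all_codes:
        return all_codes[all_codes.index(latest_code):]
    return list(all_codes)
```

With this, no initial value needs to be written by hand: the main loop could iterate over remaining_codes(list(stocks['ts_code']), read_latest_code()).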

The data is saved to Elasticsearch with the bulk function from helpers, which performs batch indexing.
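
To illustrate the shape of the actions that bulk consumes, here is a minimal sketch that turns plain row dictionaries into bulk actions. The generator to_actions is a hypothetical helper, not part of the elasticsearch package; it assumes each row has ts_code and trade_date keys as in the daily_basic result, and it replaces None or NaN values with 0.0 in the same spirit as check_float.

```python
import math


def to_actions(rows, index_name):
    """Yield one bulk action per row dict, replacing None/NaN values with 0.0."""
    for row in rows:
        code, exchange = row["ts_code"].split(".", 1)
        source = {}
        for key, value in row.items():
            if value is None or (isinstance(value, float) and math.isnan(value)):
                value = 0.0
            source[key] = value
        # Store the date as an integer to match the index mapping
        source["trade_date"] = int(row["trade_date"])
        yield {
            "_index": index_name,
            # Document id: exchange suffix + code + date
            "_id": exchange + code + str(row["trade_date"]),
            "_source": source,
        }
```

Because helpers.bulk accepts any iterable and splits it into chunks itself (the chunk_size argument), a call along the lines of helpers.bulk(es, to_actions(df.to_dict("records"), index), chunk_size=1000) would likely remove the need for manual batching.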

Here is what the saved result looks like:

[Image: the saved data in Elasticsearch]
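
The saved documents can also be checked from Python. As a sketch, the function below builds a search request body for one stock over a date range, using the ts_code (keyword) and trade_date (integer) fields from the mapping. The name daily_basic_query and its parameters are illustrative assumptions.

```python
def daily_basic_query(ts_code, start_date, end_date, size=100):
    """Build a search body for one stock between two yyyymmdd integer dates."""
    return {
        "size": size,
        "query": {
            "bool": {
                "filter": [
                    # Exact match on the keyword field
                    {"term": {"ts_code": ts_code}},
                    # Inclusive range on the integer date field
                    {"range": {"trade_date": {"gte": start_date, "lte": end_date}}},
                ]
            }
        },
        "sort": [{"trade_date": {"order": "asc"}}],
    }
```

With the same client style used in this post, it could be passed as es.search(index='index_daily_basic', body=daily_basic_query('000001.SZ', 20240101, 20240306)).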





