5. 本地数据库
很简单地用本地 SQLite 数据库查询股票数据。
DataSource 类的 get_stocks / get_indexs 返回的是一个字典:键是股票(或指数)代码,值是 pandas 的 DataFrame 物件。这些 DataFrame 物件,在之后的业务(如计算股票指标)中,还需要特别处理。
import os
import sqlite3

import numpy as np
import pandas as pd


class DataSource:
    """Read price history for stocks and indexes from a local SQLite file.

    Each instrument lives in its own table named ``s_<code>`` and is joined
    with the ``name`` table on ``code``. Results are returned as a dict that
    maps the requested code to a ``pandas.DataFrame``.
    """

    # DataFrame column labels, in the order of the SELECT list below.
    # Note that ``stime``, ``close`` and ``volume`` are exposed as
    # ``sdate``, ``last`` and ``vol``.
    _COLUMNS = ['code', 'lot', 'nmll', 'sdate', 'high', 'low', 'open', 'last', 'vol']

    # The inner query keeps the 365*20 rows with the greatest ``stime``;
    # the outer query re-sorts them by ``stime`` ascending.
    # ``{table}`` is filled with an already-quoted identifier.
    _QUERY = """SELECT t.code, t.lot, t.nmll, t.stime, t.high, t.low, t.open, t.close, t.volume
        FROM (SELECT n.code, n.lot, n.nmll, c.stime, c.high, c.low, c.open, c.close, c.volume
              FROM {table} AS c INNER JOIN name AS n ON c.code=n.code
              ORDER BY c.stime DESC LIMIT 365*20) AS t
        /*INNER JOIN financial AS f ON t.code=f.code AND substr(t.stime,1,4)=f.year*/
        ORDER BY t.stime"""

    def __init__(self):
        self.db = None  # open sqlite3 connection, or None when disconnected
        self.cursor = None  # cursor on self.db, or None when disconnected
        self.stocks = {}  # stock pool: code -> DataFrame
        self.indexs = {}  # index pool: code -> DataFrame
        self.name = 'unit_test.db'  # path of the SQLite database file

    def connect(self):
        """Open the database file named by ``self.name`` and create a cursor."""
        self.db = sqlite3.connect(os.path.abspath(self.name))
        self.cursor = self.db.cursor()

    def get_stocks(self, ucodes):
        """Load the price history of every stock code in *ucodes*.

        Args:
            ucodes: iterable of stock codes; each is read from table
                ``s_<code>``.

        Returns:
            ``self.stocks`` (dict of code -> DataFrame), or ``None`` if a
            ``sqlite3.Error`` occurred (the error is printed).
        """
        self.stocks = {}
        return self._load_pool(ucodes, self.stocks, lambda code: code)

    def get_indexs(self, indexs):
        """Load the price history of every index code in *indexs*.

        The table name uses the upper-cased code (``s_<CODE>``), while the
        returned dict is keyed by the code exactly as it was passed in.

        Args:
            indexs: iterable of index codes (strings).

        Returns:
            ``self.indexs`` (dict of code -> DataFrame), or ``None`` if a
            ``sqlite3.Error`` occurred (the error is printed).
        """
        self.indexs = {}
        return self._load_pool(indexs, self.indexs, lambda code: code.upper())

    def _load_pool(self, codes, pool, table_suffix):
        """Fill *pool* with one DataFrame per code and return it.

        *table_suffix* maps a requested code to the suffix of its table name.
        The connection is always released, even when a query fails.
        """
        try:
            self.connect()
            for code in codes:
                table = self._quote_identifier('s_{}'.format(table_suffix(code)))
                # Rows stay plain tuples: the DataFrame needs all nine columns.
                self.cursor.execute(self._QUERY.format(table=table))
                pool[code] = pd.DataFrame(self.cursor.fetchall(), columns=self._COLUMNS)
            return pool
        except sqlite3.Error as e:
            # Best effort: report the problem and signal failure with None.
            print(e)
            return None
        finally:
            self._disconnect()

    def _disconnect(self):
        """Close the cursor and the connection if they are open."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        if self.db is not None:
            self.db.close()
            self.db = None

    @staticmethod
    def _quote_identifier(identifier):
        """Return *identifier* as a double-quoted SQL identifier.

        Table names cannot be bound as query parameters, so the name is
        quoted (embedded double quotes doubled) before it is placed in SQL.
        """
        return '"' + str(identifier).replace('"', '""') + '"'


# Example usage: load one stock and one index from the default database.
data_source = DataSource()
df1 = data_source.get_stocks(['00700'])
df2 = data_source.get_indexs(['hsi'])