CTP获取tick数据的测试

发布时间 2023-09-07 09:48:30作者: C羽言

下面的代码,是为了测试tick数据的有效性。 在服务器上运行几天后,观察保存到csv的tick数据,哪些是无效的tick,然后总结出规则,剔除掉。

from CTP_API import thostmduserapi as mdapi
import Global_var as g
import threading, time, csv
from datetime import datetime

'''
# 以下日盘交易时间一样 9:00-10:15、10:30-11:30、13:30-15:00
郑商所 日夜盘 'FG401' 21:00-23:00
郑商所 无夜盘 'AP310'
大商所 日夜盘 'i2401' 21:00-23:00
大商所 无夜盘 'jd2310'
上期所 日夜盘 'rb2310' 21:00-23:00
上期所 日夜盘 'cu2309' 21:00-凌晨1:00
上期所 日夜盘 'au2310' 21:00-凌晨2:30
上期所 无夜盘 'wr2310'
广期所 无夜盘 'si2310'
能源中心 日夜盘 'sc2310' 21:00-凌晨2:30
能源中心 日夜盘 'bc2310' 21:00--凌晨1:00
能源中心 日夜盘 'lu2311' 21:00-23:00
能源中心 无夜盘 'ec2404'
# 中金所 均无夜盘
‘IF2309’ 9:30-11:30、13:00-15:00
‘TL2312’ 9:15-11:30、13:00-15:15

#集合竞价时间
日盘品种(指无夜盘的品种)的集合竞价时间是8:55-8:59
夜盘品种的集合竞价时间是20:55-20:59,有夜盘的品种日盘不再进行集合竞价。
'''
subID = ['FG401', 'AP310', 'i2401', 'jd2310', 'rb2310', 'cu2309', 'au2310', 'wr2310', 'si2310', 'sc2310', 'bc2310', 'lu2311', 'ec2404', 'IF2309', 'TL2312']


class Storedata(object):
    def __init__(self):
        self.files = {}
        self.files_w = {}
        for id in subID:
            tickname = f"./ticktest/{id}_tick.csv"
            tickfile = open(tickname, 'a', newline='')
            tickfile_w = csv.writer(tickfile)
            self.files['tick_' + id] = tickfile
            self.files_w['tick_' + id] = tickfile_w

    def storetick(self):
        while True:
            d = g.mddata.get()
            dt = datetime.strptime(datetime.now().strftime('%Y-%m-%d') + " " + d['UpdateTime'], "%Y-%m-%d %H:%M:%S")
            nowdt = datetime.now().replace(microsecond=0)
            mdlist = [d['TradingDay'], d['UpdateTime'], d['InstrumentID'], d['ExchangeID'], d['LastPrice'], d['Volume'], d['OpenInterest'], d['ClosePrice'], dt, nowdt]
            key = 'tick_' + d['InstrumentID']
            self.files_w[key].writerow(mdlist)
            self.files[key].flush()
            print(d['InstrumentID'], dt)


class CustomMdSpi(mdapi.CThostFtdcMdSpi):
    def __init__(self, tapi):
        mdapi.CThostFtdcMdSpi.__init__(self)
        self.tapi = tapi

    def OnFrontConnected(self):
        print("=====建立行情连接成功=====")
        # 开始登录
        loginfield = mdapi.CThostFtdcReqUserLoginField()
        loginfield.BrokerID = g.BrokerID
        loginfield.UserID = g.InvesterID
        loginfield.Password = g.InvesterPassword
        ret = self.tapi.ReqUserLogin(loginfield, 0)
        if ret == 0:
            print(">>>>>>发送登录请求成功")
        else:
            print("--->>>发送登录请求失败")

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        if pRspInfo.ErrorID == 0:
            print("=====账户登录成功=====")
            print("登录时间=", datetime.now())  # pRspUserLogin.LoginTime 返回空
            print("交易日=", pRspUserLogin.TradingDay)
        else:
            print(f"返回错误--->>> ErrorID={pRspInfo.ErrorID},ErrorMsg={pRspInfo.ErrorMsg}")
        # 开始订阅行情
        ret = self.tapi.SubscribeMarketData([id.encode('utf-8') for id in subID], len(subID))
        if ret == 0:
            print(">>>>>>发送订阅行情请求成功")
        else:
            print("--->>>发送订阅行情请求失败")

    def OnRspSubMarketData(self, pSpecificInstrument, pRspInfo, nRequestID, bIsLast):
        if pRspInfo.ErrorID == 0:
            print(f"=====订阅 {pSpecificInstrument.InstrumentID} 行情成功")
        else:
            print(f"返回错误--->>> ErrorID={pRspInfo.ErrorID},ErrorMsg={pRspInfo.ErrorMsg}")

    # 获得深度行情
    def OnRtnDepthMarketData(self, pDepthMarketData):
        d = {'TradingDay': pDepthMarketData.TradingDay, 'UpdateTime': pDepthMarketData.UpdateTime, 'InstrumentID': pDepthMarketData.InstrumentID, 'ExchangeID': pDepthMarketData.ExchangeID, 'LastPrice': pDepthMarketData.LastPrice, 'Volume': pDepthMarketData.Volume, 'OpenInterest': pDepthMarketData.OpenInterest, 'ClosePrice': pDepthMarketData.ClosePrice}
        g.mddata.put(d)


def main():
    """
    获取数据的线程 不放在订阅最后一个行情成功后,是因为行情连接有时会断开,自动重连后就会重新执行。
    文件的写入是实时的,如果这边断开了,那边还没执行flush,文件就会出错,数据也就丢失了。
    """
    storedata = Storedata()
    t = threading.Thread(target=storedata.storetick)
    t.start()

    # 初始化行情线程
    g_pMdUserApi = mdapi.CThostFtdcMdApi_CreateFtdcMdApi('./con_file/')  # 创建行情实例
    pMdUserSpi = CustomMdSpi(g_pMdUserApi)  # 创建行情回调实例
    g_pMdUserApi.RegisterSpi(pMdUserSpi)  # 注册事件类
    g_pMdUserApi.RegisterFront(g.MdFrontAddr)  # 设置行情前置地址
    g_pMdUserApi.Init()  # 连接运行
    while 1:
        time.sleep(60)


if __name__ == '__main__':
    main()