16.14、异步任务:

16.14.1、使用协程任务:

函数create_task()用来创建协程任务,并将任务加入事件循环以实现异步并发。

wait_update()不能用在协程中,若在协程中等待业务更新,可调用register_update_notify函数把业务数据注册到TqChan,当业务数据有更新时会通知该TqChan,在协程里就可以用实时更新的业务数据运算。例如:

from tqsdk import TqApi, TqAuth

api = TqApi(auth=TqAuth("信易账号", "密码"))

quote1 = api.get_quote("CFFEX.T2103")

quote2 = api.get_quote("CFFEX.TF2103")

#带有业务更新的协程

async def demo(quote):

#将quote注册到TqChan命名为update_chan

async with api.register_update_notify(quote) as update_chan:

async for _ in update_chan: #遍历队列通知

print('品种:',quote.instrument_name,'最新价:',quote.last_price)

#无业务更新的协程

async def func():

return quote1.instrument_name,quote2.instrument_name

# 创建task1、task2,把quote1、quote2注册到TqChan

task1=api.create_task(demo(quote1))

task2=api.create_task(demo(quote2))

#把带有返回值的协程创建成task3

task3=api.create_task(func())

while True:

api.wait_update()

if task3.done(): #task3结束后,获取返回值

print(task3.result())

'''输出结果为:('债十2103', '债五2103')品种: 债十2103 最新价: 97.435('债十2103', '债五2103')品种: 债十2103 最新价: 97.435('债十2103', '债五2103')品种: 债十2103 最新价: 97.43('债十2103', '债五2103')品种: 债十2103 最新价: 97.43'''

16.14.2、使用多线程:

当用户策略实例很多,导致网络连接数无法容纳时,可以使用多线程。首先需要在主线程中创建一个 TqApi 实例 api_master,并用 TqApi.copy 函数创建多个slave副本,把slave副本用在多个线程中,主线程里的api_master 仍然需要持续调用 wait_update。

子线程和主线程其实是运行在同一个事件循环里,如果在子线程里调用api_slave.close()会引发主线程事件循环关闭的异常,如果主线程里调用api_master.close(),子线程可能因等待事件循环响应而阻塞,若想让子线程和主线程一起退出,可设置子线程为守护线程。

使用多线程需要自定义一个线程类,并重写run函数,在run函数里执行策略代码,例如:

import threading

from tqsdk import TqApi, TqAuth

#自定义线程类

class WorkerThread(threading.Thread):

def __init__(self, api, symbol):

threading.Thread.__init__(self)

self.api = api #初始化参数

self.symbol = symbol #初始化参数

#重写run函数,策略代码写在run函数中

def run(self):

SHORT = 30 # 短周期

LONG = 60 # 长周期

data_length = LONG + 2 # k线数据长度

klines = self.api.get_kline_serial(self.symbol, duration_seconds=60, data_length=data_length)

target_pos = TargetPosTask(self.api, self.symbol)

while True:

self.api.wait_update()

if self.api.is_changing(klines.iloc[-1], "datetime"): # 产生新k线:重新计算SMA

short_avg = ma(klines["close"], SHORT) # 短周期

long_avg = ma(klines["close"], LONG) # 长周期

if long_avg.iloc[-2] < short_avg.iloc[-2] and long_avg.iloc[-1] > short_avg.iloc[-1]:

target_pos.set_target_volume(-3)

print("均线下穿,做空")

if short_avg.iloc[-2] < long_avg.iloc[-2] and short_avg.iloc[-1] > long_avg.iloc[-1]:

target_pos.set_target_volume(3)

print("均线上穿,做多")

if __name__ == "__main__":

#主线程创建TqApi实例

api_master = TqApi(auth=TqAuth("信易账号", "密码"))

# 实例化线程类,传入TqApi实例的副本api_master.copy()

thread1 = WorkerThread(api_master.copy(), "SHFE.cu1901")

thread2 = WorkerThread(api_master.copy(), "SHFE.rb1901")

# 启动线程实例

thread1.start()

thread2.start()

while True:

api_master.wait_update() #主线程保持对wait_update()的调用

当线程太多时,操作系统因调度线程,可能把主要工作都用在了调度线程上,而降低了多线程的效率,更宜使用异步协程实现多策略。

16.14.3、使用多进程:

当程序比较耗CPU时,可以采用多进程,比如回测时,需要对大量的数据计算,可以用多个进程同时回测多个品种,注意: 由于服务器流控限制, 同时执行的回测任务请勿超过10个,例如:

from tqsdk import TqApi, TqAuth, TqSim, TargetPosTask, BacktestFinished, TqBacktest

from tqsdk.tafunc import ma

from datetime import date

import multiprocessing

from multiprocessing import Pool

def MyStrategy(SHORT):

LONG = 60

SYMBOL = "SHFE.cu1907"

acc = TqSim()

try:

api = TqApi(acc, backtest=TqBacktest(start_dt=date(2019, 5, 6), end_dt=date(2019, 5, 10)), auth=TqAuth("信易账户", "账户密码"))

data_length = LONG + 2

klines = api.get_kline_serial(SYMBOL, duration_seconds=60, data_length=data_length)

target_pos = TargetPosTask(api, SYMBOL)

while True:

api.wait_update()

if api.is_changing(klines.iloc[-1], "datetime"):

short_avg = ma(klines.close, SHORT)

long_avg = ma(klines.close, LONG)

if long_avg.iloc[-2] < short_avg.iloc[-2] and long_avg.iloc[-1] > short_avg.iloc[-1]:

target_pos.set_target_volume(-3)

if short_avg.iloc[-2] < long_avg.iloc[-2] and short_avg.iloc[-1] > long_avg.iloc[-1]:

target_pos.set_target_volume(3)

except BacktestFinished:

api.close()

print("SHORT=", SHORT, "最终权益=", acc.account.balance) # 每次回测结束时, 输出使用的参数和最终权益

if __name__ == '__main__':

#提供冻结以产生 Windows 可执行文件的支持,在非 Windows 平台上是无效的

multiprocessing.freeze_support()

p = Pool(4) # 进程池, 建议小于cpu数

for s in range(20, 40):

p.apply_async(MyStrategy, args=(s,)) # 把20个回测任务交给进程池执行

print('Waiting for all subprocesses done…')

p.close()

p.join()

print('All subprocesses done.')

17、TqSdk部分函数解读

17.1、DIFF 协议:

DIFF (Differential Information Flow for Finance) 是一个基于websocket和json的应用层协议。websocket是全双工通信,当客户端和服务器端建立连接后,就可以相互发数据,建立连接又称为“握手”,“握手”成功就可以建立通信了,不用在每次需要传输信息时重新建立连接,即不会“掉线”。json是数据存储格式,json数据可以方便的反序列化为Python数据。

DIFF协议可以简单的理解为服务端和客户端的通信方式,协议规定了数据格式,以便于服务端和客户端可以解读对方发来的数据。

DIFF 协议分为两部分:数据访问和数据传输。

17.1.1、数据传输:

DIFF 协议要求服务端将业务数据以JSON Merge Patch的格式推送给客户端,JSON Merge Patch的格式形如Python字典,可以在客户端反序列化为Python字典(其实是映射类型Entity)。例如:

{

"aid": "rtn_data", # 业务信息截面更新

"data": [ # 数据更新数组

{

"balance": 10237421.1, # 账户资金

},

{

"float_profit": 283114.780999997, # 浮动盈亏

},

{

"quotes":{

"SHFE.cu1612": {

"datetime": "2016-12-30 14:31:02.000000",

"last_price": 36605.0, # 最新价

"volume": 25431, # 成交量

"pre_close": 36170.0, # 昨收

}

}

}

]

}aid 字段值即为数据包类型,"aid":"rtn_data"表示该包的类型为业务信息截面更新包。整个 data 数组相当于一个事务,其中的每一个元素都是一个 JSON Merge Patch,处理完整个数组后业务截面即完成了从上一个时间截面推进到下一个时间截面。

DIFF 协议要求客户端发送 peek_message 数据包以获得业务信息截面更新,例如:

{

"aid": "peek_message"

}服务端在收到 peek_message 数据包后应检查是否有数据更新,如果有则应将更新内容立即发送给客户端,如果没有则应等到有更新发生时再回应客户端。

服务端发送 rtn_data 数据包后可以等收到下一个 peek_message 后再发送下一个 rtn_data 数据包。

一个简单的客户端实现可以在连接成功后及每收到一个 rtn_data 数据包后发送一个 peek_message 数据包,这样当客户端带宽不足时会自动降低业务信息截面的更新频率以适应低带宽。

当数据包中的 aid 字段不是 rtn_data 或 peek_message 则表示该包为一个指令包,具体指令由各业务模块定义,例如 subscribe_quote 表示订阅行情,insert_order 表示下单。由于客户端和服务端存在网络通讯延迟,客户端的指令需要过一段时间才会影响到业务信息截面中的业务数据,为了使客户端能分辨出服务端是否处理了该指令,通常服务端会将客户端的请求以某种方式体现在截面中(具体方式由各业务模块定义)。例如 subscribe_quote 订阅行情时服务端会将业务截面中的 ins_list 字段更新为客户端订阅的合约列表,这样当客户端检查服务端发来的业务截面时如果 ins_list 包含了客户端订阅的某个合约说明服务端处理了订阅指令,但若 quotes 没有该合约则说明该合约不存在订阅失败。

服务端发送包含"aid":"rtn_data"字段的业务数据截面更新包,客户端发送包含"aid":"peek_message"字段的数据包请求业务数据截面,或发送包含"aid":"subscribe_quote "、"aid":"insert_order"等字段的指令包,如此,服务端和客户端相互发信息,服务端和客户端根据字段识别数据及处理数据。

17.1.2、数据访问:

DIFF 协议要求服务端维护一个业务信息截面,例如:

{

"account_id": "41007684", # 账号

"static_balance": 9954306.319000003, # 静态权益

"balance": 9963216.550000003, # 账户资金

"available": 9480176.150000002, # 可用资金

"float_profit": 8910.231, # 浮动盈亏

"risk_ratio": 0.048482375, # 风险度

"using": 11232.23, # 占用资金

"position_volume": 12, # 持仓总手数

"ins_list": "SHFE.cu1609,…." # 行情订阅的合约列表

"quotes":{ # 所有订阅的实时行情

"SHFE.cu1612": {

"instrument_id": "SHFE.cu1612",

"datetime": "2016-12-30 13:21:32.500000",

"ask_priceN": 36590.0, #卖N价

"ask_volumeN": 121, #卖N量

"bid_priceN": 36580.0, #买N价

"bid_volumeN": 3, #买N量

"last_price": 36580.0, # 最新价

"highest": 36580.0, # 最高价

"lowest": 36580.0, # 最低价

"amount": 213445312.5, # 成交额

"volume": 23344, # 成交量

"open_interest": 23344, # 持仓量

"pre_open_interest": 23344, # 昨持

"pre_close": 36170.0, # 昨收

"open": 36270.0, # 今开

"close" : "-", # 收盘

"lower_limit": 34160.0, #跌停

"upper_limit": 38530.0, #涨停

"average": 36270.1 #均价

"pre_settlement": 36270.0, # 昨结

"settlement": "-", # 结算价

},

}

}

对应的客户端也维护了一个该截面的镜像,因此业务层可以简单同步的访问到全部业务数据。

TqSdk即是客户端,TqSdk把收到的业务数据截面以上面的格式合并到_data属性里,_data为多层嵌套的映射类型Entity,业务数据例如“quotes”,也是Entity,其“键”是合约代码,例如“SHFE.cu1612”,其“值”是最终的业务数据——Quote对象,业务函数get_quote()便是把_data里的Quote对象的一个引用返回给调用方,调用方获得的是Quote对象的动态引用。

_data是可变映射类型,会接收服务端发来的更新,因此业务函数返回的对象引用也会指向随时更新的业务数据。

17.2、业务函数:

以get_quote()为例,上节已经介绍了get_quote()与_data的关系,现在我们结合函数的代码再看下其执行过程,我们只取代码的主要部分:

def get_quote(self, symbol: str) -> Quote:

# 从_data属性中提取Quote

quote = _get_obj(self._data, ["quotes", symbol], self._prototype["quotes"]["#"])

# 若合约symbol是新添加的合约,则向服务端发送订阅该合约的指令包

if symbol not in self._requests["quotes"]:

self._requests["quotes"].add(symbol)

self._send_pack({

"aid": "subscribe_quote",

"ins_list": ",".join(self._requests["quotes"]),

})

#返回quote,其指向的是_data中的Quote

return quote

其他的业务函数工作逻辑类似。业务对象Quote、Trade、Order、Position、Account等都是Entity的子类,可以像类一样获取其属性,也可以像字典一样使用。业务对象在模块objs中定义。

17.3、insert_order():

insert_order用来下单,我们只截取主要代码看一下执行过程:

def insert_order(…) -> Order:

"""发送下单指令. **注意: 指令将在下次调用** :py:meth:`~tqsdk.api.TqApi.wait_update` **时发出**"""

if self._loop.is_running(): #事件循环正在运行

# 把下单请求函数打包成task排入事件循环

self.create_task(self._insert_order_async(…))

#下单后获取委托单order

order = self.get_order(order_id, account=account)

#更新委托单字段

order.update({"order_id": order_id,"exchange_id": exchange_id,…})

return order #返回委托单

else: #事件循环还未运行

#打包一个指令包

pack = self._get_insert_order_pack(…)

#发送指令包

self._send_pack(pack)

##下单后获取委托单order

order = self.get_order(order_id, account=account)

#更新委托单字段

order.update({ "order_id": order_id,"exchange_id": exchange_id,…})

return order #返回委托单

#发送指令包函数

def _send_pack(self, pack):

#立即向队列发送指令包

if not self._is_slave:

self._send_chan.send_nowait(pack)

else:

self._master._slave_send_pack(pack)

#下单请求函数

async def _insert_order_async(…):

#打包一个指令包

pack = self._get_insert_order_pack(…)

#发送指令包

self._send_pack(pack)

下单的主要流程为:用协程任务打包一个指令包再发出去。create_task是无阻塞的,创建完task立即返回,get_order获取委托单也是无阻塞的,因此insert_order执行后会立即返回一个Order对象引用——order,不会等待委托单成交与否。

create_task会在下单函数发送出指令包后(执行结束)停止事件循环,(主线程在执行时事件循环可能已经是停止状态),需要在调用wait_update启动事件循环时再从队列取出指令包并发送向服务端。

17.4、create_task():

create_task用来把协程打包成Task对象,以便于在事件循环中并发执行,我们看下函数的代码:

def create_task(self, coro: asyncio.coroutine) -> asyncio.Task:

task = self._loop.create_task(coro) #把协程打包成Task

# 获取当前正在运行的Task

current_task = asyncio.Task.current_task(loop=self._loop)\

if (sys.version_info[0] == 3 and sys.version_info[1] < 7) else asyncio.current_task(loop=self._loop)

# 当前Task没有正在运行,则将刚创建的task添加进_tasks

if current_task is None:

self._tasks.add(task)

task.add_done_callback(self._on_task_done) #为task添加结束时会调用的函数

return task #返回task

函数asyncio.current_task(loop=self._loop)用来返回正在运行的Task,如果没有正在运行的Task则返回None。

_tasks是由api维护的所有根task,不包含子task,子task由其父task维护。

add_done_callback()用来为Task添加一个回调,回调将在 Task 对象完成时被运行。

_on_task_done()函数用来将执行结束的task从_tasks里移除,并停止事件循环,执行结束包括正常结束和遇到异常结束。函数代码如下:

def _on_task_done(self, task):

"""当由 api 维护的 task 执行完成后取出运行中遇到的例外并停止 ioloop"""

try:

exception = task.exception()#返回 Task 对象的异常,如果没有异常返回None

if exception:

self._exceptions.append(exception)

except asyncio.CancelledError:

pass

finally:

self._tasks.remove(task)

self._loop.stop()

self._loop.stop()停止事件循环,以使wait_update()释放,让进程后续任务获得动作机会,并等待再次调用wait_update()。

TqSdk中大量用到了create_task创建Task,而Task执行结束后会调用回调函数_on_task_done()停止事件循环,而且主线程在执行时(取得了控制权)事件循环可能已经是停止状态,因此需要循环调用wait_update()再次开启事件循环以执行Task。

17.5、TqChan:

TqChan定义在模块channel中,TqChan是异步队列asyncio.Queue的子类,TqSdk中大量用到了TqChan,TqSdk各组件间通过TqChan传递数据,一个组件向TqChan放入数据,另一个组件从TqChan里取出数据。

TqChan里定义了发送数据和接收数据的函数,因此用TqChan可以连接收、发组件,使组件间建立通信。

数据在组件间单向传递,由TqChan连接的组件构成了生产者、消费者模型。

我们看下TqChan的主要代码,代码各部分的含义注释的很清楚了:

class TqChan(asyncio.Queue):

"""用于协程间通讯的channel"""

_chan_id: int = 0

def __init__(self, api: 'TqApi', last_only: bool = False, logger = None,

chan_name: str = "") -> None:

"""创建channel实例Args:api (tqsdk.api.TqApi): TqApi 实例last_only (bool): 为True时只存储最后一个发送到channel的对象"""

TqChan._chan_id += 1

asyncio.Queue.__init__(self, loop=api._loop)

self._last_only = last_only

self._closed = False

# 关闭函数

async def close(self) -> None:

"""关闭channel,并向队列放入一个None值关闭后send将不起作用,因此recv在收完剩余数据后会立即返回None"""

if not self._closed:

self._closed = True

await asyncio.Queue.put(self, None)

#发送数据的函数

async def send(self, item: Any) -> None:

"""异步发送数据到channel中Args:item (any): 待发送的对象"""

if not self._closed:

if self._last_only: #只存储最新数据

while not self.empty():

asyncio.Queue.get_nowait(self)#取出全部历史数据再放入最新数据

await asyncio.Queue.put(self, item) #放入新数据,如果队列已满则阻塞等待

#发送数据的函数

def send_nowait(self, item: Any) -> None:

"""类似send函数,但是立即发送数据到channel中Args:item (any): 待发送的对象Raises:asyncio.QueueFull: 如果channel已满则会抛出 asyncio.QueueFull"""

if not self._closed:

if self._last_only:

while not self.empty():

asyncio.Queue.get_nowait(self)

asyncio.Queue.put_nowait(self, item) #立即向队列中放入数据

#接收数据的函数

async def recv(self) -> Any:

"""异步接收channel中的数据,如果channel中没有数据则一直等待Returns:any: 收到的数据,如果channel已被关闭则会立即收到None"""

if self._closed and self.empty(): #channel已关闭且已空

return None #返回None值

item = await asyncio.Queue.get(self) #取出channel里的数据,若无则阻塞等待

return item #返回取到的值

#接收数据的函数

def recv_nowait(self) -> Any:

"""类似recv,但是立即接收channel中的数据Returns:any: 收到的数据,如果channel已被关闭则会立即收到NoneRaises:asyncio.QueueFull: 如果channel中没有数据则会抛出 asyncio.QueueEmpty"""

if self._closed and self.empty(): #channel已关闭且已空

return None #返回None值

item = asyncio.Queue.get_nowait(self) #立即取出队列中的数据

return item #返回取出的数据

#接收最新数据的函数

def recv_latest(self, latest: Any) -> Any:

"""尝试立即接收channel中的最后一个数据Args:latest (any): 如果当前channel中没有数据或已关闭则返回该对象Returns:any: channel中的最后一个数据"""

while (self._closed and self.qsize() > 1) or (not self._closed and not self.empty()):

latest = asyncio.Queue.get_nowait(self)

return latest

#重写的__iter__方法,返回自身的异步迭代器

def __aiter__(self):

return self

#重写的__next__方法,返回异步迭代器下一个元素

async def __anext__(self):

value = await asyncio.Queue.get(self) #如果队列无元素,则阻塞直到有数据

if self._closed and self.empty():

raise StopAsyncIteration

return value

#重写的 __enter__方法,使channel可用在上下文管理语句async with中开启自身

async def __aenter__(self):

return self

##重写的__exit__方法,使channel可用在上下文管理语句async with中以退出自身

async def __aexit__(self, exc_type, exc, tb):

await self.close()

TqSdk中大量用到了TqChan在组件间收发数据,当事件循环被stop停止时,收数据一端执行item = await asyncio.Queue.get(self)时会挂起自身并交出控制权给事件循环的调用方,调用方再次启动事件循环时,事件循环继续轮询执行task。

17.6、register_update_notify():

register_update_notify()函数用于把业务数据注册到TqChan,实际上是把TqChan添加到业务对象的_listener属性里,当业务对象更新时会向TqChan添加一个True值,当TqChan为空时则等待业务对象更新。

我们先看一个以TqChan实例在协程中接收数据更新的例子:

from tqsdk import TqApi, TqAuth

api = TqApi(auth=TqAuth("信易账号", "密码"))

quote = api.get_quote("CFFEX.T2103") #订阅盘口行情

#定义一个协程

async def func():

from tqsdk.channel import TqChan #导入TqChan

chan = TqChan(api,last_only=True) #实例化TqChan,接收数据更新

quote["_listener"].add(chan) #把chan添加进quote的_listener属性

async for p in chan: #若quote有更新会执行循环体,如无更新则阻塞等待

print(p)

print(quote.datetime,quote.last_price) #打印盘口时间和最新价

break

await chan.close() #chan使用完关闭

return quote.instrument_name,quote.instrument_name #返回值

task=api.create_task(func()) #把协程打包成Task

while True:

api.wait_update()

if task.done(): #Task结束后获取协程返回值

print(task.result())

'''输出结果为:True2021-02-05 13:11:02.300000 97.3('债十2103', 1615532400.0)('债十2103', 1615532400.0)('债十2103', 1615532400.0)'''

register_update_notify()函数是对上述代码的简化,再用with语句管理上下文,例如:

from tqsdk import TqApi, TqAuth

api = TqApi(auth=TqAuth("信易账号", "密码"))

quote = api.get_quote("CFFEX.T2103") #订阅盘口行情

#定义一个协程

async def func():

async with api.register_update_notify(quote) as chan: #把quote注册到chan

async for p in chan: #若quote有更新会执行循环体,如无更新则阻塞等待

print(p)

print(quote.datetime,quote.last_price) #打印盘口时间和最新价

break

return quote.instrument_name,quote.instrument_name #返回值

task=api.create_task(func()) #把协程打包成Task

while True:

api.wait_update()

if task.done(): #Task结束后获取协程返回值

print(task.result())

'''输出结果为:True2021-02-05 13:48:53.800000 97.26('债十2103', '债十2103')('债十2103', '债十2103')('债十2103', '债十2103')'''

若async for p in chan循环不用break跳出,则会随quote更新循环执行,若quote无更新,比如停盘,异步迭代函数__anext__()里将阻塞,循环也跟着阻塞,等待再次收到quote更新。

17.7、wait_update():

wait_update用于等待业务更新,我们结合其代码分析下其执行机制:

def wait_update(self, deadline: Optional[float] = None) -> None:

if self._loop.is_running(): #wait_update被放入了事件循环里

raise Exception("不能在协程中调用 wait_update, 如需在协程中等待业务数据更新请使用 register_update_notify")

elif asyncio._get_running_loop():

raise Exception(

"TqSdk 使用了 python3 的原生协程和异步通讯库 asyncio,您所使用的 IDE 不支持 asyncio, 请使用 pycharm 或其它支持 asyncio 的 IDE")

self._wait_timeout = False #是否触发超时

# 先尝试执行各个task,再请求下个业务数据

self._run_until_idle()

# 总会发送 serial_extra_array 数据,由 TqWebHelper 处理

for _, serial in self._serials.items():

self._process_serial_extra_array(serial)

# 上句发送数据创建的有task,先尝试执行各个task,再请求下个业务数据

self._run_until_idle()

#非api副本,且已收到了上次返回的更新数据,再次请求新数据

if not self._is_slave and self._diffs:

self._send_chan.send_nowait({

"aid": "peek_message"

})

# 先收取数据再判断 deadline, 避免当超时立即触发时无法接收数据

update_task = self.create_task(self._fetch_msg()) #从服务端收取数据

#超时后重置self._wait_timeout为True,并停止事件循环

deadline_handle = None if deadline is None else self._loop.call_later(max(0, deadline – time.time()),

self._set_wait_timeout)

try: #未触发超时且无待处理的新数据,启动事件循环执行全部Task

while not self._wait_timeout and not self._pending_diffs:

self._run_once() #未设置超时也未收到新数据,将在此阻塞

return len(self._pending_diffs) != 0 #True:还有待处理数据,False:数据已处理完或超时未收到数据

finally: #处理待处理的数据,将数据合并到self._data

self._diffs = self._pending_diffs

self._pending_diffs = []

# 清空K线更新范围,避免在 wait_update 未更新K线时仍通过 is_changing 的判断

self._klines_update_range = {}

for d in self._diffs:

# 判断账户类别, 对股票和期货的 trade 数据分别进行处理

if "trade" in d:

for k, v in d.get('trade').items():

prototype = self._security_prototype if self._account._is_stock_type(k) else self._prototype

_merge_diff(self._data, {'trade': {k: v} }, prototype, False)

# 非交易数据均按照期货处理逻辑

diff_without_trade = {k : v for k, v in d.items() if k != "trade"}

if diff_without_trade:

_merge_diff(self._data, diff_without_trade, self._prototype, False)

for query_id, query_result in d.get("symbols", {}).items():

if query_id.startswith("PYSDK_quote") and query_result.get("error", None) is None:

quotes = _symbols_to_quotes(query_result)

_merge_diff(self._data, {"quotes": quotes}, self._prototype, False)

for _, serial in self._serials.items():

# K线df的更新与原始数据、left_id、right_id、more_data、last_id相关,其中任何一个发生改变都应重新计算df

# 注:订阅某K线后再订阅合约代码、周期相同但长度更短的K线时, 服务器不会再发送已有数据到客户端,即chart发生改变但内存中原始数据未改变。

# 检测到K线数据或chart的任何字段发生改变则更新serial的数据

if self.is_changing(serial["df"]) or self.is_changing(serial["chart"]):

if len(serial["root"]) == 1: # 订阅单个合约

self._update_serial_single(serial)

else: # 订阅多个合约

self._update_serial_multi(serial)

if deadline_handle: #取消超时回调

deadline_handle.cancel()

update_task.cancel() #取消收取新业务task

# 最后处理 raise Exception,保证不会因为抛错导致后面的代码没有执行

for d in self._diffs:

for query_id, query_result in d.get("symbols", {}).items():

if query_result.get("error", None):

raise Exception(f"查询合约服务报错 {query_result['error']}")

从wait_update的代码可知,wait_update的工作可分成四大块:1、先执行事件循环中存在的task

2、向服务端请求新数据

3、事件循环轮询执行未完成的task,若未设置超时也未收到新数据,将阻塞

4、收到了新数据,停止事件循环,用新数据更新_data,等待下次调用wait_update

wait_update其实是事件循环的调用方(执行self._loop.run_forever()),因此,wait_update的核心工作是开启事件循环。

开启事件循环的函数:

def _run_once(self):

"""执行 ioloop 直到 ioloop.stop 被调用"""

if not self._exceptions:

self._loop.run_forever()

if self._exceptions:

raise self._exceptions.pop(0)

def _run_until_idle(self):

"""执行 ioloop 直到没有待执行任务"""

while self._check_rev != self._event_rev:

#用来追踪是否有任务未完成并等待执行

check_handle = self._loop.call_soon(self._check_event, self._event_rev + 1)

try:

self._run_once()

finally:

check_handle.cancel()

函数_run_until_idle中调用_run_once,核心工作就是执行self._loop.run_forever()来开启事件循环。

事件循环里有各种task,比如交易策略、业务处理任务等,事件循环会轮询执行各个task,当task执行结束或收到新数据时,事件循环会被stop停止,事件循环被停止才可以将控制权交给调用方wait_update(比如task执行await asyncio.Queue.get()时让出控制权),执行wait_update后续代码,用新数据更新业务字段,wait_update执行完后之后,主程序会再次调用wait_update再次开启事件循环(在主程序while循环中),事件循环接着上次停止的上下文状态继续执行未完成的task。

即:task执行结束或收到新数据时,会停止事件循环并让出控制权给调用方wait_update使wait_update执行结束。主程序调用wait_update时则开启事件循环。

wait_update是事件循环的调用方,因此,wait_update不能用在事件循环中,函数代码开头部分会先检查wait_update是否被放入了事件循环。

事件循环每次只运行一个task,task执行结束或收到业务更新使事件循环停止,才能让出控制权给wait_update使后续任务得到执行,否则事件循环保持运行,主程序将阻塞在wait_update,停止后的事件循环还需要重新开启以恢复执行未完成的task及继续收取新数据,因此,应在主程序中将wait_update放在while True循环中循环调用,即可随着业务更新对事件循环启、停操作。

数据流通过队列TqChan传递,队列中有数据才能get出,否则将阻塞,因此task阻塞实际发生在get阻塞时,若事先没有订阅数据或已停盘,队列无法get出数据,事件循环也没有被stop而保持运行等待get,则事件循环无法让出控制权,主程序将阻塞在wait_update。

若是设置了超时,则超时后会停止事件循环,超时语句为:

self._loop.call_later(max(0, deadline – time.time()),self._set_wait_timeout)

loop.call_later(delay, callback, *args, context=None):安排 callback 在给定的 delay 秒(可以是 int 或者 float)后被调用。

因此事件循环超时后执行了函数self._set_wait_timeout,代码为:

def _set_wait_timeout(self):

self._wait_timeout = True #重置超时变量为True

self._loop.stop() #停止事件循环

即超时后也会主动停止事件循环以让出控制权给wait_update。