# -*- coding: utf-8 -*- """ Created on Sat Jul 16 13:27:31 2022 @author : fangfang666 """ import math import pandas as pd import numpy as np import matplotlib.dates as mdates import matplotlib.pyplot as plt from datetime import datetime from WindPy import w w.start() from sklearn.linear_model import LinearRegression startDate = datetime(2019,1,1) endDate = datetime(2022,7,15) startDate_str = startDate.strftime("%Y%m%d") endDate_str = endDate.strftime("%Y%m%d") N=200 T0=10 data = w.wsd("CN00.SG,IH00.CFE,USDCNY.EX", "close", "%s"%startDate, "%s"%endDate, "") df = pd.DataFrame(data.Data).T df.columns = data.Codes df.insert(0,"Times",data.Times) df_train = df.iloc[:(N+T0)] lineModel = LinearRegression() lineModel.fit(df_train['CN00.SG']*df_train['USDCNY.EX'], df_train[['IH00.CFE']]*300) y = (df['IH00.CFE']*300).to_numpy() x = (df['CN00.SG']*df['USDCNY.EX']).to_numpy() wx = np.rint(lineModel.coef_[0]) wy = 1 e= wx*x -wy*y # 止盈线倍数u u=1.5 # 止损线倍数d d=1 # 均线天数N # N=50 # 统计输赢天数 T # T0=50 # 起始资金 start_money=1.8e+06 # 区间宽度w w=int(1/0.1) # 最小数据比 θ least_percentage=0.15 #根据历史上N+T+1的数据,更新偏离倍数和赢输的统计结果 def update_statistics(prices, pvs, winlose): past_prices = prices[0:N] current_price = prices[N] future_prices = prices[(N+1):len(prices)] #N天前的偏离倍数sigma difference_times_sigma = math.floor(((current_price - past_prices.mean())/past_prices.std(ddof=1)) / 0.1) #记录N天的sigma及输赢情况 if abs(difference_times_sigma)>=10 and abs(difference_times_sigma)<100 : result=win_or_lose(current_price, future_prices, past_prices.std(ddof=1), difference_times_sigma) pvs.append(difference_times_sigma) winlose.append(result) # 根据偏离倍数和赢输的统计结果,选择表现最好,且占比超过least_percentage的偏离倍数区间 def getRange(pvs, winlos): if len(pvs) == 0: return (0,0) df=pd.DataFrame({"pvs":np.abs(pvs), "winlose":winlose}) total=len(pvs) # 得到历史上偏离倍数的最大最小值 minp=np.abs(pvs).min() maxp=np.abs(pvs).max() best_ratio=0.0 lower, uper = minp, minp+w if (maxp-w) > minp: # 从最小值开始遍历 for sig in range(minp, maxp-w): # 计算每个区间内"win"的比例,如果占比高,且数量达到最小比例,则更新最优的lower和uper wls = df[(df["pvs"]>=sig) & (df["pvs"]<sig+w)]["winlose"] if len(wls)/total >= least_percentage: ratio=sum(wls=="win")/len(wls) if best_ratio<ratio: best_ratio=ratio lower,uper=sig,sig+w return lower, uper # 根据当天价格price,之后T天的价格my_list,方差和偏离倍数,计算是输、赢还是平均 def win_or_lose(price, my_list, sigma, diff): # 偏离倍数小于0,说明是买仓 if diff<0: # 计算平仓线和止损线 upper_bound = price + u * sigma lower_bound = price - d * sigma # 未来T天内,先达到平仓线则win,反之则lose for future_price in my_list: if future_price >= upper_bound: return 'win' if future_price <= lower_bound: return 'lose' # 偏离倍数小于0,说明是卖仓 elif diff>0: # 计算平仓线和止损线 upper_bound = price + d * sigma lower_bound = price - u * sigma # 未来T天内,先达到平仓线则win,反之则lose for future_price in my_list: if future_price >= upper_bound: return 'lose' if future_price <= lower_bound: return 'win' return 'even' # 记录N日方差 sigmas=np.zeros(len(e)) # 记录N日偏离倍数 daily_sigma=np.zeros(len(e)) # 记录N日均值 mas=np.zeros(len(e)) # 记录每日买开数量 buys=np.zeros(len(e)) # 记录每日卖平数量 exitbuys=np.zeros(len(e)) # 记录每日卖开数量 sells=np.zeros(len(e)) # 记录每日卖平数量 exitsells=np.zeros(len(e)) # 记录每日持仓数量 positions=np.zeros(len(e)) # 记录每日查询到的历史最优偏离倍数的上下线 lowerp=np.zeros(len(e)) upperp=np.zeros(len(e)) #用于记录偏离倍数和赢输的统计结果 pvs=[] winlose=[] for i in range(T0+N, len(e)): # 计算当前价格,N天前的均值,方差 current_price=e[i] past_prices=e[(i-N):i] mas[i]=past_prices.mean() sigmas[i]=past_prices.std(ddof=1) # 根据历史T0+N+1天的表现,更新偏离倍数和赢输的统计结果 update_statistics(e[(i-T0-N):(i+1)], pvs, winlose) # 根据更新的统计结果,找出最优的偏离倍数区间 rag=getRange(pvs,winlose) lowerp[i],upperp[i]=rag #计算当前的偏离倍数 times_sigma=(current_price - mas[i]) / sigmas[i] daily_sigma[i]=times_sigma #如果偏离倍数绝对值在最优区间内则下单 if abs(times_sigma*10)>=rag[0] and abs(times_sigma*10)<rag[1]: # 偏离倍数小于0,说明是买仓 if times_sigma<0: buys[i]=1 # 如果能拿到未来T天的价格,则一定会平仓 # 计算出平常的上下限,寻找平仓日期,找不到则为最后一天 if i+T0<len(e): sel=i+T0 for j in range(i, i+T0): upper_bound = current_price + u * sigmas[i] lower_bound = current_price - d * sigmas[i] if e[j]>=upper_bound or e[j]<=lower_bound: sel = j break exitbuys[sel]=exitbuys[sel]+buys[i] # 拿不到未来T天的价格,不能保证一定平仓 # 计算出平常的上下限,寻找平仓日期,找不到则继续持有 elif i+1 < len(e): sel = -1 for j in range(i,len(e)): upper_bound = current_price + u * sigmas[i] lower_bound = current_price - d * sigmas[i] if e[j]>=upper_bound or e[j]<=lower_bound: sel = j break if sel > 0: exitbuys[sel]=exitbuys[sel]+buys[i] # 偏离倍数大于0,说明是卖仓 elif times_sigma>0: sells[i]=1 # 如果能拿到未来T天的价格,则一定会平仓 # 计算出平常的上下限,寻找平仓日期,找不到则为最后一天 if i+T0<len(e): sel=i+T0 for j in range(i,i+T0): upper_bound = current_price + d * sigmas[i] lower_bound = current_price - u * sigmas[i] if e[j]>=upper_bound or e[j]<=lower_bound: sel = j break exitsells[sel]=exitsells[sel]+sells[i] # 拿不到未来T天的价格,不能保证一定平仓 # 计算出平常的上下限,寻找平仓日期,找不到则继续持有 elif i+1 < len(e): sel = -1 for j in range(i, len(e)): upper_bound = current_price + d * sigmas[i] lower_bound = current_price - u * sigmas[i] if e[j]>=upper_bound or e[j]<=lower_bound: sel = j break if sel > 0: exitsells[sel]=exitsells[sel]+sells[i] # 根据更新持仓 positions[i]=positions[i-1]+buys[i]+exitsells[i]-sells[i]-exitbuys[i] #计算价差走势 price=wx*x-wy*y #计算保证金 onecosts=wx*1000*7+wy*y*0.13 c1=price[(T0+N):len(price)] c2=buys[(T0+N):len(e)] c3=sells[(T0+N):len(e)] c4=exitbuys[(T0+N):len(e)] c5=exitsells[(T0+N):len(e)] c6=sigmas[(T0+N):len(e)] c7=mas[(T0+N):len(e)] c8=daily_sigma[(T0+N):len(e)] c9=onecosts[(T0+N):len(onecosts)] c10=np.cumsum(c2+c5-c3-c4)#buy+exitb-sell-exitsell c11=e[(T0+N):len(e)] c12=lowerp[(T0+N):len(e)] c13=upperp[(T0+N):len(e)] datas=pd.DataFrame({"price":c1,"buy":c2,"sell":c3,"exitb":c4,"exits":c5,"sigmasN":c6, "masN":c7,"p":c8,"onecosts":c9,"positions":c10,"e":c11, "lowerp":c12,"upperp":c13}) xs=df['Times'][(T0+N):len(price)] num,_=datas.shape moneys = np.zeros(num) moneys[0] = start_money # 计算每日盈亏及资金变化 for i in range(1, num): earn = (datas["price"][i] - datas["price"][i-1])*datas["positions"][i-1] moneys[i] = moneys[i-1] + earn # 计算回撤值 drawDownRatios=np.zeros(num) drawDownRatios[0]=(1-moneys[0]/start_money) for i in range(1,num): maxprev=max(max(moneys[0:i]),start_money) drawDownRatios[i]=max((1-moneys[i]/ maxprev),0) # 输出结果 last_money=moneys[num-1] annual_return=(last_money/start_money - 1)/num*250 print("start money:" + str(start_money)+ ", end money: "+str(last_money)) print("earn:" + str(last_money-start_money)) margins=abs(datas["onecosts"]*datas["positions"]) print("Max Margin:" + str(max(margins))) print("Max Risk:" + str(max(margins/moneys))) print("Annual return: "+str(annual_return)) print("Max Max Drawdown(Ratio): " + str(max(drawDownRatios))) plt.plot(xs, moneys) plt.plot(xs, -drawDownRatios) out = pd.ExcelWriter('Mean Reversion.xlsx') datas.insert(0,"Date",xs.to_list()) datas.to_excel(out) out.save()