1. Trend Classifier
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
DATA_PATH = '/aiffel/aiffel/fnguide/data/'
modify_data = pd.read_csv(os.path.join(DATA_PATH, 'sub_upbit_eth_min_tick.csv'), index_col=0, parse_dates=True)
modify_data.loc['2017-11-01':'2017-12-31','close'].plot()
2. Data Labeling
Four labeling methods:
- Price Change Direction
- Using Moving Average
- Local Min-Max
- Trend Scanning
Price Change Direction
Label by the difference between the current price and the price a set number of periods earlier.
Simple, but the labels can swing widely depending on the comparison window, so use with care.
Generating the momentum signal
window = 10
momentum_signal = np.sign(np.sign(modify_data['close'] - modify_data['close'].shift(window)) + 1)  # sign(diff) in {-1,0,1} -> +1 -> {0,1,2} -> sign -> {0,1}: down=0, flat/up=1
s_momentum_signal = pd.Series(momentum_signal, index=modify_data.index)
sub_data = modify_data.loc['2017-11-21', 'close']
c_sig = s_momentum_signal.loc['2017-11-21']
colors = np.where(c_sig == 1, 'red', 'blue')  # up/flat -> red, down -> blue
plt.figure(figsize=(10,5))
plt.scatter(sub_data.index, sub_data, c=colors)
Using a Moving Average
Label by whether the current price sits above or below a chosen moving-average line.
Also simple, but you must decide how long the moving-average window should be,
and the averaging introduces lag.
momentum_signal = np.sign(np.sign(modify_data['close'] - modify_data['close'].rolling(window).mean()) + 1)
s_momentum_signal = pd.Series(momentum_signal, index=modify_data.index)
sub_data = modify_data.loc['2017-11-21', 'close']
c_sig = s_momentum_signal.loc['2017-11-21']
colors = np.where(c_sig == 1, 'red', 'blue')  # above MA -> red, below -> blue
plt.figure(figsize=(10,5))
plt.scatter(sub_data.index, sub_data, c=colors)
Local Min-Max
Label using local minima and maxima; how quickly a trend is allowed to flip is controlled by the wait coefficient.
Pseudocode:
initialize variables
for i in dataset:
    if current_price < previous min_price:
        min_price <- current_price    # passing through the falling trend accumulates min_price
    if current_price > previous max_price:
        max_price <- current_price    # passing through the rising trend accumulates max_price
    if the falling trend ends:
        max_price <- min_price
    if the rising trend ends:
        min_price <- max_price
Define the get_local_min_max function:
def get_local_min_max(close, wait=3):
    min_value = close.iloc[0]
    max_value = close.iloc[0]
    n_cnt_min, n_cnt_max = 0, 0
    mins, maxes = [], []
    min_idxes, max_idxes = [], []
    b_min_update, b_max_update = False, False
    for idx, val in zip(close.index[1:], close.values[1:]):
        if val < min_value:  # falling trend keeps accumulating the local minimum
            min_value = val
            mins.append(min_value)
            min_idxes.append(idx)
            n_cnt_min = 0
            b_min_update = True
        if val > max_value:  # rising trend keeps accumulating the local maximum
            max_value = val
            maxes.append(max_value)
            max_idxes.append(idx)
            n_cnt_max = 0
            b_max_update = True
        if not b_max_update:
            b_min_update = False
            n_cnt_min += 1
            if n_cnt_min >= wait:  # no new minimum for `wait` steps: the falling trend ends
                max_value = min_value
                n_cnt_min = 0
        if not b_min_update:
            b_max_update = False
            n_cnt_max += 1
            if n_cnt_max >= wait:  # no new maximum for `wait` steps: the rising trend ends
                min_value = max_value
                n_cnt_max = 0
    return pd.DataFrame.from_dict({'min_time': min_idxes, 'local_min': mins}), pd.DataFrame.from_dict({'max_time': max_idxes, 'local_max': maxes})
mins, maxes = get_local_min_max(sub_data, wait=3)
fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.plot(sub_data, 'c')
ax.scatter(mins.min_time, mins.local_min, c='blue')
ax.scatter(maxes.max_time, maxes.local_max, c='red')
ax.set_ylim([sub_data.min() * 0.99, sub_data.max() * 1.01])
st_time, ed_time = '2017-11-21 09:00:00', '2017-11-21 16:00:00'
fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.plot(sub_data.loc[st_time:ed_time], 'c')
ax.scatter(mins.set_index('min_time', drop=False).min_time.loc[st_time:ed_time], mins.set_index('min_time').local_min.loc[st_time:ed_time], c='blue')
ax.scatter(maxes.set_index('max_time', drop=False).max_time.loc[st_time:ed_time], maxes.set_index('max_time').local_max.loc[st_time:ed_time], c='red')
ax.set_ylim([sub_data.min() * 0.99, sub_data.max() * 1.01])
mins.shape[0], maxes.shape[0]
Trend Scanning
Label by the sign of the t-value whose absolute value is largest across the scanned windows (max |t-value|).
What is the t-value? The t-statistic of the slope in a linear trend regression.
Code reference: Git MLFinLab
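For reference, the t-value being scanned is the slope t-statistic of the linear trend regression implemented in t_val_lin_r below:

$$
x_t = \beta_0 + \beta_1 t + \varepsilon_t,
\qquad
t\text{-value} = \frac{\hat{\beta}_1}{\widehat{\mathrm{SE}}(\hat{\beta}_1)}
$$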
def t_val_lin_r(close):
    # t-value from a linear trend
    import statsmodels.api as sml
    x = np.ones((close.shape[0], 2))     # column 0: intercept
    x[:, 1] = np.arange(close.shape[0])  # column 1: time index (trend)
    ols = sml.OLS(close, x).fit()
    return ols.tvalues[1]
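A quick sanity check of t_val_lin_r on synthetic data (hypothetical values, not from the dataset): a noisy uptrend should yield a large positive t-value, a driftless series one near zero.

```python
# Sanity check on synthetic series (hypothetical example)
rng = np.random.default_rng(0)
up_trend = np.arange(100, dtype=float) + rng.normal(0, 5, 100)  # rising trend plus noise
no_trend = rng.normal(0, 5, 100)                                # no drift
print(t_val_lin_r(up_trend))  # expect a large positive t-value
print(t_val_lin_r(no_trend))  # expect a t-value near zero
```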
- look_forward_window: size of the window observed from the current point into the future
- min_sample_length: minimum sample length
- step: stride when sliding the window
- t1_array: stores the result for each reference timestamp
- t_values_array: stores each window's t-value

look_forward_window = 60
min_sample_length = 5
step = 1
t1_array = []
t_values_array = []
Extract the sample from the current point (ind) forward up to look_forward_window:
molecule = modify_data['2017-11-01':'2017-11-30'].index
label = pd.DataFrame(index=molecule, columns=['t1', 't_val', 'bin'])
tmp_out = []
for ind in tqdm(molecule):
    subset = modify_data.loc[ind:, 'close'].iloc[:look_forward_window]  # extract the forward sample
    if look_forward_window > subset.shape[0]:
        continue
    tmp_subset = pd.Series(index=subset.index[min_sample_length-1:subset.shape[0]-1], dtype=float)
    tval = []
    for forward_window in np.arange(min_sample_length, subset.shape[0]):
        df = subset.iloc[:forward_window]
        tval.append(t_val_lin_r(df.values))  # trend t-value over the first `forward_window` points
    tmp_subset.loc[tmp_subset.index] = np.array(tval)
    idx_max = tmp_subset.replace([-np.inf, np.inf, np.nan], 0).abs().idxmax()  # window with max |t-value|
    tmp_t_val = tmp_subset[idx_max]
    tmp_out.append([tmp_subset.index[-1], tmp_t_val, np.sign(tmp_t_val)])
label.loc[molecule] = np.array(tmp_out)  # prevent leakage
label['t1'] = pd.to_datetime(label['t1'])
label['bin'] = pd.to_numeric(label['bin'], downcast='signed')
sub_data = modify_data.loc['2017-11-21', 'close']
c_sig = label['bin'].loc['2017-11-21']
colors = np.where(c_sig == 1, 'red', 'blue')  # positive trend -> red, negative -> blue
fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.scatter(sub_data.index, sub_data.values, c=colors)
3. Feature Selection

- Filter Method: information gain, correlation coefficient, ... (a minimal sketch follows the lists below)
- Wrapper Method: Recursive Feature Elimination, Sequential Feature Selection, ...
- Embedded Method: Lasso, Ridge, Elastic Net, ...
- How to use the Wrapper Method
- How to analyze and select usable features for capturing price momentum
- Checking feature importance with SHAP
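As a contrast to the wrapper methods demonstrated below, here is a minimal Filter Method sketch that ranks features by absolute correlation with the label. This is a hypothetical helper, not part of the original notebook; it assumes a feature DataFrame `X` and label Series `y` like the ones built later in this section.

```python
# Filter Method sketch: keep the k features with the highest |correlation| to the label
# (hypothetical helper for illustration)
def filter_by_correlation(X, y, k=5):
    corr = X.apply(lambda col: col.corr(y)).abs()  # |Pearson correlation| per feature
    return corr.sort_values(ascending=False).head(k).index.tolist()
```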
!pip install ta==0.9.0
!pip install shap
import datetime
import sys
import os
import re
import io
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ta
sys.path.append('/aiffel/aiffel/fnguide/data/')
from libs.feature_importance import importance as imp
from sklearn.feature_selection import SequentialFeatureSelector, RFECV
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier, VotingClassifier
DATA_PATH = '/aiffel/aiffel/fnguide/data/'
anno_file_name = os.path.join(DATA_PATH, 'sub_upbit_eth_min_tick_label.pkl')
target_file_name = os.path.join(DATA_PATH, 'sub_upbit_eth_min_tick.csv')
df_modify_data = pd.read_csv(target_file_name, index_col=0, parse_dates=True)
df_label_data = pd.read_pickle(anno_file_name)
df_sub_modify_data = df_modify_data.loc[df_label_data.index]
# take only the first 1,000 rows
df_sub_modify_data = df_sub_modify_data.iloc[:1000]
df_sub_modify_data
df_label_data
df_label_data.value_counts()
Technical Indicators

| Category | Description | Indicators |
|---|---|---|
| Trend | Capture the direction of price movement | MACD, ADX, TRIX, DPO, AROON... |
| Volatility | Measure the range of price fluctuation over a period | Standard deviation, ATR, UI... |
| Volume | Measure changes such as volume inflow/outflow | CMF, MFI, FI, SMA EM, VPT... |
| Momentum | Measure the relative strength of the current price | RSI, WR... |
mt = 1
fillna = False
df_ = df_sub_modify_data.copy()
open, high, low, close, volume = 'open', 'high', 'low', 'close', 'volume'
cols = [open, high, low, close, volume]
## Volume Index
# Chaikin Money Flow
df_["volume_cmf"] = ta.volume.ChaikinMoneyFlowIndicator(
high=df_[high], low=df_[low], close=df_[close], volume=df_[volume], window=20*mt, fillna=fillna
).chaikin_money_flow()
# Force Index
df_["volume_fi"] = ta.volume.ForceIndexIndicator(
close=df_[close], volume=df_[volume], window=15*mt, fillna=fillna
).force_index()
# Money Flow Indicator
df_["volume_mfi"] = ta.volume.MFIIndicator(
high=df_[high],
low=df_[low],
close=df_[close],
volume=df_[volume],
window=15*mt,
fillna=fillna,
).money_flow_index()
# Ease of Movement
df_["volume_sma_em"] = ta.volume.EaseOfMovementIndicator(
high=df_[high], low=df_[low], volume=df_[volume], window=15*mt, fillna=fillna
).sma_ease_of_movement()
# Volume Price Trend
df_["volume_vpt"] = ta.volume.VolumePriceTrendIndicator(
close=df_[close], volume=df_[volume], fillna=fillna
).volume_price_trend()
## volatility index
# Average True Range
df_["volatility_atr"] = ta.volatility.AverageTrueRange(
close=df_[close], high=df_[high], low=df_[low], window=10*mt, fillna=fillna
).average_true_range()
# Ulcer Index
df_["volatility_ui"] = ta.volatility.UlcerIndex(
close=df_[close], window=15*mt, fillna=fillna
).ulcer_index()
## trend index
# MACD
df_["trend_macd_diff"] = ta.trend.MACD(
close=df_[close], window_slow=25*mt, window_fast=10*mt, window_sign=9, fillna=fillna
).macd_diff()
# Average Directional Movement Index (ADX)
df_["trend_adx"] = ta.trend.ADXIndicator(
high=df_[high], low=df_[low], close=df_[close], window=15*mt, fillna=fillna
).adx()
# TRIX Indicator
df_["trend_trix"] = ta.trend.TRIXIndicator(
close=df_[close], window=15*mt, fillna=fillna
).trix()
# Mass Index
df_["trend_mass_index"] = ta.trend.MassIndex(
high=df_[high], low=df_[low], window_fast=10*mt, window_slow=25*mt, fillna=fillna
).mass_index()
# DPO Indicator
df_["trend_dpo"] = ta.trend.DPOIndicator(
close=df_[close], window=20*mt, fillna=fillna
).dpo()
# Aroon Indicator
df_["trend_aroon_ind"] = ta.trend.AroonIndicator(close=df_[close], window=20, fillna=fillna).aroon_indicator()
## momentum index
# Relative Strength Index (RSI)
df_["momentum_rsi"] = ta.momentum.RSIIndicator(close=df_[close], window=15*mt, fillna=fillna).rsi()
# Williams R Indicator
df_["momentum_wr"] = ta.momentum.WilliamsRIndicator(
high=df_[high], low=df_[low], close=df_[close], lbp=15*mt, fillna=fillna
).williams_r()
# result
df_
Return and rate-of-change features
- Ratio of the closing price to the price n periods earlier
- Ratio of today's volume to the volume n periods earlier

Volatility features
- n-period rolling standard deviation of the closing price
- n-period rolling standard deviation of volume

windows_mom = [5, 10, 20]
windows_std = [30]
for i in windows_mom:
    df_[f'vol_change_{i}'] = df_.volume.pct_change(i).round(6)
    df_[f'ret_{i}'] = df_.close.pct_change(i).round(6)
for i in windows_std:
    df_[f'std_{i}'] = df_.close.rolling(i).std()
    df_[f'vol_std_{i}'] = df_.volume.rolling(i).std()
# result
df_
df_tmp_data = df_.join(df_label_data).dropna()
X, y = df_tmp_data.iloc[:, 5:-1], df_tmp_data.iloc[:, -1]
sc = StandardScaler()
X_sc = sc.fit_transform(X)
X_sc = pd.DataFrame(X_sc, index=X.index, columns=X.columns)
X_sc.head()
rfc = RandomForestClassifier(class_weight='balanced')
rfc.fit(X_sc, y)
Feature Selection methods
- Feature Selection
  - MDI
  - MDA
- Sequential Feature Selection
  - RFE CV
  - SFS
- Shapley Additive explanations
  - SHAP
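MDI (mean decrease impurity) is the same impurity-based importance that sklearn's random forest exposes as `feature_importances_`, so the library result below can be cross-checked directly (assumes the `rfc` fitted above):

```python
# MDI cross-check: sklearn's feature_importances_ is mean decrease in impurity
mdi = pd.Series(rfc.feature_importances_, index=X.columns).sort_values(ascending=False)
print(mdi.head(10))
```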
feat_imp = imp.mean_decrease_impurity(rfc, X.columns)
feat_imp
svc_rbf = SVC(kernel='rbf', probability=True)
cv = KFold(n_splits=5)
feat_imp_mda = imp.mean_decrease_accuracy(svc_rbf, X_sc, y, cv_gen=cv)
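MDA (mean decrease accuracy) shuffles one feature at a time and scores the drop in model performance. sklearn's `permutation_importance` implements the same idea; the sketch below runs it in-sample rather than via CV, so treat it as an approximate cross-check of the library call above, not a replacement:

```python
from sklearn.inspection import permutation_importance

# Same idea as MDA: importance = score drop after shuffling a feature
svc_rbf.fit(X_sc, y)  # permutation_importance needs a fitted estimator
perm = permutation_importance(svc_rbf, X_sc, y, n_repeats=5, random_state=0)
perm_imp = pd.Series(perm.importances_mean, index=X_sc.columns).sort_values(ascending=False)
print(perm_imp.head(10))
```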
Define the plot_feature_importance function:
def plot_feature_importance(importance_df, save_fig=False, output_path=None):
    # Plot mean importance bars with std
    plt.figure(figsize=(10, importance_df.shape[0] / 5))
    importance_df.sort_values('mean', ascending=True, inplace=True)
    importance_df['mean'].plot(kind='barh', color='b', alpha=0.25, xerr=importance_df['std'], error_kw={'ecolor': 'r'})
    if save_fig:
        plt.savefig(output_path)
    else:
        plt.show()
plot_feature_importance(feat_imp)
plot_feature_importance(feat_imp_mda)
svc_lin = SVC(kernel='linear', probability=True)  # RFECV needs coef_, hence the linear kernel
rfe_cv = RFECV(svc_lin, cv=cv)
rfe_fitted = rfe_cv.fit(X_sc, y)
rfe_df = pd.DataFrame([rfe_fitted.support_, rfe_fitted.ranking_], columns=X_sc.columns).T.rename(columns={0:"Optimal_Features", 1:"Ranking"})
rfe_df
rfe_df[rfe_df["Optimal_Features"]==True]
n = 2
sfs_forward = SequentialFeatureSelector(svc_lin, n_features_to_select=n, direction='forward')
sfs_fitted = sfs_forward.fit(X_sc, y)
sfs_rank = sfs_fitted.get_support()
sfs_df = pd.DataFrame(sfs_rank, index=X_sc.columns, columns=["Optimal_Features"])
sfs_df[sfs_df["Optimal_Features"]==True].index
Shapley Value?
- A scheme for distributing rewards and penalties fairly among multiple players
- Applies when players contribute unequally and exchange gains and losses with one another
- Players make decisions to maximize the joint outcome, carried out in sequence; the average marginal contribution a player can be expected to add to the resulting game outcome is that player's Shapley Value
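A toy two-player illustration of the computation (hypothetical payoffs): average each player's marginal contribution over every arrival order.

```python
from itertools import permutations

# Hypothetical coalition payoffs v(S) for players A and B
v = {(): 0, ('A',): 10, ('B',): 20, ('A', 'B'): 40}

def shapley(player, players=('A', 'B')):
    # Average the player's marginal contribution over all arrival orders
    orders = list(permutations(players))
    total = 0
    for order in orders:
        before = tuple(sorted(order[:order.index(player)]))
        after = tuple(sorted(before + (player,)))
        total += v[after] - v[before]
    return total / len(orders)

print(shapley('A'), shapley('B'))  # 15.0 25.0 -- sums to v(('A', 'B')) = 40
```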
import shap
explainer = shap.TreeExplainer(rfc)
shap_value = explainer.shap_values(X_sc)
shap.summary_plot(shap_value, X_sc)
output_file_name = os.path.join(DATA_PATH, 'sub_upbit_eth_min_feature_labels.pkl')
df_tmp_data.to_pickle(output_file_name)
4. Model Training

import datetime
import sys
import os
import re
import io
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ta
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import KFold
from sklearn.ensemble import RandomForestClassifier, BaggingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix, f1_score, roc_auc_score, roc_curve
sys.path.append('/aiffel/aiffel/fnguide/data/')
from libs.mlutil.pkfold import PKFold
DATA_PATH = '/aiffel/aiffel/fnguide/data/'
data_file_name = os.path.join(DATA_PATH, 'sub_upbit_eth_min_feature_labels.pkl')
df_data = pd.read_pickle(data_file_name)
df_data['t_value'] = df_data['t_value'].apply(lambda x: x if x == 1 else 0)
df_data['t_value'].value_counts()
train_ratio, test_ratio = 0.7, 0.2
n_train = int(np.round(len(df_data) * train_ratio))
n_test = int(np.round(len(df_data) * test_ratio))
X, y = df_data.iloc[:, 5:-1], df_data.iloc[:, -1]
sc = StandardScaler()
X_sc = sc.fit_transform(X)
train_x, test_x, train_y, test_y = X_sc[:n_train, :], X_sc[-n_test:, :], y.iloc[:n_train], y.iloc[-n_test:]
train_x = pd.DataFrame(train_x, index=train_y.index, columns=X.columns)
train_y = pd.Series(train_y, index=train_y.index)
test_x = pd.DataFrame(test_x, index=test_y.index, columns=X.columns)
test_y = pd.Series(test_y, index=test_y.index)
train_x = train_x[:1000]
train_y = train_y[:1000]
Drawbacks of standard K-fold CV
- The observations can hardly be assumed to have been drawn IID
- The test set has very likely been reused repeatedly during model development ➡️ several biases may already be baked in
- As a result, training and validation sets come to share more and more similar patterns, so leakage grows worse and the measured classifier performance loses much of its objectivity
n_cv = 4
t1 = pd.Series(train_y.index.values, index=train_y.index)
cv = PKFold(n_cv, t1, 0)
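PKFold (purged K-fold) drops training samples whose label intervals overlap the validation fold, which is what blocks the leakage described above. A simplified conceptual sketch follows; this is an assumption about the idea, not the actual PKFold implementation in libs.mlutil (which works with the label end-times t1):

```python
# Simplified purged K-fold sketch: drop `purge` samples on each side of the validation fold
# (conceptual illustration only)
def purged_kfold_indices(n_samples, n_splits, purge=0):
    bounds = np.linspace(0, n_samples, n_splits + 1, dtype=int)
    for i in range(n_splits):
        val_idx = np.arange(bounds[i], bounds[i + 1])
        train_mask = np.ones(n_samples, dtype=bool)
        lo, hi = max(0, bounds[i] - purge), min(n_samples, bounds[i + 1] + purge)
        train_mask[lo:hi] = False  # remove the fold plus the purged margin
        yield np.where(train_mask)[0], val_idx
```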
Validation error comes from overfitting and underfitting; Bagging is used to reduce the variance of that error.
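Why bagging helps, in one line: for N estimators with individual variance σ² and average pairwise correlation ρ, the variance of the bagged average is

$$
\mathrm{Var}\!\left(\frac{1}{N}\sum_{i=1}^{N}\hat{f}_i\right)
= \rho\,\sigma^{2} + \frac{1-\rho}{N}\,\sigma^{2},
$$

so adding more, less-correlated estimators shrinks the second term.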
Run hyperparameter tuning (takes roughly 18 minutes):
bc_params = {
    'n_estimators': [5, 10, 20],
    'max_features': [0.5, 0.7],
    'base_estimator__max_depth': [3, 5, 10, 20],
    'base_estimator__max_features': [None, 'auto'],
    'base_estimator__min_samples_leaf': [3, 5, 10],
    'bootstrap_features': [False, True],
}
rfc = RandomForestClassifier(class_weight='balanced')
bag_rfc = BaggingClassifier(rfc)
gs_rfc = GridSearchCV(bag_rfc, bc_params, cv=cv, n_jobs=-1, verbose=1)
gs_rfc.fit(train_x, train_y)
gs_rfc_best = gs_rfc.best_estimator_
gs_rfc_best.fit(train_x, train_y)
pred_y = gs_rfc_best.predict(test_x)
prob_y = gs_rfc_best.predict_proba(test_x)
confusion = confusion_matrix(test_y, pred_y)
accuracy = accuracy_score(test_y, pred_y)
precision = precision_score(test_y, pred_y)
recall = recall_score(test_y, pred_y)
print('================= confusion matrix ====================')
print(confusion)
print('=======================================================')
print(f'accuracy: {accuracy}, precision: {precision}, recall: {recall}')
fpr, tpr, thresholds = roc_curve(test_y, prob_y[:, 1])  # use class-1 probabilities rather than hard predictions
auc = roc_auc_score(test_y, prob_y[:, 1])
plt.plot(fpr, tpr, linewidth=2)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('fpr')
plt.ylabel('tpr')
print(f'auc:{auc}')