diff --git a/lib/trade_models/quant_rs_transformer.py b/lib/trade_models/quant_rs_transformer.py
new file mode 100644
index 0000000..97dd880
--- /dev/null
+++ b/lib/trade_models/quant_rs_transformer.py
@@ -0,0 +1,356 @@
+##################################################
+# Copyright (c) Xuanyi Dong [GitHub D-X-Y], 2021 #
+##################################################
+from __future__ import division
+from __future__ import print_function
+
+import os, math, random
+from collections import OrderedDict
+import numpy as np
+import pandas as pd
+import copy
+from functools import partial
+from typing import Optional, Text
+
+from qlib.utils import (
+    unpack_archive_with_buffer,
+    save_multiple_parts_file,
+    get_or_create_path,
+    drop_nan_by_y_index,
+)
+from qlib.log import get_module_logger
+
+import torch
+import torch.nn.functional as F
+import torch.optim as optim
+import torch.utils.data as th_data
+
+from log_utils import AverageMeter
+from utils import count_parameters
+
+from xlayers import super_core
+from .transformers import DEFAULT_NET_CONFIG
+from .transformers import get_transformer
+
+
+from qlib.model.base import Model
+from qlib.data.dataset import DatasetH
+from qlib.data.dataset.handler import DataHandlerLP
+
+
+DEFAULT_OPT_CONFIG = dict(
+    epochs=200,
+    lr=0.001,
+    batch_size=2000,
+    early_stop=20,
+    loss="mse",
+    optimizer="adam",
+    num_workers=4,
+)
+
+
+class QuantRandomSearchTransformer(Model):
+    """Transformer-based Quant Model (random-search variant)."""
+
+    def __init__(
+        self,
+        net_config=None,
+        opt_config=None,
+        rs_times=10,
+        metric="",
+        GPU=0,
+        seed=None,
+        **kwargs
+    ):
+        # Set logger.
+        self.logger = get_module_logger("RS-Transformer")
+        self.logger.info("QuantRandomSearchTransformer PyTorch version...")
+
+        # set hyper-parameters.
+        self.net_config = net_config or DEFAULT_NET_CONFIG
+        self.opt_config = opt_config or DEFAULT_OPT_CONFIG
+        self.rs_times = rs_times
+        self.metric = metric
+        self.device = torch.device(
+            "cuda:{:}".format(GPU) if torch.cuda.is_available() and GPU >= 0 else "cpu"
+        )
+        self.seed = seed
+
+        self.logger.info(
+            "Transformer parameters setting:"
+            "\nnet_config : {:}"
+            "\nopt_config : {:}"
+            "\nrs_times   : {:}"
+            "\nmetric     : {:}"
+            "\ndevice     : {:}"
+            "\nseed       : {:}".format(
+                self.net_config,
+                self.opt_config,
+                self.rs_times,
+                self.metric,
+                self.device,
+                self.seed,
+            )
+        )
+
+        if self.seed is not None:
+            random.seed(self.seed)
+            np.random.seed(self.seed)
+            torch.manual_seed(self.seed)
+            if self.use_gpu:
+                torch.cuda.manual_seed(self.seed)
+                torch.cuda.manual_seed_all(self.seed)
+
+        """
+        # self.model = get_transformer(self.net_config)
+        # self.model.set_super_run_type(super_core.SuperRunMode.FullModel)
+        # self.logger.info("model: {:}".format(self.model))
+        # self.logger.info("model size: {:.3f} MB".format(count_parameters(self.model)))
+
+        if self.opt_config["optimizer"] == "adam":
+            self.train_optimizer = optim.Adam(
+                self.model.parameters(), lr=self.opt_config["lr"]
+            )
+        elif self.opt_config["optimizer"] == "sgd":
+            self.train_optimizer = optim.SGD(
+                self.model.parameters(), lr=self.opt_config["lr"]
+            )
+        else:
+            raise NotImplementedError(
+                "optimizer {:} is not supported!".format(self.opt_config["optimizer"])
+            )
+        self.model.to(self.device)
+        """
+
+        self.fitted = False
+
+    @property
+    def use_gpu(self):
+        return self.device != torch.device("cpu")
+
+    def loss_fn(self, pred, label):
+        mask = ~torch.isnan(label)
+        if self.opt_config["loss"] == "mse":
+            return F.mse_loss(pred[mask], label[mask])
+        else:
+            raise ValueError("unknown loss `{:}`".format(self.opt_config["loss"]))
+
+    def metric_fn(self, pred, label):
+        # the metric score: higher is better
+        if self.metric == "" or self.metric == "loss":
+            return -self.loss_fn(pred, label)
+        else:
+            raise ValueError("unknown metric `{:}`".format(self.metric))
+
+    def train_or_test_epoch(
+        self, xloader, model, loss_fn, metric_fn, is_train, optimizer=None
+    ):
+        if is_train:
+            model.train()
+        else:
+            model.eval()
+        score_meter, loss_meter = AverageMeter(), AverageMeter()
+        for ibatch, (feats, labels) in enumerate(xloader):
+            feats = feats.to(self.device, non_blocking=True)
+            labels = labels.to(self.device, non_blocking=True)
+            # forward the network
+            preds = model(feats)
+            loss = loss_fn(preds, labels)
+            with torch.no_grad():
+                score = metric_fn(preds, labels)
+            loss_meter.update(loss.item(), feats.size(0))
+            score_meter.update(score.item(), feats.size(0))
+            # optimize the network
+            if is_train and optimizer is not None:
+                optimizer.zero_grad()
+                loss.backward()
+                torch.nn.utils.clip_grad_value_(model.parameters(), 3.0)
+                optimizer.step()
+        return loss_meter.avg, score_meter.avg
+
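+    # NOTE: `search` below is currently a placeholder (its body only drops into
+    # pdb). Given the class name and the `rs_times` argument, a plausible shape
+    # for the loop is to sample `rs_times` random transformer configurations,
+    # train each candidate with `train_or_test_epoch`, and keep the one with the
+    # best validation score, e.g. (`sample_net_config` is a hypothetical helper,
+    # not defined in this file):
+    #
+    #     best_score, best_model = -float("inf"), None
+    #     for _ in range(rs_times):
+    #         candidate = get_transformer(sample_net_config()).to(self.device)
+    #         ...  # train on train_loader, evaluate on valid_loader
+    #         if valid_score > best_score:
+    #             best_score, best_model = valid_score, candidate
+    #     return best_model
+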
ckp_data["stop_steps"], + ckp_data["best_score"], + ckp_data["best_epoch"], + ) + start_epoch, best_param = ckp_data["start_epoch"], ckp_data["best_param"] + results_dict = ckp_data["results_dict"] + self.model.load_state_dict(ckp_data["net_state_dict"]) + self.train_optimizer.load_state_dict(ckp_data["opt_state_dict"]) + self.logger.info("Resume from existing checkpoint: {:}".format(ckp_path)) + else: + stop_steps, best_score, best_epoch = 0, -np.inf, -1 + start_epoch, best_param = 0, None + results_dict = dict( + train=OrderedDict(), valid=OrderedDict(), test=OrderedDict() + ) + _, eval_str = _internal_test(-1, results_dict) + self.logger.info( + "Training from scratch, metrics@start: {:}".format(eval_str) + ) + + for iepoch in range(start_epoch, self.opt_config["epochs"]): + self.logger.info( + "Epoch={:03d}/{:03d} ::==>> Best valid @{:03d} ({:.6f})".format( + iepoch, self.opt_config["epochs"], best_epoch, best_score + ) + ) + train_loss, train_score = self.train_or_test_epoch( + train_loader, + self.model, + self.loss_fn, + self.metric_fn, + True, + self.train_optimizer, + ) + self.logger.info( + "Training :: loss={:.6f}, score={:.6f}".format(train_loss, train_score) + ) + + current_eval_scores, eval_str = _internal_test(iepoch, results_dict) + self.logger.info("Evaluating :: {:}".format(eval_str)) + + if current_eval_scores["valid"] > best_score: + stop_steps, best_epoch, best_score = ( + 0, + iepoch, + current_eval_scores["valid"], + ) + best_param = copy.deepcopy(self.model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.opt_config["early_stop"]: + self.logger.info( + "early stop at {:}-th epoch, where the best is @{:}".format( + iepoch, best_epoch + ) + ) + break + save_info = dict( + net_config=self.net_config, + opt_config=self.opt_config, + net_state_dict=self.model.state_dict(), + opt_state_dict=self.train_optimizer.state_dict(), + best_param=best_param, + stop_steps=stop_steps, + best_score=best_score, + best_epoch=best_epoch, + results_dict=results_dict, + start_epoch=iepoch + 1, + ) + torch.save(save_info, ckp_path) + self.logger.info( + "The best score: {:.6f} @ {:02d}-th epoch".format(best_score, best_epoch) + ) + self.model.load_state_dict(best_param) + _, eval_str = _internal_test("final", results_dict) + self.logger.info("Reload the best parameter :: {:}".format(eval_str)) + + if self.use_gpu: + torch.cuda.empty_cache() + self.fitted = True + + def predict(self, dataset): + if not self.fitted: + raise ValueError("The model is not fitted yet!") + x_test = dataset.prepare("test", col_set="feature") + index = x_test.index + + self.model.eval() + x_values = x_test.values + sample_num, batch_size = x_values.shape[0], self.opt_config["batch_size"] + preds = [] + + for begin in range(sample_num)[::batch_size]: + + if sample_num - begin < batch_size: + end = sample_num + else: + end = begin + batch_size + + x_batch = torch.from_numpy(x_values[begin:end]).float().to(self.device) + + with torch.no_grad(): + pred = self.model(x_batch).detach().cpu().numpy() + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=index)