diff --git a/.latent-data/qlib b/.latent-data/qlib
index 91fd53a..73b7107 160000
--- a/.latent-data/qlib
+++ b/.latent-data/qlib
@@ -1 +1 @@
-Subproject commit 91fd53ab4d0724df73ccf8855ed83b6e1760bb08
+Subproject commit 73b7107ee89f890456f62fc222f71b2e9f5ed23e
diff --git a/README.md b/README.md
index 9638d09..82d8b77 100644
--- a/README.md
+++ b/README.md
@@ -147,7 +147,7 @@ If you find that this project helps your research, please consider citing the re
 
 If you want to contribute to this repo, please see [CONTRIBUTING.md](.github/CONTRIBUTING.md).
 Besides, please follow [CODE-OF-CONDUCT.md](.github/CODE-OF-CONDUCT.md).
-We use `[black](https://github.com/psf/black)` for Python code formatter.
+We use [`black`](https://github.com/psf/black) for Python code formatter.
 Please use `black . -l 120`.
 
 # License
diff --git a/lib/procedures/basic_main.py b/lib/procedures/basic_main.py
index 3eba35b..60c3264 100644
--- a/lib/procedures/basic_main.py
+++ b/lib/procedures/basic_main.py
@@ -2,7 +2,8 @@
 # Copyright (c) Xuanyi Dong [GitHub D-X-Y], 2019 #
 ##################################################
 import os, sys, time, torch
-from log_utils import AverageMeter, time_string
+from log_utils import AverageMeter
+from log_utils import time_string
 from utils import obtain_accuracy
diff --git a/lib/procedures/q_exps.py b/lib/procedures/q_exps.py
index 7a9f74b..c69d570 100644
--- a/lib/procedures/q_exps.py
+++ b/lib/procedures/q_exps.py
@@ -2,6 +2,8 @@
 # Copyright (c) Xuanyi Dong [GitHub D-X-Y], 2021.02 #
 #####################################################
+import inspect
+
 import qlib
 from qlib.utils import init_instance_by_config
 from qlib.workflow import R
@@ -39,24 +41,29 @@ def update_market(config, market):
 
 def run_exp(task_config, dataset, experiment_name, recorder_name, uri):
 
     model = init_instance_by_config(task_config["model"])
+    model_fit_kwargs = dict(dataset=dataset)
 
-    # start exp
+    # Let's start the experiment.
     with R.start(experiment_name=experiment_name, recorder_name=recorder_name, uri=uri):
-
-        log_file = R.get_recorder().root_uri / "{:}.log".format(experiment_name)
+        # Setup log
+        recorder_root_dir = R.get_recorder().root_uri
+        log_file = recorder_root_dir / "{:}.log".format(experiment_name)
         set_log_basic_config(log_file)
         logger = get_module_logger("q.run_exp")
         logger.info("task_config={:}".format(task_config))
         logger.info("[{:}] - [{:}]: {:}".format(experiment_name, recorder_name, uri))
         logger.info("dataset={:}".format(dataset))
 
-        # train model
+        # Train model
        R.log_params(**flatten_dict(task_config))
-        model.fit(dataset)
+        if "save_path" in inspect.getfullargspec(model.fit).args:
+            model_fit_kwargs["save_path"] = str(recorder_root_dir / "model-ckps")
+        model.fit(**model_fit_kwargs)
+
+        # Get the recorder
        recorder = R.get_recorder()
         R.save_objects(**{"model.pkl": model})
 
-        # generate records: prediction, backtest, and analysis
+        # Generate records: prediction, backtest, and analysis
         for record in task_config["record"]:
             record = record.copy()
             if record["class"] == "SignalRecord":
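For context on the `run_exp` change above: `inspect.getfullargspec` is used to detect whether a model's `fit` accepts a `save_path` keyword before passing it, so models without checkpoint support keep working. A minimal sketch of that pattern; `ToyModelA`/`ToyModelB` are hypothetical stand-ins for qlib model classes:

import inspect


class ToyModelA:
    def fit(self, dataset):
        return "fitted without checkpointing"


class ToyModelB:
    def fit(self, dataset, save_path=None):
        return "fitted, checkpoints under {:}".format(save_path)


def fit_model(model, dataset, ckp_dir):
    fit_kwargs = dict(dataset=dataset)
    # getfullargspec(...).args lists the named parameters of fit(), incl. self.
    if "save_path" in inspect.getfullargspec(model.fit).args:
        fit_kwargs["save_path"] = ckp_dir
    return model.fit(**fit_kwargs)


print(fit_model(ToyModelA(), dataset=[1, 2, 3], ckp_dir="/tmp/model-ckps"))
print(fit_model(ToyModelB(), dataset=[1, 2, 3], ckp_dir="/tmp/model-ckps"))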
diff --git a/lib/trade_models/quant_transformer.py b/lib/trade_models/quant_transformer.py
index 55a92ec..ea236f3 100755
--- a/lib/trade_models/quant_transformer.py
+++ b/lib/trade_models/quant_transformer.py
@@ -6,11 +6,12 @@ from __future__ import print_function
 
 import os
 import math
+from collections import OrderedDict
 import numpy as np
 import pandas as pd
 import copy
 from functools import partial
-from typing import Optional
+from typing import Optional, Text
 import logging
 
 from qlib.utils import (
@@ -28,6 +29,7 @@ import torch.optim as optim
 import torch.utils.data as th_data
 
 import layers as xlayers
+from log_utils import AverageMeter
 from utils import count_parameters
 
 from qlib.model.base import Model
@@ -107,15 +109,18 @@ class QuantTransformer(Model):
             raise ValueError("unknown loss `{:}`".format(self.loss))
 
     def metric_fn(self, pred, label):
-        mask = torch.isfinite(label)
+        # the metric score : higher is better
         if self.metric == "" or self.metric == "loss":
-            return -self.loss_fn(pred[mask], label[mask])
+            return -self.loss_fn(pred, label)
         else:
             raise ValueError("unknown metric `{:}`".format(self.metric))
 
-    def train_epoch(self, xloader, model, loss_fn, optimizer):
-        model.train()
-        scores, losses = [], []
+    def train_or_test_epoch(self, xloader, model, loss_fn, metric_fn, is_train, optimizer=None):
+        if is_train:
+            model.train()
+        else:
+            model.eval()
+        score_meter, loss_meter = AverageMeter(), AverageMeter()
         for ibatch, (feats, labels) in enumerate(xloader):
             feats = feats.to(self.device, non_blocking=True)
             labels = labels.to(self.device, non_blocking=True)
@@ -124,36 +129,20 @@ class QuantTransformer(Model):
             loss = loss_fn(preds, labels)
             with torch.no_grad():
                 score = self.metric_fn(preds, labels)
-            losses.append(loss.item())
-            scores.append(loss.item())
+            loss_meter.update(loss.item(), feats.size(0))
+            score_meter.update(score.item(), feats.size(0))
             # optimize the network
-            optimizer.zero_grad()
-            loss.backward()
-            torch.nn.utils.clip_grad_value_(model.parameters(), 3.0)
-            optimizer.step()
-        return np.mean(losses), np.mean(scores)
-
-    def test_epoch(self, xloader, model, loss_fn, metric_fn):
-        model.eval()
-        scores, losses = [], []
-        with torch.no_grad():
-            for ibatch, (feats, labels) in enumerate(xloader):
-                feats = feats.to(self.device, non_blocking=True)
-                labels = labels.to(self.device, non_blocking=True)
-                # forward the network
-                preds = model(feats)
-                loss = loss_fn(preds, labels)
-                score = self.metric_fn(preds, labels)
-                losses.append(loss.item())
-                scores.append(loss.item())
-            return np.mean(losses), np.mean(scores)
+            if is_train and optimizer is not None:
+                optimizer.zero_grad()
+                loss.backward()
+                torch.nn.utils.clip_grad_value_(model.parameters(), 3.0)
+                optimizer.step()
+        return loss_meter.avg, score_meter.avg
 
     def fit(
         self,
         dataset: DatasetH,
-        evals_result=dict(),
-        verbose=True,
-        save_path=None,
+        save_path: Optional[Text] = None,
     ):
         def _prepare_dataset(df_data):
             return th_data.TensorDataset(
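The merged `train_or_test_epoch` above replaces `np.mean` over per-batch values with meters updated by batch size (`feats.size(0)`); it also fixes the old bug where `scores.append(loss.item())` recorded the loss instead of the score. A minimal sketch of why the weighting matters, using a hypothetical stand-in for the repo's `AverageMeter` (the real class is assumed to expose `update(value, n)` and `.avg`):

class AverageMeter:
    def __init__(self):
        self.sum, self.count = 0.0, 0

    def update(self, value, n=1):
        self.sum += value * n
        self.count += n

    @property
    def avg(self):
        return self.sum / max(self.count, 1)


# With an uneven last batch, the plain mean of per-batch means is biased;
# weighting each update by the batch size recovers the true per-sample mean.
batch_losses = [(0.5, 64), (0.7, 64), (1.1, 8)]  # (mean loss, batch size)
meter = AverageMeter()
for loss, n in batch_losses:
    meter.update(loss, n)
print(sum(l for l, _ in batch_losses) / len(batch_losses))  # 0.7667 (biased)
print(meter.avg)  # 0.6294 (per-sample average)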
@@ -187,82 +176,107 @@ class QuantTransformer(Model):
             _prepare_loader(test_dataset, False),
         )
 
-        if save_path == None:
-            save_path = create_save_path(save_path)
-        stop_steps, best_score, best_epoch = 0, -np.inf, 0
-        train_loss = 0
-        evals_result["train"] = []
-        evals_result["valid"] = []
-
-        # train
+        save_path = create_save_path(save_path)
         self.logger.info("Fit procedure for [{:}] with save path={:}".format(self.__class__.__name__, save_path))
 
-        def _internal_test():
-            train_loss, train_score = self.test_epoch(train_loader, self.model, self.loss_fn, self.metric_fn)
-            valid_loss, valid_score = self.test_epoch(valid_loader, self.model, self.loss_fn, self.metric_fn)
-            test_loss, test_score = self.test_epoch(test_loader, self.model, self.loss_fn, self.metric_fn)
-            xstr = "train-score={:.6f}, valid-score={:.6f}, test-score={:.6f}".format(
-                train_score, valid_score, test_score
-            )
-            return dict(train=train_score, valid=valid_score, test=test_score), xstr
-
-        _, eval_str = _internal_test()
-        self.logger.info("Before Training: {:}".format(eval_str))
-        for iepoch in range(self.opt_config["epochs"]):
-            self.logger.info("Epoch={:03d}/{:03d} ::==>>".format(iepoch, self.opt_config["epochs"]))
-
-            train_loss, train_score = self.train_epoch(train_loader, self.model, self.loss_fn, self.train_optimizer)
+        def _internal_test(ckp_epoch=None, results_dict=None):
+            with torch.no_grad():
+                train_loss, train_score = self.train_or_test_epoch(
+                    train_loader, self.model, self.loss_fn, self.metric_fn, False, None
+                )
+                valid_loss, valid_score = self.train_or_test_epoch(
+                    valid_loader, self.model, self.loss_fn, self.metric_fn, False, None
+                )
+                test_loss, test_score = self.train_or_test_epoch(
+                    test_loader, self.model, self.loss_fn, self.metric_fn, False, None
+                )
+            xstr = "train-score={:.6f}, valid-score={:.6f}, test-score={:.6f}".format(
+                train_score, valid_score, test_score
+            )
+            if ckp_epoch is not None and isinstance(results_dict, dict):
+                results_dict["train"][ckp_epoch] = train_score
+                results_dict["valid"][ckp_epoch] = valid_score
+                results_dict["test"][ckp_epoch] = test_score
+            return dict(train=train_score, valid=valid_score, test=test_score), xstr
+
+        # Pre-fetch the potential checkpoint and resume from it if it exists.
+        ckp_path = os.path.join(save_path, "{:}.pth".format(self.__class__.__name__))
+        if os.path.exists(ckp_path):
+            ckp_data = torch.load(ckp_path)
+            stop_steps = ckp_data["stop_steps"]
+            best_score, best_epoch = ckp_data["best_score"], ckp_data["best_epoch"]
+            start_epoch, best_param = ckp_data["start_epoch"], ckp_data["best_param"]
+            results_dict = ckp_data["results_dict"]
+            self.model.load_state_dict(ckp_data["net_state_dict"])
+            self.train_optimizer.load_state_dict(ckp_data["opt_state_dict"])
+            self.logger.info("Resume from existing checkpoint: {:}".format(ckp_path))
+        else:
+            stop_steps, best_score, best_epoch = 0, -np.inf, -1
+            start_epoch = 0
+            results_dict = dict(train=OrderedDict(), valid=OrderedDict(), test=OrderedDict())
+            _, eval_str = _internal_test(-1, results_dict)
+            self.logger.info("Training from scratch, metrics@start: {:}".format(eval_str))
+
+        for iepoch in range(start_epoch, self.opt_config["epochs"]):
+            self.logger.info(
+                "Epoch={:03d}/{:03d} ::==>> Best valid @{:03d} ({:.6f})".format(
+                    iepoch, self.opt_config["epochs"], best_epoch, best_score
+                )
+            )
+            train_loss, train_score = self.train_or_test_epoch(
+                train_loader, self.model, self.loss_fn, self.metric_fn, True, self.train_optimizer
+            )
             self.logger.info("Training :: loss={:.6f}, score={:.6f}".format(train_loss, train_score))
 
-            eval_score_dict, eval_str = _internal_test()
+            current_eval_scores, eval_str = _internal_test(iepoch, results_dict)
             self.logger.info("Evaluating :: {:}".format(eval_str))
 
-            evals_result["train"].append(eval_score_dict["train"])
-            evals_result["valid"].append(eval_score_dict["valid"])
-            if eval_score_dict["valid"] > best_score:
-                stop_steps, best_epoch, best_score = 0, iepoch, eval_score_dict["valid"]
+            if current_eval_scores["valid"] > best_score:
+                stop_steps, best_epoch, best_score = 0, iepoch, current_eval_scores["valid"]
                 best_param = copy.deepcopy(self.model.state_dict())
             else:
                 stop_steps += 1
                 if stop_steps >= self.opt_config["early_stop"]:
                     self.logger.info("early stop at {:}-th epoch, where the best is @{:}".format(iepoch, best_epoch))
                     break
-
+            # Save a resumable checkpoint at the end of every epoch.
+            save_info = dict(
+                net_config=self.net_config,
+                opt_config=self.opt_config,
+                net_state_dict=self.model.state_dict(),
+                opt_state_dict=self.train_optimizer.state_dict(),
+                best_param=best_param,
+                stop_steps=stop_steps,
+                best_score=best_score,
+                best_epoch=best_epoch,
+                results_dict=results_dict,
+                start_epoch=iepoch + 1,
+            )
+            torch.save(save_info, ckp_path)
         self.logger.info("The best score: {:.6f} @ {:02d}-th epoch".format(best_score, best_epoch))
         self.model.load_state_dict(best_param)
-        torch.save(best_param, save_path)
 
         if self.use_gpu:
             torch.cuda.empty_cache()
         self.fitted = True
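The resume branch above (filled in here in place of the original `pdb.set_trace()` placeholder, using exactly the fields written into `save_info`) bundles weights, optimizer state, loop counters, and the running scores into one dict. A self-contained sketch of that save/resume round trip; the tiny `nn.Linear` net and the temp-dir path are illustrative only:

import os
import tempfile

import torch
import torch.nn as nn

net = nn.Linear(4, 1)
opt = torch.optim.Adam(net.parameters(), lr=1e-3)
ckp_path = os.path.join(tempfile.gettempdir(), "QuantTransformer.pth")

# Save: bundle everything needed to continue training into one file.
save_info = dict(
    net_state_dict=net.state_dict(),
    opt_state_dict=opt.state_dict(),
    best_score=-0.12,
    best_epoch=3,
    start_epoch=4,
)
torch.save(save_info, ckp_path)

# Resume: restore the states and pick up the epoch loop where it stopped.
ckp_data = torch.load(ckp_path)
net.load_state_dict(ckp_data["net_state_dict"])
opt.load_state_dict(ckp_data["opt_state_dict"])
print("resume at epoch {:}, best score {:}".format(ckp_data["start_epoch"], ckp_data["best_score"]))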
 
     def predict(self, dataset):
         if not self.fitted:
-            raise ValueError("model is not fitted yet!")
-
+            raise ValueError("The model is not fitted yet!")
         x_test = dataset.prepare("test", col_set="feature")
         index = x_test.index
+        self.model.eval()
         x_values = x_test.values
-        sample_num = x_values.shape[0]
+        sample_num, batch_size = x_values.shape[0], self.opt_config["batch_size"]
         preds = []
-        for begin in range(sample_num)[:: self.batch_size]:
-            if sample_num - begin < self.batch_size:
+        for begin in range(sample_num)[::batch_size]:
+            if sample_num - begin < batch_size:
                 end = sample_num
             else:
-                end = begin + self.batch_size
+                end = begin + batch_size
             x_batch = torch.from_numpy(x_values[begin:end]).float().to(self.device)
             with torch.no_grad():
-                if self.use_gpu:
-                    pred = self.model(x_batch).detach().cpu().numpy()
-                else:
-                    pred = self.model(x_batch).detach().numpy()
-
+                pred = self.model(x_batch).detach().cpu().numpy()
             preds.append(pred)
         return pd.Series(np.concatenate(preds), index=index)
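Finally, the rewritten `predict` always routes through `.cpu()`, which is a no-op on CPU tensors, so the old `use_gpu` branch is unnecessary. A runnable sketch of the same batched-inference pattern, with a hypothetical `nn.Linear` standing in for `self.model`:

import numpy as np
import pandas as pd
import torch
import torch.nn as nn

model = nn.Linear(4, 1)
model.eval()
x_values = np.random.randn(10, 4).astype("float32")
index = pd.RangeIndex(len(x_values))
batch_size = 4

preds = []
# range(...)[::batch_size] yields the start offset of every batch: 0, 4, 8.
for begin in range(len(x_values))[::batch_size]:
    end = min(begin + batch_size, len(x_values))
    x_batch = torch.from_numpy(x_values[begin:end])
    with torch.no_grad():
        # .cpu() is a no-op here, so one code path covers CPU and GPU runs;
        # squeeze(-1) collapses the (n, 1) output to a flat vector.
        pred = model(x_batch).squeeze(-1).detach().cpu().numpy()
    preds.append(pred)
print(pd.Series(np.concatenate(preds), index=index))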