Module trainer

import json
import math
import os
import shutil
import sys
from typing import Type, Dict

import torch
import transformers

try:
    from torch.utils.tensorboard import SummaryWriter
except ImportError:
    from tensorboardX import SummaryWriter
from torch import nn
from torch.utils.data import DataLoader
from torch.utils import data
from torch.optim.optimizer import Optimizer
from tqdm import trange, tqdm

from dateutil.relativedelta import relativedelta

import random
import numpy as np
import logging
from model import EmptyHeads

logging.basicConfig(
    format=logging.BASIC_FORMAT,
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

from datetime import datetime

try:
    import wandb

    wandb.ensure_configured()
    if wandb.api.api_key is None:
        _has_wandb = False
        wandb.termwarn("W&B installed but not logged in.  Run `wandb login` or set the WANDB_API_KEY env variable.")
    else:
        _has_wandb = False if os.getenv("WANDB_DISABLED") else True
except ImportError:
    _has_wandb = False


def set_seed(seed, n_gpu):
    logger.info(f"   see seed for random, numpy and torch {seed}")
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if n_gpu > 0:
        torch.cuda.manual_seed_all(seed)


def print_model_state_dict(model):
    for param_tensor in model.state_dict():
        logger.info(f"{param_tensor}\t{model.state_dict()[param_tensor].size()}")


def print_optimizer_state_dict(optimizer):
    for var_name in optimizer.state_dict():
        logger.info(f"{var_name}\t{optimizer.state_dict()[var_name]}")


def count_params(model: torch.nn.Module, print_details: bool = False):
    trainable_count = 0
    total_count = 0
    if isinstance(model, torch.nn.Sequential):
        for index in model._modules:
            if print_details:
                print_model_state_dict(model._modules[index])
                logger.info(model._modules[index])
            trainable_count += sum(p.numel() for p in model._modules[index].parameters() if p.requires_grad)
            total_count += sum(p.numel() for p in model._modules[index].parameters())
    else:
        if print_details:
            print_model_state_dict(model)
            logger.info(model)
        total_count = sum(p.numel() for p in model.parameters())
        trainable_count = sum(p.numel() for p in model.parameters() if p.requires_grad)
    logger.info(f'  Total params: {total_count}')
    logger.info(f'  Trainable params: {trainable_count}')
    logger.info(f'  Non-trainable params: {total_count - trainable_count}')


def batch_to_device(batch, device, keep_label=False):
    features = batch['features']
    if isinstance(features, dict):
        for feature_name in features:
            features[feature_name] = features[feature_name].to(device)
    else:
        for inx in range(len(features)):
            for feature_name in features[inx]:
                features[inx][feature_name] = features[inx][feature_name].to(device)

    label_space = batch['labels']
    if label_space is None:  # for tasks like LM, labels are None
        return features, None
    if not keep_label:
        labels = {"label_space_" + str(inx): label_space[inx].to(device) if torch.is_tensor(label_space[inx])
                  else label_space[inx] for inx in range(len(label_space))}
    else:
        labels = label_space
    return features, labels


def is_wandb_available():
    return _has_wandb


class CollateFunction():
    def __init__(self, up_model):
        self.up_model = up_model

    def __call__(self, batch):
        if isinstance(batch[0], dict):
            padded_features = self.up_model.padding_features(batch)
            return {'features': padded_features,
                    "labels": None}  # label_ids are already in features; this task needs no separate labels, so labels is set to None


class ModelTrainer():
    def __init__(self, up_model: nn.Module, down_layer: nn.Module = None, train_dataset=None,
                 dev_dataset=None, dev_evaluator=None,
                 epochs: int = 1,
                 visiable_device: str = "0",
                 scheduler: str = 'warmuplinear',
                 warmup_ratio: float = 0.1,
                 optimizer_class: Type[Optimizer] = transformers.AdamW,
                 optimizer_params: Dict[str, object] = {'lr': 5e-5, 'eps': 1e-6, 'correct_bias': False},
                 weight_decay: float = 0.01,
                 early_stop: int = 20,
                 # 20 evaluation steps without improving on the early_stop_on metric as specified in dev_evaluator
                 evaluation_steps: int = 500,
                 output_path: str = None,
                 save_best_model: bool = True,
                 max_grad_norm: float = 1,
                 fp16: bool = False,
                 accumulation_steps=1,
                 fp16_opt_level: str = 'O1',
                 seed: int = 122,
                 data_loader_shuffle=True,
                 device: str = None,
                 dev_batch_size: int = -1,  # the same as train_batch_size
                 n_gpu: int = None,
                 report_model: bool = True,
                 per_gpu_train_batch_size: int = 8,
                 restore_training: bool = False,
                 local_rank: int = -1,
                 wandb_config=None):
        """
        This trainer is written for training a sequential model that contains an upstream layer (usually a transformer)
        and a downstream layer (usually a task-specific head such as a FF, RNN or CNN that encodes the output of the upstream layer).

        :param up_model: a transformer such as transformers.GPT2LMHeadModel or transformers.BertModel
        :param down_layer: None if up_model already wraps an output encoder (such as the LM head in GPT2LMHeadModel), else an nn.Module that encodes the output of up_model
        :param train_dataset: training dataset, either an instance of torch.utils.data.Dataset or IterableDataset (defined in data.py)
        :param dev_dataset: development dataset, either an instance of torch.utils.data.Dataset or IterableDataset
        :param dev_evaluator: evaluator on dev_dataset, used for early stopping and performance tracking during training (defined in evaluate.py)
        :param epochs: number of training epochs
        :param visiable_device: comma-separated GPU ids used for training, e.g. "0,1"
        :param scheduler: learning-rate scheduler from transformers; see the options in self._get_scheduler
        :param warmup_ratio: ratio of total training steps used for learning-rate warmup
        :param optimizer_class: optimizer class, transformers.AdamW by default
        :param optimizer_params: optimizer parameters
        :param weight_decay: weight decay
        :param early_stop: number of evaluation steps without improvement on the early_stop_on metric before training is stopped
        :param evaluation_steps: evaluate (and log) every this many optimization steps
        :param output_path: path for saving the checkpoint with the best performance on the early_stop_on metric specified in the dev_evaluator instance
        :param save_best_model: whether to save the best checkpoint rather than the latest one
        :param max_grad_norm: max gradient norm for clipping
        :param fp16: whether to use fp16 training
        :param accumulation_steps: gradient accumulation steps
        :param fp16_opt_level: fp16 opt level
        :param seed: random seed for reproducibility
        :param data_loader_shuffle: whether to shuffle the training and dev data loaders
        :param device: device for training; None or "cuda" for GPU training, "cpu" for CPU training
        :param dev_batch_size: development batch size, usually larger than the training batch size since no gradients are computed and memory pressure is lower
        :param n_gpu: number of GPUs for training
        :param report_model: whether to log the model's structure and number of trainable parameters
        :param per_gpu_train_batch_size: training batch size per GPU
        :param restore_training: whether to resume training from the checkpoint in output_path after an interrupted run
        :param local_rank: local rank for distributed training
        :param wandb_config: config for wandb logging if not None; otherwise wandb logging is disabled
        """

        self.up_model = up_model
        if down_layer is None:
            # In this example, the upstream layer already integrates the downstream head (namely, the simple LM head in transformers.GPT2LMHeadModel),
            # so EmptyHeads is created here only as a placeholder
            down_layer = EmptyHeads()

        self.down_layer = down_layer
        assert output_path is not None
        output_path = os.path.join("tmp", output_path)
        # os.makedirs(output_path,exist_ok=True)
        if restore_training:
            if not os.listdir(output_path):
                raise ValueError(f"no checkpoint found in {output_path}")
            else:
                logger.info("   loading embedding weights from saved checkpoint")
                self.up_model = self.up_model.reload(
                    output_path)  # for other transformers (apart from bert), the load_saved function has not been added

                logger.info("   loading downstream weights from saved checkpoint")
                self.down_layer.load_saved(output_path)
                with open(output_path + "/ck_report.json") as f:
                    self.ck_report = json.load(f)

        self.model = torch.nn.Sequential(self.up_model, self.down_layer)

        if is_wandb_available() and wandb_config is not None:
            # keep track of model topology and gradients when wandb is available and wandb_config is not None
            wandb.init(project=wandb_config.wandb_project_name, config=wandb_config, name=wandb_config.wandb_run_name)
            wandb.watch(
                (self.up_model, self.down_layer), log_freq=max(100, evaluation_steps)
            )
        self.wandb_config = wandb_config

        self._restore_training = restore_training
        self.early_stop = early_stop

        self._dev_evaluator = dev_evaluator

        self._evaluation_steps = evaluation_steps
        self._save_best_model = save_best_model
        self._max_grad_norm = max_grad_norm

        os.makedirs(output_path, exist_ok=True)
        if os.listdir(output_path) and not restore_training:
            out = input(
                "Output directory ({}) already exists and is not empty. Do you want to remove it before starting? (y/n)".format(
                    output_path))
            if out == "y":
                shutil.rmtree(output_path)
                os.makedirs(output_path, exist_ok=True)
            else:
                raise ValueError("Output directory ({}) already exists and is not empty".format(
                    output_path))

        logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        fileHandler = logging.FileHandler(os.path.join(output_path, "log.out"), mode="a")
        fileHandler.setFormatter(logFormatter)
        logger.addHandler(fileHandler)
        self._dev_evaluator.reset_logger(output_path)

        self.output_path = output_path

        if device is None or device == "cuda":
            if torch.cuda.is_available():
                device = torch.device("cuda")
                n_gpu = 1 if n_gpu == 1 else torch.cuda.device_count()
            else:
                logger.warning("no cuda is found in your machine, now use cpu")
                device = torch.device("cpu")
                n_gpu = 0
        elif device == "cpu":
            device = torch.device("cpu")
            n_gpu = 0
        else:
            raise ValueError("set device to be None, cuda or cpu")
        assert n_gpu <= torch.cuda.device_count()

        logger.info("Use pytorch device: {}, with gpu_number={}".format(device, n_gpu))

        self._train_batch_size = per_gpu_train_batch_size * max(1, n_gpu)
        self._dev_batch_size = dev_batch_size if dev_batch_size != -1 else self._train_batch_size

        if isinstance(train_dataset, data.IterableDataset):
            self._train_dataloader = DataLoader(train_dataset, batch_size=None)
            self._steps_per_epoch = len(self._train_dataloader.dataset)
        else:
            self._train_dataloader = DataLoader(train_dataset, shuffle=data_loader_shuffle,
                                                batch_size=self._train_batch_size)
            self._steps_per_epoch = len(self._train_dataloader)

        if isinstance(dev_dataset, data.IterableDataset):
            dev_dataloader = DataLoader(dev_dataset, batch_size=None)
        else:
            dev_dataloader = DataLoader(dev_dataset, shuffle=data_loader_shuffle, batch_size=self._dev_batch_size)

        if accumulation_steps > 1:
            self._steps_per_epoch = self._steps_per_epoch // accumulation_steps

        self._dev_data = dev_dataset
        self._dev_evaluator.reset_dataloader(dev_dataloader)

        self.collate_fn = CollateFunction(self.up_model)
        # Use customize batching
        self._train_dataloader.collate_fn = self.collate_fn

        self._train_data = train_dataset
        self._per_gpu_train_batch_size = per_gpu_train_batch_size

        set_seed(seed, n_gpu)

        if n_gpu > 1:
            self.model = torch.nn.DataParallel(self.model, device_ids=[int(i) for i in visiable_device.split(',')])
            self.model = self.model.to(f'cuda:{self.model.device_ids[0]}')

        elif n_gpu == 1:
            self.model = self.model.to(device)

        self._device = device
        self._n_gpu = n_gpu

        self._total_train_steps = int(self._steps_per_epoch * epochs)
        self._epochs = epochs

        if report_model:
            count_params(self.model, print_details=True)

        param_optimizer = list(self.model.named_parameters())

        no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
        optimizer_grouped_parameters = [
            {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],
             'weight_decay': weight_decay},
            {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
        ]
        if local_rank != -1:
            self._total_train_steps = self._total_train_steps // torch.distributed.get_world_size()

        self._optimizer = optimizer_class(optimizer_grouped_parameters, **optimizer_params)

        warmup_steps = math.ceil(self._total_train_steps * warmup_ratio)  # by default, 10% of the total training steps are used for warm-up
        logger.info(f"   Warmup-steps: {warmup_steps}")

        self._scheduler = self._get_scheduler(self._optimizer, scheduler=scheduler, warmup_steps=warmup_steps,
                                              num_total=self._total_train_steps)

        if fp16:
            try:
                from apex import amp
            except ImportError:
                raise ImportError("Please install apex from https://www.github.com/nvidia/apex to use fp16 training.")

            model, optimizer = amp.initialize(self.model, self._optimizer, opt_level=fp16_opt_level)
            self.model = model
            self._optimizer = optimizer

        self._fp16 = fp16
        tb_writer = None
        if local_rank in [-1, 0]:
            tb_writer = SummaryWriter()
        self._tb_writer = tb_writer
        self._local_rank = local_rank
        self._best_score = -float("inf")
        self._early_stop_count = 0
        self.last_time = datetime.now()
        self.accumulation_steps = accumulation_steps
        # assert evaluation_steps % accumulation_steps == 0, "evaluation_steps should be divisible by accumulation_steps"

    def _train_epoch(self, epoch: int, global_steps: int):
        epoch_steps = 0
        epoch_loss = 0.0

        self.model.zero_grad()
        for step, data in enumerate(
                tqdm(self._train_dataloader, desc="training", total=self._steps_per_epoch * self.accumulation_steps)):

            self.model.train()
            if data["labels"] != "skip-device":
                input, labels = batch_to_device(data, self._device)
                # add labels to input for training; this step is skipped at inference
                if isinstance(labels, dict):
                    for idx in range(len(input)):
                        input[idx].update(labels)
            else:
                input = data["features"]
            loss_value, _ = self.model(input)

            if self._n_gpu > 1:
                loss_value = loss_value.mean()
            if self.accumulation_steps > 1:
                loss_value = loss_value / self.accumulation_steps

            if self._fp16:
                try:
                    from apex import amp
                except ImportError:
                    raise ImportError(
                        "Please install apex from https://www.github.com/nvidia/apex to use fp16 training.")
                with amp.scale_loss(loss_value, self._optimizer) as scaled_loss:
                    scaled_loss.backward()
                torch.nn.utils.clip_grad_norm_(amp.master_params(self._optimizer), self._max_grad_norm)
            else:
                loss_value.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), self._max_grad_norm)
            epoch_loss += loss_value

            if (step + 1) % self.accumulation_steps == 0:

                self._optimizer.step()
                self._scheduler.step()
                self.model.zero_grad()

                epoch_steps += 1
                total_global = epoch_steps + global_steps

                if self._evaluation_steps > 0 and (total_global) % self._evaluation_steps == 0:
                    dev_loss, eval_scores = self._dev_eval_in_training(epoch, epoch_steps)
                    logger.info("   ***** Evaluation report *****")
                    logger.info(f"  Output path (short): {self.output_path}")
                    logger.info(f"  Early stop on: {self._dev_evaluator.early_stop_on}")
                    logger.info(f"  Early stop count = {self._early_stop_count}/{self.early_stop}")
                    logger.info(
                        f"  Eval steps = {self._evaluation_steps} or (iterations = {self._evaluation_steps * self.accumulation_steps})")
                    logger.info(f"  Best score ({self._dev_evaluator.early_stop_on}) = {self._best_score}")
                    logger.info(f"  Gradient Accumulation steps = {self.accumulation_steps}")

                    logger.info(
                        f"  Num of training examples (actually no. of iterations per epoch for Iterable Dataset)  = {len(self._train_data)}")
                    logger.info(
                        f"  Num of development examples (actually no. of iterations per epoch for Iterable Dataset) = {len(self._dev_data)}")
                    now_time = datetime.now()
                    logger.info(f"  Time spent since last evaluation = {self.time_diff(self.last_time, now_time)}")
                    self.last_time = now_time

                    logger.info(f"  Epoch = {epoch + 1}/{self._epochs}")
                    logger.info(f"  Steps = {total_global}/{self._total_train_steps}")
                    logger.info(
                        f"  Instantaneous batch size per GPU = {self._per_gpu_train_batch_size} and n_gpu = {self._n_gpu} so the input batch size = {self._train_batch_size}")
                    if dev_loss != None:
                        logger.info(f"  dev_loss = {dev_loss:.6f}\t||\t dev_eval_scores = {eval_scores}")
                    else:
                        logger.info(f"  dev_eval_scores = {eval_scores}")

                    train_loss = epoch_loss / epoch_steps
                    logger.info(f"  train_loss = {train_loss}")
                    logger.info("\n********************************************")

                    if is_wandb_available() and self.wandb_config != None:
                        if dev_loss != None:
                            wandb.log(
                                {"loss_dev": dev_loss,
                                 f"best_score_for_{self._dev_evaluator.early_stop_on}": self._best_score,
                                 "loss_train": train_loss, "lr": self._scheduler.get_lr()[0]},
                                step=total_global)
                        else:
                            wandb.log({"loss_train": train_loss,
                                       f"best_score_for_{self._dev_evaluator.early_stop_on}": self._best_score,
                                       "lr": self._scheduler.get_lr()[0]},
                                      step=total_global)

                    for key, value in eval_scores.items():
                        if is_wandb_available() and self.wandb_config != None:
                            wandb.log({f"eval_{key}_dev": value}, step=total_global)
                        self._tb_writer.add_scalar(f"eval_{key}_dev", value, total_global)

                    self._tb_writer.add_scalar("lr", self._scheduler.get_lr()[0], total_global)
                    if dev_loss != None:
                        self._tb_writer.add_scalar("loss_dev", dev_loss, total_global)

                    self._tb_writer.add_scalar("loss_train", train_loss, total_global)

                    if self._early_stop_count >= self.early_stop:
                        logger.info(
                            f"  {self.early_stop} consecutive evaluations without improvement on {self._dev_evaluator.early_stop_on}, so training is stopped early...")
                        sys.exit(0)

        return epoch_loss, epoch_steps

    def train(self):
        if self._restore_training:
            logger.info(f"***** restoring training from the previous checkpoint: {self.ck_report}*****")
        else:
            logger.info("***** Running training *****")
        logger.info(
            f"  Num of training examples (actually iterations per epoch for Iterable Dataset) = {len(self._train_data)}")
        logger.info(f"  Output path (short): {self.output_path}")
        logger.info(
            f"  Steps per Epoch = {self._steps_per_epoch} or iterations per epoch = {self._steps_per_epoch * self.accumulation_steps}")
        logger.info(f"  Num of Epochs = {self._epochs}")
        logger.info(f"  Best score ({self._dev_evaluator.early_stop_on}) = {self._best_score}")
        logger.info(
            f"  Eval every {self._evaluation_steps} steps or every {self._evaluation_steps * self.accumulation_steps} iterations")
        logger.info(f"  Early stop = {self.early_stop}")
        logger.info(f"  Gradient Accumulation steps = {self.accumulation_steps}")

        logger.info(f"  Total optimization steps = {self._total_train_steps}")
        logger.info(
            f"  Instantaneous batch size per GPU = {self._per_gpu_train_batch_size} and n_gpu = {self._n_gpu} so the input batch size = {self._train_batch_size}")
        global_loss = 0.0
        global_steps = 0
        self.last_time = datetime.now()
        for epoch in trange(self._epochs, desc="Epoch"):
            epoch_loss, epoch_steps = self._train_epoch(epoch, global_steps)
            global_loss += epoch_loss
            global_steps += epoch_steps
            logger.info(f"epoch {epoch + 1} ends, {self._epochs - epoch - 1} epoches left")
            logger.info(
                f"\nglobal_average_loss={global_loss / global_steps},global_steps={global_steps} on training set")

        if self._local_rank in [-1, 0]:
            self._tb_writer.close()

    def _dev_eval_in_training(self, epoch, steps):
        return_scores = {}
        if self._dev_evaluator is not None:

            return_scores = self._dev_evaluator(self.model, self.collate_fn,
                                                output_path=self.output_path, epoch=epoch, steps=steps)

            early_stop_on = self._dev_evaluator.early_stop_on

            check_score = -return_scores[early_stop_on] if early_stop_on in ("loss", "perplexity") \
                else return_scores[early_stop_on]
            if check_score >= self._best_score and self._save_best_model:
                eval_scores_transformed = {
                    key: return_scores[key].item() if torch.is_tensor(return_scores[key]) else return_scores[key]
                    for key in return_scores.keys()}
                self.save(self.output_path,
                          {"training_examples (when pos_num=1 for ranking)": len(self._train_data),
                           "evaluation_steps": self._evaluation_steps,
                           "train_batch_size": self._train_batch_size, "epoch": epoch + 1, "total_epochs": self._epochs,
                           "steps": steps,
                           "saved_at_total_steps": steps + epoch * self._steps_per_epoch,
                           "steps_per_epoch": self._steps_per_epoch, "eval_scores_on_dev": eval_scores_transformed})

                self._best_score = check_score

                logger.info(f"  Save check-point at epoch={epoch} step={steps}")
                self._early_stop_count = 0
            else:
                self._early_stop_count += 1

        return return_scores.pop("loss").item() if "loss" in return_scores else None, return_scores

    def save(self, path, eval_details):
        if path is None:
            return
        logger.info(f"   Save model to {path}")
        contained_modules = []

        to_iterate = self.model.module._modules if self._n_gpu > 1 else self.model._modules

        for idx, name in enumerate(to_iterate):
            module = to_iterate[str(name)]

            model_path = os.path.join(path, str(idx) + "_" + type(module).__name__)
            os.makedirs(model_path, exist_ok=True)
            module.save(model_path)
            contained_modules.append(
                {'idx': idx, 'name': name, 'path': os.path.basename(model_path), 'type': type(module).__module__})

        if self.wandb_config != None:
            with open(os.path.join(path, 'hyperparams.json'), 'w') as f:
                json.dump(self.wandb_config.__dict__, f, indent=2)

        with open(os.path.join(path, 'modules.json'), 'w') as fOut:
            json.dump(contained_modules, fOut, indent=2)
        with open(os.path.join(path, 'ck_report.json'), 'w') as fOut:
            json.dump(eval_details, fOut, indent=2)

    def _get_scheduler(self, optimizer, scheduler: str, warmup_steps: int, num_total: int):
        assert scheduler in ["constantlr", "warmuplinear", "warmupconstant", "warmupcosine",
                             "warmupcosinewithhardrestarts"], (
            'scheduler should be one of ["constantlr", "warmuplinear", "warmupconstant", "warmupcosine", "warmupcosinewithhardrestarts"]')
        if scheduler == 'constantlr':
            return transformers.get_constant_schedule(optimizer)
        elif scheduler == 'warmupconstant':
            return transformers.get_constant_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps)
        elif scheduler == 'warmuplinear':
            return transformers.get_linear_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps,
                                                                num_training_steps=num_total)
        elif scheduler == 'warmupcosine':
            return transformers.get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps,
                                                                num_training_steps=num_total)
        elif scheduler == 'warmupcosinewithhardrestarts':
            return transformers.get_cosine_with_hard_restarts_schedule_with_warmup(optimizer,
                                                                                   num_warmup_steps=warmup_steps,
                                                                                   num_training_steps=num_total)

    def time_diff(self, t_a, t_b):
        t_diff = relativedelta(t_b, t_a)  # later/end time comes first!
        return '{h}h {m}m {s}s'.format(h=t_diff.hours, m=t_diff.minutes, s=t_diff.seconds)

Functions

def batch_to_device(batch, device, keep_label=False)
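
A minimal sketch of the batch layout this helper expects (the feature and label tensor names below are illustrative, not fixed by the module):

from trainer import batch_to_device
import torch

# features may be a single dict of tensors or a list of such dicts;
# labels may be None (e.g. LM tasks, where label_ids live inside features) or a list
batch = {
    'features': {'input_ids': torch.zeros(2, 16, dtype=torch.long),
                 'attention_mask': torch.ones(2, 16, dtype=torch.long)},
    'labels': [torch.tensor([0, 1])],
}
features, labels = batch_to_device(batch, torch.device('cpu'))
# tensors in features are now on the target device and
# labels == {'label_space_0': tensor([0, 1])}
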
def count_params(model: torch.nn.modules.module.Module, print_details: bool = False)
def is_wandb_available()
def print_model_state_dict(model)
def print_optimizer_state_dict(optimizer)
def set_seed(seed, n_gpu)

Classes

class CollateFunction (up_model)
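
A short sketch of attaching the collate function to a DataLoader, as ModelTrainer does internally (up_model and train_dataset are placeholders; up_model is assumed to implement padding_features as in the source above):

from torch.utils.data import DataLoader
from trainer import CollateFunction, batch_to_device
import torch

up_model = ...       # placeholder: any module exposing padding_features(batch)
train_dataset = ...  # placeholder: a torch Dataset whose items are feature dicts

collate_fn = CollateFunction(up_model)
loader = DataLoader(train_dataset, batch_size=8, collate_fn=collate_fn)
for batch in loader:
    # each batch is {'features': <padded features>, 'labels': None}
    features, _ = batch_to_device(batch, torch.device('cpu'))
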
class ModelTrainer (up_model: torch.nn.modules.module.Module, down_layer: torch.nn.modules.module.Module = None, train_dataset=None, dev_dataset=None, dev_evaluator=None, epochs: int = 1, visiable_device: str = '0', scheduler: str = 'warmuplinear', warmup_ratio: float = 0.1, optimizer_class: Type[torch.optim.optimizer.Optimizer] = transformers.optimization.AdamW, optimizer_params: Dict[str, object] = {'lr': 5e-05, 'eps': 1e-06, 'correct_bias': False}, weight_decay: float = 0.01, early_stop: int = 20, evaluation_steps: int = 500, output_path: str = None, save_best_model: bool = True, max_grad_norm: float = 1, fp16: bool = False, accumulation_steps=1, fp16_opt_level: str = 'O1', seed: int = 122, data_loader_shuffle=True, device: str = None, dev_batch_size: int = -1, n_gpu: int = None, report_model: bool = True, per_gpu_train_batch_size: int = 8, restore_training: bool = False, local_rank: int = -1, wandb_config=None)

This trainer is written for training a sequential model that contains an upstream_layer (usually a transformer) and a downstream_layer (usually a task-specific head like a FF, RNN or CNN that encodes the output of the upstream_layer).
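
A minimal usage sketch under stated assumptions: up_model is a wrapper exposing the reload/padding_features hooks used in the source above, the datasets come from data.py and the evaluator from evaluate.py; every object assigned `...` below is a placeholder, not a name shipped with this module.

from trainer import ModelTrainer

up_model = ...       # e.g. a GPT-2 LM wrapper exposing reload()/padding_features() as used above
train_dataset = ...  # torch Dataset or IterableDataset from data.py
dev_dataset = ...
dev_evaluator = ...  # evaluator from evaluate.py (provides early_stop_on, reset_logger, reset_dataloader)

trainer = ModelTrainer(
    up_model=up_model,
    down_layer=None,              # None -> the EmptyHeads placeholder is used
    train_dataset=train_dataset,
    dev_dataset=dev_dataset,
    dev_evaluator=dev_evaluator,
    epochs=3,
    evaluation_steps=500,
    per_gpu_train_batch_size=8,
    output_path="gpt2-finetune",  # checkpoints are written under tmp/gpt2-finetune
)
trainer.train()
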


Methods

def save(self, path, eval_details)
def time_diff(self, t_a, t_b)
def train(self)