From fd136b02d19ea7c046b04e8aa8ae65805efb0579 Mon Sep 17 00:00:00 2001
From: Xueqing Liu
Date: Tue, 23 Nov 2021 14:26:39 -0500
Subject: [PATCH] bug fix for TransformerEstimator (#293)

* fix checkpoint naming + trial id for non-ray mode, fix the bug in running test mode, delete all the checkpoints in non-ray mode

* finished testing for checkpoint naming, delete checkpoint, ray, max iter = 1

* adding predict_proba, address PR 293's comments

close #293 #291
---
 .github/workflows/python-package.yml |   2 +
 docs/index.rst                       |  13 --
 flaml/automl.py                      |  27 +++-
 flaml/data.py                        |  11 +-
 flaml/model.py                       | 183 ++++++++++++++++++++-------
 flaml/nlp/__init__.py                |   1 -
 flaml/nlp/utils.py                   |  79 +++++++++++-
 test/nlp/test_autohf.py              |  11 +-
 test/nlp/test_autohf_maxiter1.py     |  63 +++++++++
 test/nlp/test_autohf_regression.py   |   9 +-
 10 files changed, 316 insertions(+), 83 deletions(-)
 delete mode 100644 flaml/nlp/__init__.py
 create mode 100644 test/nlp/test_autohf_maxiter1.py

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 499c11499..0aa22130a 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -39,6 +39,8 @@ jobs:
     - name: Install packages and dependencies
       run: |
         python -m pip install --upgrade pip wheel
+        pip install -e .
+        python -c "import flaml"
         pip install -e .[test]
     - name: If linux or mac, install ray
       if: (matrix.os == 'macOS-latest' || matrix.os == 'ubuntu-latest') && matrix.python-version != '3.9'
diff --git a/docs/index.rst b/docs/index.rst
index e6acdb9f5..1cc75f09a 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -43,16 +43,3 @@ Online AutoML

 .. autoclass:: flaml.AutoVW
     :members:
-
-NLP
----
-
-.. autoclass:: flaml.nlp.HPOArgs
-    :members:
-
-.. Indices and tables
-.. ==================
-
-.. * :ref:`genindex`
-.. * :ref:`modindex`
-.. * :ref:`search`
diff --git a/flaml/automl.py b/flaml/automl.py
index 3d48575df..7d80d9d37 100644
--- a/flaml/automl.py
+++ b/flaml/automl.py
@@ -22,7 +22,7 @@ import pandas as pd
 import logging
 from typing import List, Union
 from pandas import DataFrame
-from .nlp.utils import _is_nlp_task
+from .data import _is_nlp_task

 from .ml import (
     compute_estimator,
@@ -161,6 +161,8 @@ class SearchState:
                 self.trained_estimator.cleanup()
             if trained_estimator:
                 self.trained_estimator = trained_estimator
+        elif trained_estimator:
+            trained_estimator.cleanup()
         self.metric_for_logging = metric_for_logging
         self.val_loss, self.config = obj, config
@@ -349,6 +351,9 @@ class AutoMLState:
             estimator, train_time = result["estimator"], result["train_time"]
         else:
+            if _is_nlp_task(self.task):
+                use_ray = self.fit_kwargs.get("use_ray")
+                self.fit_kwargs["use_ray"] = False
             estimator, train_time = train_estimator(
                 X_train=sampled_X_train,
                 y_train=sampled_y_train,
@@ -360,6 +365,11 @@ class AutoMLState:
                 budget=budget,
                 fit_kwargs=self.fit_kwargs,
             )
+            if _is_nlp_task(self.task):
+                if use_ray:
+                    self.fit_kwargs["use_ray"] = use_ray
+                else:
+                    del self.fit_kwargs["use_ray"]
         if sampled_weight is not None:
             self.fit_kwargs["sample_weight"] = weight
         return estimator, train_time
@@ -753,10 +763,14 @@ class AutoML(BaseEstimator):
                 if isinstance(X[0], List):
                     X = [x for x in zip(*X)]
                 X = DataFrame(
-                    {
-                        self._transformer._str_columns[idx]: X[idx]
-                        for idx in range(len(X))
-                    }
+                    dict(
+                        [
+                            (self._transformer._str_columns[idx], X[idx])
+                            if isinstance(X[0], List)
+                            else (self._transformer._str_columns[idx], [X[idx]])
+                            for idx in range(len(X))
+                        ]
+                    )
                 )
             except IndexError:
                 raise IndexError(
@@ -1942,6 +1956,7 @@ class AutoML(BaseEstimator):

         if _is_nlp_task(self._state.task):
             self._state.fit_kwargs["metric"] = metric
+            self._state.fit_kwargs["use_ray"] = self._use_ray

         self._sample = (
             sample
@@ -2164,6 +2179,8 @@ class AutoML(BaseEstimator):
                 num_samples=self._max_iter,
                 verbose=max(self.verbose - 2, 0),
                 raise_on_failed_trial=False,
+                keep_checkpoints_num=1,
+                checkpoint_score_attr="min-val_loss",
             )
             # logger.info([trial.last_result for trial in analysis.trials])
             trials = sorted(
diff --git a/flaml/data.py b/flaml/data.py
index 270cd048d..e3d923b0b 100644
--- a/flaml/data.py
+++ b/flaml/data.py
@@ -22,6 +22,13 @@ TS_VALUE_COL = "y"
 FORECAST = "forecast"


+def _is_nlp_task(task):
+    if task in [SEQCLASSIFICATION, SEQREGRESSION]:
+        return True
+    else:
+        return False
+
+
 def load_openml_dataset(
     dataset_id, data_dir=None, random_state=0, dataset_format="dataframe"
 ):
@@ -225,8 +232,6 @@ class DataTransformer:
             X: Processed numpy array or pandas dataframe of training data.
             y: Processed numpy array or pandas series of labels.
         """
-        from .nlp.utils import _is_nlp_task
-
         if _is_nlp_task(task):
             # if the mode is NLP, check the type of input, each column must be either string or
             # ids (input ids, token type id, attention mask, etc.)
@@ -359,8 +364,6 @@ class DataTransformer:
         """
         X = X.copy()

-        from .nlp.utils import _is_nlp_task
-
         if _is_nlp_task(self._task):
             # if the mode is NLP, check the type of input, each column must be either string or
             # ids (input ids, token type id, attention mask, etc.)
diff --git a/flaml/model.py b/flaml/model.py
index d0867819f..42d66d5ec 100644
--- a/flaml/model.py
+++ b/flaml/model.py
@@ -15,6 +15,7 @@ from sklearn.linear_model import LogisticRegression
 from sklearn.dummy import DummyClassifier, DummyRegressor
 from scipy.sparse import issparse
 import logging
+import shutil
 from . import tune
 from .data import (
     group_counts,
@@ -26,6 +27,7 @@ from .data import (

 import pandas as pd
 from pandas import DataFrame, Series
+import sys

 try:
     import psutil
@@ -287,6 +289,9 @@ class TransformersEstimator(BaseEstimator):

     def __init__(self, task="seq-classification", **config):
         super().__init__(task, **config)
+        import uuid
+
+        self.trial_id = str(uuid.uuid1().hex)[:8]

     def _join(self, X_train, y_train):
         y_train = DataFrame(y_train, columns=["label"], index=X_train.index)
@@ -295,29 +300,32 @@ class TransformersEstimator(BaseEstimator):

     @classmethod
     def search_space(cls, **params):
-        import sys
-
         return {
             "learning_rate": {
                 "domain": tune.loguniform(lower=1e-6, upper=1e-3),
+                "init_value": 1e-5,
             },
             "num_train_epochs": {
                 "domain": tune.loguniform(lower=0.1, upper=10.0),
             },
             "per_device_train_batch_size": {
                 "domain": tune.choice([4, 8, 16, 32]),
+                "init_value": 32,
             },
             "warmup_ratio": {
                 "domain": tune.uniform(lower=0.0, upper=0.3),
+                "init_value": 0.0,
             },
             "weight_decay": {
                 "domain": tune.uniform(lower=0.0, upper=0.3),
+                "init_value": 0.0,
             },
             "adam_epsilon": {
                 "domain": tune.loguniform(lower=1e-8, upper=1e-6),
+                "init_value": 1e-6,
             },
-            "seed": {"domain": tune.choice(list(range(40, 45)))},
-            "global_max_steps": {"domain": sys.maxsize},
+            "seed": {"domain": tune.choice(list(range(40, 45))), "init_value": 42},
+            "global_max_steps": {"domain": sys.maxsize, "init_value": sys.maxsize},
         }

     def _init_hpo_args(self, automl_fit_kwargs: dict = None):
@@ -342,8 +350,20 @@ class TransformersEstimator(BaseEstimator):
         return X

     def fit(self, X_train: DataFrame, y_train: Series, budget=None, **kwargs):
-        # TODO: when self.param = {}, ie max_iter = 1, fix the bug
         from transformers import EarlyStoppingCallback
+        from transformers.trainer_utils import set_seed
+        from transformers import AutoTokenizer, TrainingArguments
+        import transformers
+        from datasets import Dataset
+        from .nlp.utils import (
+            get_num_labels,
+            separate_config,
+            load_model,
+            compute_checkpoint_freq,
+            get_trial_fold_name,
+            date_str,
+        )
+        from .nlp.huggingface.trainer import TrainerForAuto

         this_params = self.params
@@ -373,26 +393,17 @@ class TransformersEstimator(BaseEstimator):
             def on_epoch_end(self, args, state, control, **callback_kwargs):
                 if (
                     control.should_training_stop
-                    or state.epoch + 1 >= this_params["num_train_epochs"]
+                    or state.epoch + 1 >= args.num_train_epochs
                 ):
                     control.should_save = True
                     control.should_evaluate = True

-        import transformers
-        from transformers import TrainingArguments
-        from transformers.trainer_utils import set_seed
-        from transformers import AutoTokenizer
-        from .nlp.utils import (
-            separate_config,
-            load_model,
-            get_num_labels,
-            compute_checkpoint_freq,
-        )
-        from .nlp.huggingface.trainer import TrainerForAuto
-        from datasets import Dataset
+        set_seed(self.params.get("seed", TrainingArguments.seed))

         self._init_hpo_args(kwargs)
         self._metric_name = kwargs["metric"]
+        if hasattr(self, "use_ray") is False:
+            self.use_ray = kwargs["use_ray"]

         X_val = kwargs.get("X_val")
         y_val = kwargs.get("y_val")
@@ -408,7 +419,6 @@ class TransformersEstimator(BaseEstimator):
         tokenizer = AutoTokenizer.from_pretrained(
             self.custom_hpo_args.model_path, use_fast=True
         )
-        set_seed(self.params["seed"])

         num_labels = get_num_labels(self._task, y_train)

@@ -422,13 +432,31 @@ class TransformersEstimator(BaseEstimator):
         ckpt_freq = compute_checkpoint_freq(
             train_data_size=len(X_train),
             custom_hpo_args=self.custom_hpo_args,
-            num_train_epochs=self.params["num_train_epochs"],
-            batch_size=self.params["per_device_train_batch_size"],
+            num_train_epochs=training_args_config.get(
+                "num_train_epochs", TrainingArguments.num_train_epochs
+            ),
+            batch_size=training_args_config.get(
+                "per_device_train_batch_size",
+                TrainingArguments.per_device_train_batch_size,
+            ),
         )

+        local_dir = os.path.join(
+            self.custom_hpo_args.output_dir, "train_{}".format(date_str())
+        )
+
+        if not self.use_ray:
+            # if self.params = {}, don't include configuration in trial fold name
+            trial_dir = get_trial_fold_name(local_dir, self.params, self.trial_id)
+        else:
+            import ray
+
+            trial_dir = ray.tune.get_trial_dir()
+
         if transformers.__version__.startswith("3"):
             training_args = TrainingArguments(
-                output_dir=self.custom_hpo_args.output_dir,
+                report_to=[],
+                output_dir=trial_dir,
                 do_train=True,
                 do_eval=True,
                 eval_steps=ckpt_freq,
@@ -443,7 +471,8 @@ class TransformersEstimator(BaseEstimator):
             from transformers import IntervalStrategy

             training_args = TrainingArguments(
-                output_dir=self.custom_hpo_args.output_dir,
+                report_to=[],
+                output_dir=trial_dir,
                 do_train=True,
                 do_eval=True,
                 per_device_eval_batch_size=1,
@@ -464,7 +493,7 @@ class TransformersEstimator(BaseEstimator):
             per_model_config=per_model_config,
         )

-        trainer = TrainerForAuto(
+        self._model = TrainerForAuto(
             model=this_model,
             args=training_args,
             model_init=_model_init,
@@ -475,25 +504,45 @@ class TransformersEstimator(BaseEstimator):
             callbacks=[EarlyStoppingCallbackForAuto],
         )

-        trainer.train()
+        setattr(self._model, "_use_ray", self.use_ray)
+        self._model.train()

-        self.params[self.ITER_HP] = trainer.state.global_step
-        self._checkpoint_path = self._select_checkpoint(trainer)
+        self.params[self.ITER_HP] = self._model.state.global_step
+        self._checkpoint_path = self._select_checkpoint(self._model)

         self._kwargs = kwargs
         self._num_labels = num_labels
         self._per_model_config = per_model_config

+        self._ckpt_remains = list(self._model.ckpt_to_metric.keys())
+
+    def _delete_one_ckpt(self, ckpt_location):
+        if self.use_ray is False:
+            try:
+                shutil.rmtree(ckpt_location)
+            except FileNotFoundError:
+                logger.warning("checkpoint {} not found".format(ckpt_location))
+
+    def cleanup(self):
+        if hasattr(self, "_ckpt_remains"):
+            for each_ckpt in self._ckpt_remains:
+                self._delete_one_ckpt(each_ckpt)
+
     def _select_checkpoint(self, trainer):
+        from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR
+
         if trainer.ckpt_to_metric:
             best_ckpt, _ = min(
-                trainer.ckpt_to_metric.items(), key=lambda x: x[1][self._metric_name]
+                trainer.ckpt_to_metric.items(), key=lambda x: x[1]["val_loss"]
             )
             best_ckpt_global_step = trainer.ckpt_to_global_step[best_ckpt]
+            for each_ckpt in list(trainer.ckpt_to_metric):
+                if each_ckpt != best_ckpt:
+                    del trainer.ckpt_to_metric[each_ckpt]
+                    del trainer.ckpt_to_global_step[each_ckpt]
+                    self._delete_one_ckpt(each_ckpt)
         else:
             best_ckpt_global_step = trainer.state.global_step
-        from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR
-
         best_ckpt = os.path.join(
             trainer.args.output_dir,
             f"{PREFIX_CHECKPOINT_DIR}-{best_ckpt_global_step}",
@@ -506,6 +555,8 @@ class TransformersEstimator(BaseEstimator):
     def _compute_metrics_by_dataset_name(self, eval_pred):
         from .ml import sklearn_metric_loss_score
         from .data import SEQREGRESSION
+        import datasets
+        from .nlp.utils import load_default_huggingface_metric_for_task

         predictions, labels = eval_pred
         predictions = (
@@ -513,21 +564,39 @@ class TransformersEstimator(BaseEstimator):
             if self._task == SEQREGRESSION
             else np.argmax(predictions, axis=1)
         )
-        return {
-            self._metric_name: sklearn_metric_loss_score(
-                metric_name=self._metric_name, y_predict=predictions, y_true=labels
-            )
-        }
-
-    def predict(self, X_test):
+        if isinstance(self._metric_name, str):
+            return {
+                "val_loss": sklearn_metric_loss_score(
+                    metric_name=self._metric_name, y_predict=predictions, y_true=labels
+                )
+            }
+        else:
+            (
+                default_metric_name,
+                default_metric_mode,
+            ) = load_default_huggingface_metric_for_task(self._task)
+            metric = datasets.load_metric(default_metric_name)
+            multiplier = -1 if default_metric_mode == "max" else 1
+            return {
+                "val_loss": metric.compute(predictions=predictions, references=labels)[
+                    default_metric_name
+                ]
+                * multiplier
+            }
+
+    def predict_proba(self, X_test):
         from datasets import Dataset
-        from .nlp.utils import load_model
-        from transformers import TrainingArguments
         from .nlp.huggingface.trainer import TrainerForAuto
+        from transformers import TrainingArguments
+        from .nlp.utils import load_model

-        if X_test.dtypes[0] == "string":
-            X_test = self._preprocess(X_test, self._task, **self._kwargs)
-        test_dataset = Dataset.from_pandas(X_test)
+        assert (
+            self._task in CLASSIFICATION
+        ), "predict_proba is only available in classification tasks"
+
+        X_test = self._preprocess(X_test, self._task, **self._kwargs)
+        test_dataset = Dataset.from_pandas(X_test)

         best_model = load_model(
             checkpoint_path=self._checkpoint_path,
@@ -539,8 +608,31 @@ class TransformersEstimator(BaseEstimator):
             per_device_eval_batch_size=1,
             output_dir=self.custom_hpo_args.output_dir,
         )
-        test_trainer = TrainerForAuto(model=best_model, args=training_args)
-        predictions = test_trainer.predict(test_dataset)
+        self._model = TrainerForAuto(model=best_model, args=training_args)
+        predictions = self._model.predict(test_dataset)
+        return predictions.predictions
+
+    def predict(self, X_test):
+        from datasets import Dataset
+        from transformers import TrainingArguments
+        from .nlp.utils import load_model
+        from .nlp.huggingface.trainer import TrainerForAuto
+
+        X_test = self._preprocess(X_test, self._task, **self._kwargs)
+        test_dataset = Dataset.from_pandas(X_test)
+
+        best_model = load_model(
+            checkpoint_path=self._checkpoint_path,
+            task=self._task,
+            num_labels=self._num_labels,
+            per_model_config=self._per_model_config,
+        )
+        training_args = TrainingArguments(
+            per_device_eval_batch_size=1,
+            output_dir=self.custom_hpo_args.output_dir,
+        )
+        self._model = TrainerForAuto(model=best_model, args=training_args)
+        predictions = self._model.predict(test_dataset)
         return np.argmax(predictions.predictions, axis=1)


@@ -621,6 +713,9 @@ class LGBMEstimator(BaseEstimator):
         params = config.copy()
         if "log_max_bin" in params:
             params["max_bin"] = (1 << params.pop("log_max_bin")) - 1
+        params[TransformersEstimator.ITER_HP] = params.get(
+            TransformersEstimator.ITER_HP, sys.maxsize
+        )
         return params

     @classmethod
@@ -1212,8 +1307,6 @@ class CatBoostEstimator(BaseEstimator):
             self.estimator_class = CatBoostClassifier

     def fit(self, X_train, y_train, budget=None, **kwargs):
-        import shutil
-
         start_time = time.time()
         deadline = start_time + budget if budget else np.inf
         train_dir = f"catboost_{str(start_time)}"
diff --git a/flaml/nlp/__init__.py b/flaml/nlp/__init__.py
deleted file mode 100644
index 9170ba321..000000000
--- a/flaml/nlp/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-from .utils import HPOArgs
diff --git a/flaml/nlp/utils.py b/flaml/nlp/utils.py
index 39fbc841d..8a02a4c43 100644
--- a/flaml/nlp/utils.py
+++ b/flaml/nlp/utils.py
@@ -1,20 +1,22 @@
 import argparse
 from dataclasses import dataclass, field
-from ..data import SEQCLASSIFICATION, SEQREGRESSION
+from typing import Dict, Any


-def _is_nlp_task(task):
-    if task in [SEQCLASSIFICATION, SEQREGRESSION]:
-        return True
-    else:
-        return False
+def load_default_huggingface_metric_for_task(task):
+    from ..data import SEQCLASSIFICATION, SEQREGRESSION
+
+    if task == SEQCLASSIFICATION:
+        return "accuracy", "max"
+    elif task == SEQREGRESSION:
+        return "rmse", "max"


 global tokenized_column_names


 def tokenize_text(X, task, custom_hpo_task):
-    from ..data import SEQCLASSIFICATION
+    from ..data import SEQCLASSIFICATION, SEQREGRESSION

     if task in (SEQCLASSIFICATION, SEQREGRESSION):
         return tokenize_text_seqclassification(X, custom_hpo_task)
@@ -71,18 +73,81 @@ def separate_config(config):


 def get_num_labels(task, y_train):
+    from ..data import SEQCLASSIFICATION, SEQREGRESSION
+
     if task == SEQREGRESSION:
         return 1
     elif task == SEQCLASSIFICATION:
         return len(set(y_train))


+def _clean_value(value: Any) -> str:
+    if isinstance(value, float):
+        return "{:.5}".format(value)
+    else:
+        return str(value).replace("/", "_")
+
+
+def format_vars(resolved_vars: Dict) -> str:
+    """Formats the resolved variable dict into a single string."""
+    out = []
+    for path, value in sorted(resolved_vars.items()):
+        if path[0] in ["run", "env", "resources_per_trial"]:
+            continue  # TrialRunner already has these in the experiment_tag
+        pieces = []
+        last_string = True
+        for k in path[::-1]:
+            if isinstance(k, int):
+                pieces.append(str(k))
+            elif last_string:
+                last_string = False
+                pieces.append(k)
+        pieces.reverse()
+        out.append(_clean_value("_".join(pieces)) + "=" + _clean_value(value))
+    return ",".join(out)
+
+
+counter = 0
+
+
+def date_str():
+    from datetime import datetime
+
+    return datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
+
+
+def _generate_dirname(experiment_tag, trial_id):
+    generated_dirname = f"train_{str(trial_id)}_{experiment_tag}"
+    generated_dirname = generated_dirname[:130]
+    generated_dirname += f"_{date_str()}"
+    return generated_dirname.replace("/", "_")
+
+
+def get_logdir_name(dirname, local_dir):
+    import os
+
+    local_dir = os.path.expanduser(local_dir)
+    logdir = os.path.join(local_dir, dirname)
+    return logdir
+
+
+def get_trial_fold_name(local_dir, trial_config, trial_id):
+    global counter
+    counter = counter + 1
+    experiment_tag = "{0}_{1}".format(str(counter), format_vars(trial_config))
+    logdir = get_logdir_name(
+        _generate_dirname(experiment_tag, trial_id=trial_id), local_dir
+    )
+    return logdir
+
+
 def load_model(checkpoint_path, task, num_labels, per_model_config=None):
     from transformers import AutoConfig
     from .huggingface.switch_head_auto import (
         AutoSeqClassificationHead,
         MODEL_CLASSIFICATION_HEAD_MAPPING,
     )
+    from ..data import SEQCLASSIFICATION, SEQREGRESSION

     this_model_type = AutoConfig.from_pretrained(checkpoint_path).model_type
     this_vocab_size = AutoConfig.from_pretrained(checkpoint_path).vocab_size
diff --git a/test/nlp/test_autohf.py b/test/nlp/test_autohf.py
index d734d10ac..e7436bcc2 100644
--- a/test/nlp/test_autohf.py
+++ b/test/nlp/test_autohf.py
@@ -4,10 +4,6 @@ import pytest

 @pytest.mark.skipif(os.name == "posix", reason="do not run on mac os")
 def test_hf_data():
-    try:
-        import ray
-    except ImportError:
-        return
     from flaml import AutoML
     from datasets import load_dataset

@@ -73,6 +69,9 @@ def test_hf_data():
         ]
     )

+    automl.predict_proba(X_test)
+    print(automl.classes_)
+

 def _test_custom_data():
     from flaml import AutoML
@@ -122,3 +121,7 @@ def _test_custom_data():
             ["test test", "test test"],
         ]
     )
+
+
+if __name__ == "__main__":
+    test_hf_data()
diff --git a/test/nlp/test_autohf_maxiter1.py b/test/nlp/test_autohf_maxiter1.py
new file mode 100644
index 000000000..0fe72b189
--- /dev/null
+++ b/test/nlp/test_autohf_maxiter1.py
@@ -0,0 +1,63 @@
+import os
+import pytest
+
+
+@pytest.mark.skipif(os.name == "posix", reason="do not run on mac os")
+def test_max_iter_1():
+    from flaml import AutoML
+
+    from datasets import load_dataset
+
+    train_dataset = load_dataset("glue", "mrpc", split="train").to_pandas().iloc[0:4]
+    dev_dataset = load_dataset("glue", "mrpc", split="train").to_pandas().iloc[0:4]
+
+    custom_sent_keys = ["sentence1", "sentence2"]
+    label_key = "label"
+
+    X_train = train_dataset[custom_sent_keys]
+    y_train = train_dataset[label_key]
+
+    X_val = dev_dataset[custom_sent_keys]
+    y_val = dev_dataset[label_key]
+
+    automl = AutoML()
+
+    def toy_metric(
+        X_test,
+        y_test,
+        estimator,
+        labels,
+        X_train,
+        y_train,
+        weight_test=None,
+        weight_train=None,
+        config=None,
+        groups_test=None,
+        groups_train=None,
+    ):
+        return 0, {
+            "test_loss": 0,
+            "train_loss": 0,
+            "pred_time": 0,
+        }
+
+    automl_settings = {
+        "gpu_per_trial": 0,
+        "max_iter": 1,
+        "time_budget": 5,
+        "task": "seq-classification",
+        "metric": toy_metric,
+        "log_file_name": "seqclass.log",
+    }
+
+    automl_settings["custom_hpo_args"] = {
+        "model_path": "google/electra-small-discriminator",
+        "output_dir": "data/output/",
+        "ckpt_per_epoch": 5,
+        "fp16": False,
+    }
+
+    automl.fit(
+        X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val, **automl_settings
+    )
+    del automl
diff --git a/test/nlp/test_autohf_regression.py b/test/nlp/test_autohf_regression.py
index b2c4cd6bf..afd24a410 100644
--- a/test/nlp/test_autohf_regression.py
+++ b/test/nlp/test_autohf_regression.py
@@ -4,6 +4,10 @@ import pytest

 @pytest.mark.skipif(os.name == "posix", reason="do not run on mac os")
 def test_regression():
+    try:
+        import ray
+    except ImportError:
+        return
     from flaml import AutoML
     from datasets import load_dataset

@@ -33,6 +37,7 @@ def test_regression():
         "task": "seq-regression",
         "metric": "rmse",
         "starting_points": {"transformer": {"num_train_epochs": 1}},
+        "use_ray": True,
     }

     automl_settings["custom_hpo_args"] = {
@@ -45,7 +50,3 @@ def test_regression():
     automl.fit(
         X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val, **automl_settings
     )
-
-
-if __name__ == "main":
-    test_regression()
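Usage note (not part of the patch): the sketch below is distilled from `test/nlp/test_autohf.py` and `test/nlp/test_autohf_maxiter1.py` above. It shows the path this fix exercises: fitting `AutoML` on a `seq-classification` task routes to `TransformersEstimator`, and the `predict_proba` method added in this patch returns class probabilities from the selected checkpoint. The dataset, sentence columns, and `custom_hpo_args` values mirror those tests; the metric and time budget here are illustrative assumptions, not values taken from the patch.

```python
from flaml import AutoML
from datasets import load_dataset

# Same GLUE/MRPC data the tests use, truncated for a quick run.
train = load_dataset("glue", "mrpc", split="train").to_pandas().iloc[:100]
X_train, y_train = train[["sentence1", "sentence2"]], train["label"]

automl = AutoML()
automl.fit(
    X_train=X_train,
    y_train=y_train,
    task="seq-classification",  # handled by TransformersEstimator
    metric="accuracy",          # illustrative; the tests also use a custom metric
    time_budget=100,            # illustrative budget in seconds
    gpu_per_trial=0,
    custom_hpo_args={
        "model_path": "google/electra-small-discriminator",
        "output_dir": "data/output/",
        "ckpt_per_epoch": 5,
        "fp16": False,
    },
)

X_test = train[["sentence1", "sentence2"]].iloc[:4]
print(automl.predict(X_test))        # predicted class labels
print(automl.predict_proba(X_test))  # new in this patch: class probabilities
print(automl.classes_)
```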