From c2b25310fc64367d116980729287d89c86e7ab1d Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Fri, 23 May 2025 10:19:31 +0800 Subject: [PATCH] Sync Fabric till 2cd1c3da (#1433) * Sync Fabric till 2cd1c3da * Remove synapseml from tag names * Fix 'NoneType' object has no attribute 'DataFrame' * Deprecated 3.8 support * Fix 'NoneType' object has no attribute 'DataFrame' * Still use python 3.8 for pydoc * Don't run tests in parallel * Remove autofe and lowcode --- .github/workflows/python-package.yml | 2 +- Dockerfile | 2 +- README.md | 2 +- flaml/automl/automl.py | 58 ++- flaml/automl/data.py | 337 +++++++++++++++- flaml/automl/logger.py | 34 +- flaml/fabric/mlflow.py | 441 +++++++++++++++++---- flaml/tune/logger.py | 37 ++ flaml/tune/spark/utils.py | 22 + flaml/tune/tune.py | 16 +- pytest.ini | 3 + setup.py | 3 +- test/automl/test_extra_models.py | 2 + test/automl/test_forecast.py | 5 +- test/automl/test_mlflow.py | 12 + test/nlp/test_autohf_classificationhead.py | 2 + test/nlp/test_autohf_cv.py | 2 + test/nlp/test_default.py | 4 + test/spark/test_0sparkml.py | 81 +++- test/spark/test_automl.py | 2 +- test/spark/test_ensemble.py | 3 + test/spark/test_exceptions.py | 2 +- test/spark/test_mlflow.py | 1 + test/spark/test_multiclass.py | 2 + test/spark/test_notebook.py | 2 +- test/spark/test_overtime.py | 2 +- test/spark/test_performance.py | 2 +- test/spark/test_tune.py | 2 +- test/spark/test_utils.py | 2 +- 29 files changed, 976 insertions(+), 109 deletions(-) create mode 100644 flaml/tune/logger.py create mode 100644 pytest.ini diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 507567f68..56009f8ab 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -30,7 +30,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, macos-latest, windows-2019] - python-version: ["3.8", "3.9", "3.10", "3.11"] + python-version: ["3.9", "3.10", "3.11"] steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} diff --git a/Dockerfile b/Dockerfile index 35d4eed39..47a246fe5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # basic setup -FROM mcr.microsoft.com/devcontainers/python:3.8 +FROM mcr.microsoft.com/devcontainers/python:3.10 RUN apt-get update && apt-get -y update RUN apt-get install -y sudo git npm diff --git a/README.md b/README.md index 302880408..0998931dd 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ FLAML has a .NET implementation in [ML.NET](http://dot.net/ml), an open-source, ## Installation -FLAML requires **Python version >= 3.8**. It can be installed from pip: +FLAML requires **Python version >= 3.9**. It can be installed from pip: ```bash pip install flaml diff --git a/flaml/automl/automl.py b/flaml/automl/automl.py index 3233a1cff..0f85046d5 100644 --- a/flaml/automl/automl.py +++ b/flaml/automl/automl.py @@ -10,6 +10,7 @@ import os import random import sys import time +from concurrent.futures import as_completed from functools import partial from typing import Callable, List, Optional, Union @@ -187,7 +188,8 @@ class AutoML(BaseEstimator): mem_thres: A float of the memory size constraint in bytes. pred_time_limit: A float of the prediction latency constraint in seconds. It refers to the average prediction time per row in validation data. - train_time_limit: A float of the training time constraint in seconds. + train_time_limit: None or a float of the training time constraint in seconds for each trial. + Only valid for sequential search. verbose: int, default=3 | Controls the verbosity, higher means more messages. retrain_full: bool or str, default=True | whether to retrain the @@ -1334,7 +1336,8 @@ class AutoML(BaseEstimator): mem_thres: A float of the memory size constraint in bytes. pred_time_limit: A float of the prediction latency constraint in seconds. It refers to the average prediction time per row in validation data. - train_time_limit: None or a float of the training time constraint in seconds. + train_time_limit: None or a float of the training time constraint in seconds for each trial. + Only valid for sequential search. X_val: None or a numpy array or a pandas dataframe of validation data. y_val: None or a numpy array or a pandas series of validation labels. sample_weight_val: None or a numpy array of the sample weight of @@ -1625,6 +1628,13 @@ class AutoML(BaseEstimator): _ch.setFormatter(logger_formatter) logger.addHandler(_ch) + if model_history: + logger.warning( + "With `model_history` set to `True` by default, all intermediate models are retained in memory, " + "which may significantly increase memory usage and slow down training. " + "Consider setting `model_history=False` to optimize memory and accelerate the training process." + ) + if not use_ray and not use_spark and n_concurrent_trials > 1: if ray_available: logger.warning( @@ -2717,16 +2727,42 @@ class AutoML(BaseEstimator): ): if mlflow.active_run() is None: mlflow.start_run(run_id=self.mlflow_integration.parent_run_id) - self.mlflow_integration.log_model( - self._trained_estimator.model, - self.best_estimator, - signature=self.estimator_signature, - ) - self.mlflow_integration.pickle_and_log_automl_artifacts( - self, self.model, self.best_estimator, signature=self.pipeline_signature - ) + if self.best_estimator.endswith("_spark"): + self.mlflow_integration.log_model( + self._trained_estimator.model, + self.best_estimator, + signature=self.estimator_signature, + run_id=self.mlflow_integration.parent_run_id, + ) + else: + self.mlflow_integration.pickle_and_log_automl_artifacts( + self, + self.model, + self.best_estimator, + signature=self.pipeline_signature, + run_id=self.mlflow_integration.parent_run_id, + ) else: - logger.info("not retraining because the time budget is too small.") + logger.warning("not retraining because the time budget is too small.") + if self.mlflow_integration is not None: + logger.debug("Collecting results from submitted record_state tasks") + t1 = time.perf_counter() + for future in as_completed(self.mlflow_integration.futures): + _task = self.mlflow_integration.futures[future] + try: + result = future.result() + logger.debug(f"Result for record_state task {_task}: {result}") + except Exception as e: + logger.warning(f"Exception for record_state task {_task}: {e}") + for future in as_completed(self.mlflow_integration.futures_log_model): + _task = self.mlflow_integration.futures_log_model[future] + try: + result = future.result() + logger.debug(f"Result for log_model task {_task}: {result}") + except Exception as e: + logger.warning(f"Exception for log_model task {_task}: {e}") + t2 = time.perf_counter() + logger.debug(f"Collecting results from tasks submitted to executors costs {t2-t1} seconds.") def __del__(self): if ( diff --git a/flaml/automl/data.py b/flaml/automl/data.py index 747236dad..4c473963f 100644 --- a/flaml/automl/data.py +++ b/flaml/automl/data.py @@ -2,13 +2,17 @@ # * Copyright (c) Microsoft Corporation. All rights reserved. # * Licensed under the MIT License. See LICENSE file in the # * project root for license information. +import json import os -from datetime import datetime +import random +import uuid +from datetime import datetime, timedelta +from decimal import ROUND_HALF_UP, Decimal from typing import TYPE_CHECKING, Union import numpy as np -from flaml.automl.spark import DataFrame, Series, pd, ps, psDataFrame, psSeries +from flaml.automl.spark import DataFrame, F, Series, T, pd, ps, psDataFrame, psSeries from flaml.automl.training_log import training_log_reader try: @@ -19,6 +23,7 @@ except ImportError: if TYPE_CHECKING: from flaml.automl.task import Task + TS_TIMESTAMP_COL = "ds" TS_VALUE_COL = "y" @@ -445,3 +450,331 @@ class DataTransformer: def group_counts(groups): _, i, c = np.unique(groups, return_counts=True, return_index=True) return c[np.argsort(i)] + + +def get_random_dataframe(n_rows: int = 200, ratio_none: float = 0.1, seed: int = 42) -> DataFrame: + """Generate a random pandas DataFrame with various data types for testing. + This function creates a DataFrame with multiple column types including: + - Timestamps + - Integers + - Floats + - Categorical values + - Booleans + - Lists (tags) + - Decimal strings + - UUIDs + - Binary data (as hex strings) + - JSON blobs + - Nullable text fields + Parameters + ---------- + n_rows : int, default=200 + Number of rows in the generated DataFrame + ratio_none : float, default=0.1 + Probability of generating None values in applicable columns + seed : int, default=42 + Random seed for reproducibility + Returns + ------- + pd.DataFrame + A DataFrame with 14 columns of various data types + Examples + -------- + >>> df = get_random_dataframe(100, 0.05, 123) + >>> df.shape + (100, 14) + >>> df.dtypes + timestamp datetime64[ns] + id int64 + score float64 + status object + flag object + count object + value object + tags object + rating object + uuid object + binary object + json_blob object + category category + nullable_text object + dtype: object + """ + + np.random.seed(seed) + random.seed(seed) + + def random_tags(): + tags = ["AI", "ML", "data", "robotics", "vision"] + return random.sample(tags, k=random.randint(1, 3)) if random.random() > ratio_none else None + + def random_decimal(): + return ( + str(Decimal(random.uniform(1, 5)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)) + if random.random() > ratio_none + else None + ) + + def random_json_blob(): + blob = {"a": random.randint(1, 10), "b": random.random()} + return json.dumps(blob) if random.random() > ratio_none else None + + def random_binary(): + return bytes(random.randint(0, 255) for _ in range(4)).hex() if random.random() > ratio_none else None + + data = { + "timestamp": [ + datetime(2020, 1, 1) + timedelta(days=np.random.randint(0, 1000)) if np.random.rand() > ratio_none else None + for _ in range(n_rows) + ], + "id": range(1, n_rows + 1), + "score": np.random.uniform(0, 100, n_rows), + "status": np.random.choice( + ["active", "inactive", "pending", None], + size=n_rows, + p=[(1 - ratio_none) / 3, (1 - ratio_none) / 3, (1 - ratio_none) / 3, ratio_none], + ), + "flag": np.random.choice( + [True, False, None], size=n_rows, p=[(1 - ratio_none) / 2, (1 - ratio_none) / 2, ratio_none] + ), + "count": [np.random.randint(0, 100) if np.random.rand() > ratio_none else None for _ in range(n_rows)], + "value": [round(np.random.normal(50, 15), 2) if np.random.rand() > ratio_none else None for _ in range(n_rows)], + "tags": [random_tags() for _ in range(n_rows)], + "rating": [random_decimal() for _ in range(n_rows)], + "uuid": [str(uuid.uuid4()) if np.random.rand() > ratio_none else None for _ in range(n_rows)], + "binary": [random_binary() for _ in range(n_rows)], + "json_blob": [random_json_blob() for _ in range(n_rows)], + "category": pd.Categorical( + np.random.choice( + ["A", "B", "C", None], + size=n_rows, + p=[(1 - ratio_none) / 3, (1 - ratio_none) / 3, (1 - ratio_none) / 3, ratio_none], + ) + ), + "nullable_text": [random.choice(["Good", "Bad", "Average", None]) for _ in range(n_rows)], + } + + return pd.DataFrame(data) + + +def auto_convert_dtypes_spark( + df: psDataFrame, + na_values: list = None, + category_threshold: float = 0.3, + convert_threshold: float = 0.6, + sample_ratio: float = 0.1, +) -> tuple[psDataFrame, dict]: + """Automatically convert data types in a PySpark DataFrame using heuristics. + + This function analyzes a sample of the DataFrame to infer appropriate data types + and applies the conversions. It handles timestamps, numeric values, booleans, + and categorical fields. + + Args: + df: A PySpark DataFrame to convert. + na_values: List of strings to be considered as NA/NaN. Defaults to + ['NA', 'na', 'NULL', 'null', '']. + category_threshold: Maximum ratio of unique values to total values + to consider a column categorical. Defaults to 0.3. + convert_threshold: Minimum ratio of successfully converted values required + to apply a type conversion. Defaults to 0.6. + sample_ratio: Fraction of data to sample for type inference. Defaults to 0.1. + + Returns: + tuple: (The DataFrame with converted types, A dictionary mapping column names to + their inferred types as strings) + + Note: + - 'category' in the schema dict is conceptual as PySpark doesn't have a true + category type like pandas + - The function uses sampling for efficiency with large datasets + """ + n_rows = df.count() + if na_values is None: + na_values = ["NA", "na", "NULL", "null", ""] + + # Normalize NA-like values + for colname, coltype in df.dtypes: + if coltype == "string": + df = df.withColumn( + colname, + F.when(F.trim(F.lower(F.col(colname))).isin([v.lower() for v in na_values]), None).otherwise( + F.col(colname) + ), + ) + + schema = {} + for colname in df.columns: + # Sample once at an appropriate ratio + sample_ratio_to_use = min(1.0, sample_ratio if n_rows * sample_ratio > 100 else 100 / n_rows) + col_sample = df.select(colname).sample(withReplacement=False, fraction=sample_ratio_to_use).dropna() + sample_count = col_sample.count() + + inferred_type = "string" # Default + + if col_sample.dtypes[0][1] != "string": + schema[colname] = col_sample.dtypes[0][1] + continue + + if sample_count == 0: + schema[colname] = "string" + continue + + # Check if timestamp + ts_col = col_sample.withColumn("parsed", F.to_timestamp(F.col(colname))) + + # Check numeric + if ( + col_sample.withColumn("n", F.col(colname).cast("double")).filter("n is not null").count() + >= sample_count * convert_threshold + ): + # All whole numbers? + all_whole = ( + col_sample.withColumn("n", F.col(colname).cast("double")) + .filter("n is not null") + .withColumn("frac", F.abs(F.col("n") % 1)) + .filter("frac > 0.000001") + .count() + == 0 + ) + inferred_type = "int" if all_whole else "double" + + # Check low-cardinality (category-like) + elif ( + sample_count > 0 + and col_sample.select(F.countDistinct(F.col(colname))).collect()[0][0] / sample_count <= category_threshold + ): + inferred_type = "category" # Will just be string, but marked as such + + # Check if timestamp + elif ts_col.filter(F.col("parsed").isNotNull()).count() >= sample_count * convert_threshold: + inferred_type = "timestamp" + + schema[colname] = inferred_type + + # Apply inferred schema + for colname, inferred_type in schema.items(): + if inferred_type == "int": + df = df.withColumn(colname, F.col(colname).cast(T.IntegerType())) + elif inferred_type == "double": + df = df.withColumn(colname, F.col(colname).cast(T.DoubleType())) + elif inferred_type == "boolean": + df = df.withColumn( + colname, + F.when(F.lower(F.col(colname)).isin("true", "yes", "1"), True) + .when(F.lower(F.col(colname)).isin("false", "no", "0"), False) + .otherwise(None), + ) + elif inferred_type == "timestamp": + df = df.withColumn(colname, F.to_timestamp(F.col(colname))) + elif inferred_type == "category": + df = df.withColumn(colname, F.col(colname).cast(T.StringType())) # Marked conceptually + + # otherwise keep as string (or original type) + + return df, schema + + +def auto_convert_dtypes_pandas( + df: DataFrame, + na_values: list = None, + category_threshold: float = 0.3, + convert_threshold: float = 0.6, + sample_ratio: float = 1.0, +) -> tuple[DataFrame, dict]: + """Automatically convert data types in a pandas DataFrame using heuristics. + + This function analyzes the DataFrame to infer appropriate data types + and applies the conversions. It handles timestamps, timedeltas, numeric values, + and categorical fields. + + Args: + df: A pandas DataFrame to convert. + na_values: List of strings to be considered as NA/NaN. Defaults to + ['NA', 'na', 'NULL', 'null', '']. + category_threshold: Maximum ratio of unique values to total values + to consider a column categorical. Defaults to 0.3. + convert_threshold: Minimum ratio of successfully converted values required + to apply a type conversion. Defaults to 0.6. + sample_ratio: Fraction of data to sample for type inference. Not used in pandas version + but included for API compatibility. Defaults to 1.0. + + Returns: + tuple: (The DataFrame with converted types, A dictionary mapping column names to + their inferred types as strings) + """ + if na_values is None: + na_values = {"NA", "na", "NULL", "null", ""} + + df_converted = df.convert_dtypes() + schema = {} + + # Sample if needed (for API compatibility) + if sample_ratio < 1.0: + df = df.sample(frac=sample_ratio) + + n_rows = len(df) + + for col in df.columns: + series = df[col] + # Replace NA-like values if string + series_cleaned = series.map(lambda x: np.nan if isinstance(x, str) and x.strip() in na_values else x) + + # Skip conversion if already non-object data type, except bool which can potentially be categorical + if ( + not isinstance(series_cleaned.dtype, pd.BooleanDtype) + and not isinstance(series_cleaned.dtype, pd.StringDtype) + and series_cleaned.dtype != "object" + ): + # Keep the original data type for non-object dtypes + df_converted[col] = series + schema[col] = str(series_cleaned.dtype) + continue + + # print(f"type: {series_cleaned.dtype}, column: {series_cleaned.name}") + + if not isinstance(series_cleaned.dtype, pd.BooleanDtype): + # Try numeric (int or float) + numeric = pd.to_numeric(series_cleaned, errors="coerce") + if numeric.notna().sum() >= n_rows * convert_threshold: + if (numeric.dropna() % 1 == 0).all(): + try: + df_converted[col] = numeric.astype("int") # Nullable integer + schema[col] = "int" + continue + except Exception: + pass + df_converted[col] = numeric.astype("double") + schema[col] = "double" + continue + + # Try datetime + datetime_converted = pd.to_datetime(series_cleaned, errors="coerce") + if datetime_converted.notna().sum() >= n_rows * convert_threshold: + df_converted[col] = datetime_converted + schema[col] = "timestamp" + continue + + # Try timedelta + try: + timedelta_converted = pd.to_timedelta(series_cleaned, errors="coerce") + if timedelta_converted.notna().sum() >= n_rows * convert_threshold: + df_converted[col] = timedelta_converted + schema[col] = "timedelta" + continue + except TypeError: + pass + + # Try category + try: + unique_ratio = series_cleaned.nunique(dropna=True) / n_rows if n_rows > 0 else 1.0 + if unique_ratio <= category_threshold: + df_converted[col] = series_cleaned.astype("category") + schema[col] = "category" + continue + except Exception: + pass + df_converted[col] = series_cleaned.astype("string") + schema[col] = "string" + + return df_converted, schema diff --git a/flaml/automl/logger.py b/flaml/automl/logger.py index 1085b5aae..959337b25 100644 --- a/flaml/automl/logger.py +++ b/flaml/automl/logger.py @@ -1,7 +1,37 @@ import logging +import os + + +class ColoredFormatter(logging.Formatter): + # ANSI escape codes for colors + COLORS = { + # logging.DEBUG: "\033[36m", # Cyan + # logging.INFO: "\033[32m", # Green + logging.WARNING: "\033[33m", # Yellow + logging.ERROR: "\033[31m", # Red + logging.CRITICAL: "\033[1;31m", # Bright Red + } + RESET = "\033[0m" # Reset to default + + def __init__(self, fmt, datefmt, use_color=True): + super().__init__(fmt, datefmt) + self.use_color = use_color + + def format(self, record): + formatted = super().format(record) + if self.use_color: + color = self.COLORS.get(record.levelno, "") + if color: + return f"{color}{formatted}{self.RESET}" + return formatted + logger = logging.getLogger(__name__) -logger_formatter = logging.Formatter( - "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", "%m-%d %H:%M:%S" +use_color = True +if os.getenv("FLAML_LOG_NO_COLOR"): + use_color = False + +logger_formatter = ColoredFormatter( + "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", "%m-%d %H:%M:%S", use_color ) logger.propagate = False diff --git a/flaml/fabric/mlflow.py b/flaml/fabric/mlflow.py index 442c3709b..106fa1973 100644 --- a/flaml/fabric/mlflow.py +++ b/flaml/fabric/mlflow.py @@ -1,10 +1,14 @@ +import atexit +import functools import json +import logging import os import pickle import random -import sys import tempfile import time +import warnings +from concurrent.futures import ThreadPoolExecutor, wait from typing import MutableMapping import mlflow @@ -12,14 +16,15 @@ import pandas as pd from mlflow.entities import Metric, Param, RunTag from mlflow.exceptions import MlflowException from mlflow.utils.autologging_utils import AUTOLOGGING_INTEGRATIONS, autologging_is_disabled +from packaging.requirements import Requirement from scipy.sparse import issparse from sklearn import tree try: - from pyspark.ml import Pipeline as SparkPipeline + from pyspark.ml import PipelineModel as SparkPipelineModel except ImportError: - class SparkPipeline: + class SparkPipelineModel: pass @@ -32,6 +37,84 @@ from flaml.version import __version__ SEARCH_MAX_RESULTS = 5000 # Each train should not have more than 5000 trials IS_RENAME_CHILD_RUN = os.environ.get("FLAML_IS_RENAME_CHILD_RUN", "false").lower() == "true" +REMOVE_REQUIREMENT_LIST = [ + "synapseml-cognitive", + "synapseml-core", + "synapseml-deep-learning", + "synapseml-internal", + "synapseml-mlflow", + "synapseml-opencv", + "synapseml-vw", + "synapseml-lightgbm", + "synapseml-utils", + "nni", + "optuna", +] +OPTIONAL_REMOVE_REQUIREMENT_LIST = ["pytorch-lightning", "transformers"] + +os.environ["MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR"] = os.environ.get("MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR", "false") + +MLFLOW_NUM_WORKERS = int(os.environ.get("FLAML_MLFLOW_NUM_WORKERS", os.cpu_count() * 4 if os.cpu_count() else 2)) +executor = ThreadPoolExecutor(max_workers=MLFLOW_NUM_WORKERS) +atexit.register(lambda: executor.shutdown(wait=True)) + +IS_CLEAN_LOGS = os.environ.get("FLAML_IS_CLEAN_LOGS", "1") +if IS_CLEAN_LOGS == "1": + logging.getLogger("synapse.ml").setLevel(logging.CRITICAL) + logging.getLogger("mlflow.utils").setLevel(logging.CRITICAL) + logging.getLogger("mlflow.utils.environment").setLevel(logging.CRITICAL) + logging.getLogger("mlflow.models.model").setLevel(logging.CRITICAL) + warnings.simplefilter("ignore", category=FutureWarning) + warnings.simplefilter("ignore", category=UserWarning) + + +def convert_requirement(requirement_list: list[str]): + ret = ( + [Requirement(s.strip().lower()) for s in requirement_list] + if mlflow.__version__ <= "2.17.0" + else requirement_list + ) + return ret + + +def time_it(func_or_code=None): + """ + Decorator or function that measures execution time. + + Can be used in three ways: + 1. As a decorator with no arguments: @time_it + 2. As a decorator with arguments: @time_it() + 3. As a function call with a string of code to execute and time: time_it("some_code()") + + Args: + func_or_code (callable or str, optional): Either a function to decorate or + a string of code to execute and time. + + Returns: + callable or None: Returns a decorated function if used as a decorator, + or None if used to execute a string of code. + """ + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + start_time = time.time() + result = func(*args, **kwargs) + end_time = time.time() + logger.debug(f"Execution of {func.__name__} took {end_time - start_time:.4f} seconds") + return result + + return wrapper + + if callable(func_or_code): + return decorator(func_or_code) + elif func_or_code is None: + return decorator + else: + start_time = time.time() + exec(func_or_code) + end_time = time.time() + logger.debug(f"Execution\n```\n{func_or_code}\n```\ntook {end_time - start_time:.4f} seconds") def flatten_dict(d: MutableMapping, sep: str = ".") -> MutableMapping: @@ -49,23 +132,28 @@ def is_autolog_enabled(): return not all(autologging_is_disabled(k) for k in AUTOLOGGING_INTEGRATIONS.keys()) -def get_mlflow_log_latency(model_history=False): +def get_mlflow_log_latency(model_history=False, delete_run=True): + try: + FLAML_MLFLOW_LOG_LATENCY = float(os.getenv("FLAML_MLFLOW_LOG_LATENCY", 0)) + except ValueError: + FLAML_MLFLOW_LOG_LATENCY = 0 + if FLAML_MLFLOW_LOG_LATENCY >= 0.1: + return FLAML_MLFLOW_LOG_LATENCY st = time.time() with mlflow.start_run(nested=True, run_name="get_mlflow_log_latency") as run: if model_history: sk_model = tree.DecisionTreeClassifier() - mlflow.sklearn.log_model(sk_model, "sk_models") - mlflow.sklearn.log_model(Pipeline([("estimator", sk_model)]), "sk_pipeline") + mlflow.sklearn.log_model(sk_model, "model") with tempfile.TemporaryDirectory() as tmpdir: - pickle_fpath = os.path.join(tmpdir, f"tmp_{int(time.time()*1000)}") + pickle_fpath = os.path.join(tmpdir, f"tmp_{int(time.time() * 1000)}") with open(pickle_fpath, "wb") as f: pickle.dump(sk_model, f) - mlflow.log_artifact(pickle_fpath, "sk_model1") - mlflow.log_artifact(pickle_fpath, "sk_model2") + mlflow.log_artifact(pickle_fpath, "sk_model") mlflow.set_tag("synapseml.ui.visible", "false") # not shown inline in fabric - mlflow.delete_run(run.info.run_id) + if delete_run: + mlflow.delete_run(run.info.run_id) et = time.time() - return et - st + return 3 * (et - st) def infer_signature(X_train=None, y_train=None, dataframe=None, label=None): @@ -98,12 +186,76 @@ def infer_signature(X_train=None, y_train=None, dataframe=None, label=None): ) +def update_and_install_requirements( + run_id=None, + model_name=None, + model_version=None, + remove_list=None, + artifact_path="model", + dst_path=None, + install_with_ipython=False, +): + if not (run_id or (model_name and model_version)): + raise ValueError( + "Please provide `run_id` or both `model_name` and `model_version`. If all three are provided, `run_id` will be used." + ) + + if install_with_ipython: + from IPython import get_ipython + + if not remove_list: + remove_list = [ + "synapseml-cognitive", + "synapseml-core", + "synapseml-deep-learning", + "synapseml-internal", + "synapseml-mlflow", + "synapseml-opencv", + "synapseml-vw", + "synapseml-lightgbm", + "synapseml-utils", + "flaml", # flaml is needed for AutoML models, should be pre-installed in the runtime + "pyspark", # fabric internal pyspark should be pre-installed in the runtime + ] + + # Download model artifacts + client = mlflow.MlflowClient() + if not run_id: + run_id = client.get_model_version(model_name, model_version).run_id + if not dst_path: + dst_path = os.path.join(tempfile.gettempdir(), "model_artifacts") + os.makedirs(dst_path, exist_ok=True) + client.download_artifacts(run_id, artifact_path, dst_path) + requirements_path = os.path.join(dst_path, artifact_path, "requirements.txt") + with open(requirements_path) as f: + reqs = f.read().splitlines() + old_reqs = [Requirement(req) for req in reqs if req] + old_reqs_dict = {req.name: str(req) for req in old_reqs} + for req in remove_list: + req = Requirement(req) + if req.name in old_reqs_dict: + old_reqs_dict.pop(req.name, None) + new_reqs_list = list(old_reqs_dict.values()) + + with open(requirements_path, "w") as f: + f.write("\n".join(new_reqs_list)) + + if install_with_ipython: + get_ipython().run_line_magic("pip", f"install -r {requirements_path} -q") + else: + logger.info(f"You can run `pip install -r {requirements_path}` to install dependencies.") + return requirements_path + + def _mlflow_wrapper(evaluation_func, mlflow_exp_id, mlflow_config=None, extra_tags=None, autolog=False): def wrapped(*args, **kwargs): if mlflow_config is not None: - from synapse.ml.mlflow import set_mlflow_env_config + try: + from synapse.ml.mlflow import set_mlflow_env_config - set_mlflow_env_config(mlflow_config) + set_mlflow_env_config(mlflow_config) + except Exception: + pass import mlflow if mlflow_exp_id is not None: @@ -124,7 +276,20 @@ def _mlflow_wrapper(evaluation_func, mlflow_exp_id, mlflow_config=None, extra_ta def _get_notebook_name(): - return None + try: + import re + + from synapse.ml.mlflow import get_mlflow_env_config + from synapse.ml.mlflow.shared_platform_utils import get_artifact + + notebook_id = get_mlflow_env_config(False).artifact_id + current_notebook = get_artifact(notebook_id) + notebook_name = re.sub("\\W+", "-", current_notebook.displayName).strip() + + return notebook_name + except Exception as e: + logger.debug(f"Failed to get notebook name: {e}") + return None def safe_json_dumps(obj): @@ -163,6 +328,8 @@ class MLflowIntegration: self.has_model = False self.only_history = False self._do_log_model = True + self.futures = {} + self.futures_log_model = {} self.extra_tag = ( extra_tag @@ -170,6 +337,9 @@ class MLflowIntegration: else {"extra_tag.sid": f"flaml_{__version__}_{int(time.time())}_{random.randint(1001, 9999)}"} ) self.start_time = time.time() + self.experiment_type = experiment_type + self.update_autolog_state() + self.mlflow_client = mlflow.tracking.MlflowClient() parent_run_info = mlflow.active_run().info if mlflow.active_run() is not None else None if parent_run_info: @@ -188,8 +358,6 @@ class MLflowIntegration: mlflow.set_experiment(experiment_name=mlflow_exp_name) self.experiment_id = mlflow.tracking.fluent._active_experiment_id self.experiment_name = mlflow.get_experiment(self.experiment_id).name - self.experiment_type = experiment_type - self.update_autolog_state() if self.autolog: # only end user created parent run in autolog scenario @@ -197,9 +365,12 @@ class MLflowIntegration: def set_mlflow_config(self): if self.driver_mlflow_env_config is not None: - from synapse.ml.mlflow import set_mlflow_env_config + try: + from synapse.ml.mlflow import set_mlflow_env_config - set_mlflow_env_config(self.driver_mlflow_env_config) + set_mlflow_env_config(self.driver_mlflow_env_config) + except Exception: + pass def wrap_evaluation_function(self, evaluation_function): wrapped_evaluation_function = _mlflow_wrapper( @@ -267,6 +438,7 @@ class MLflowIntegration: else: _tags = [] self.mlflow_client.log_batch(run_id=target_id, metrics=_metrics, params=[], tags=_tags) + return f"Successfully copy_mlflow_run run_id {src_id} to run_id {target_id}" def record_trial(self, result, trial, metric): if isinstance(result, dict): @@ -334,12 +506,26 @@ class MLflowIntegration: self.copy_mlflow_run(best_mlflow_run_id, self.parent_run_id) self.has_summary = True - def log_model(self, model, estimator, signature=None): + def log_model(self, model, estimator, signature=None, run_id=None): if not self._do_log_model: return logger.debug(f"logging model {estimator}") + ret_message = f"Successfully log_model {estimator} to run_id {run_id}" + optional_remove_list = ( + [] if estimator in ["transformer", "transformer_ms", "tcn", "tft"] else OPTIONAL_REMOVE_REQUIREMENT_LIST + ) + run = mlflow.active_run() + if run and run.info.run_id == self.parent_run_id: + mlflow.start_run(run_id=run_id, nested=True) + elif run and run.info.run_id != run_id: + ret_message = ( + f"Error: Should log_model {estimator} to run_id {run_id}, but logged to run_id {run.info.run_id}" + ) + logger.error(ret_message) + else: + mlflow.start_run(run_id=run_id) if estimator.endswith("_spark"): - mlflow.spark.log_model(model, estimator, signature=signature) + # mlflow.spark.log_model(model, estimator, signature=signature) mlflow.spark.log_model(model, "model", signature=signature) elif estimator in ["lgbm"]: mlflow.lightgbm.log_model(model, estimator, signature=signature) @@ -352,42 +538,84 @@ class MLflowIntegration: elif estimator in ["prophet"]: mlflow.prophet.log_model(model, estimator, signature=signature) elif estimator in ["orbit"]: - pass + logger.warning(f"Unsupported model: {estimator}. No model logged.") else: mlflow.sklearn.log_model(model, estimator, signature=signature) + future = executor.submit( + lambda: mlflow.models.model.update_model_requirements( + model_uri=f"runs:/{run_id}/{'model' if estimator.endswith('_spark') else estimator}", + operation="remove", + requirement_list=convert_requirement(REMOVE_REQUIREMENT_LIST + optional_remove_list), + ) + ) + self.futures[future] = f"run_{run_id}_requirements_updated" + if not run or run.info.run_id == self.parent_run_id: + mlflow.end_run() + return ret_message - def _pickle_and_log_artifact(self, obj, artifact_name, pickle_fname="temp_.pkl"): + def _pickle_and_log_artifact(self, obj, artifact_name, pickle_fname="temp_.pkl", run_id=None): if not self._do_log_model: - return + return True with tempfile.TemporaryDirectory() as tmpdir: pickle_fpath = os.path.join(tmpdir, pickle_fname) try: with open(pickle_fpath, "wb") as f: pickle.dump(obj, f) - mlflow.log_artifact(pickle_fpath, artifact_name) + mlflow.log_artifact(pickle_fpath, artifact_name, run_id) + return True except Exception as e: - logger.debug(f"Failed to pickle and log artifact {artifact_name}, error: {e}") + logger.debug(f"Failed to pickle and log {artifact_name}, error: {e}") + return False - def pickle_and_log_automl_artifacts(self, automl, model, estimator, signature=None): + def _log_pipeline(self, pipeline, flavor_name, pipeline_name, signature, run_id, estimator=None): + logger.debug(f"logging pipeline {flavor_name}:{pipeline_name}:{estimator}") + ret_message = f"Successfully _log_pipeline {flavor_name}:{pipeline_name}:{estimator} to run_id {run_id}" + optional_remove_list = ( + [] if estimator in ["transformer", "transformer_ms", "tcn", "tft"] else OPTIONAL_REMOVE_REQUIREMENT_LIST + ) + run = mlflow.active_run() + if run and run.info.run_id == self.parent_run_id: + mlflow.start_run(run_id=run_id, nested=True) + elif run and run.info.run_id != run_id: + ret_message = f"Error: Should _log_pipeline {flavor_name}:{pipeline_name}:{estimator} model to run_id {run_id}, but logged to run_id {run.info.run_id}" + logger.error(ret_message) + else: + mlflow.start_run(run_id=run_id) + if flavor_name == "sklearn": + mlflow.sklearn.log_model(pipeline, pipeline_name, signature=signature) + elif flavor_name == "spark": + mlflow.spark.log_model(pipeline, pipeline_name, signature=signature) + else: + logger.warning(f"Unsupported pipeline flavor: {flavor_name}. No model logged.") + future = executor.submit( + lambda: mlflow.models.model.update_model_requirements( + model_uri=f"runs:/{run_id}/{pipeline_name}", + operation="remove", + requirement_list=convert_requirement(REMOVE_REQUIREMENT_LIST + optional_remove_list), + ) + ) + self.futures[future] = f"run_{run_id}_requirements_updated" + if not run or run.info.run_id == self.parent_run_id: + mlflow.end_run() + return ret_message + + def pickle_and_log_automl_artifacts(self, automl, model, estimator, signature=None, run_id=None): """log automl artifacts to mlflow load back with `automl = mlflow.pyfunc.load_model(model_run_id_or_uri)`, then do prediction with `automl.predict(X)` """ - logger.debug(f"logging automl artifacts {estimator}") - self._pickle_and_log_artifact(automl.feature_transformer, "feature_transformer", "feature_transformer.pkl") - self._pickle_and_log_artifact(automl.label_transformer, "label_transformer", "label_transformer.pkl") - # Test test_mlflow 1 and 4 will get error: TypeError: cannot pickle '_io.TextIOWrapper' object - # try: - # self._pickle_and_log_artifact(automl, "automl", "automl.pkl") - # except TypeError: - # pass + logger.debug(f"logging automl estimator {estimator}") + # self._pickle_and_log_artifact( + # automl.feature_transformer, "feature_transformer", "feature_transformer.pkl", run_id + # ) + # self._pickle_and_log_artifact(automl.label_transformer, "label_transformer", "label_transformer.pkl", run_id) if estimator.endswith("_spark"): # spark pipeline is not supported yet return feature_transformer = automl.feature_transformer - if isinstance(feature_transformer, Pipeline): + if isinstance(feature_transformer, Pipeline) and not estimator.endswith("_spark"): pipeline = feature_transformer pipeline.steps.append(("estimator", model)) - elif isinstance(feature_transformer, SparkPipeline): + elif isinstance(feature_transformer, SparkPipelineModel) and estimator.endswith("_spark"): pipeline = feature_transformer pipeline.stages.append(model) elif not estimator.endswith("_spark"): @@ -395,24 +623,26 @@ class MLflowIntegration: steps.append(("estimator", model)) pipeline = Pipeline(steps) else: - stages = [feature_transformer] + stages = [] + if feature_transformer is not None: + stages.append(feature_transformer) stages.append(model) - pipeline = SparkPipeline(stages=stages) - if isinstance(pipeline, SparkPipeline): + pipeline = SparkPipelineModel(stages=stages) + if isinstance(pipeline, SparkPipelineModel): logger.debug(f"logging spark pipeline {estimator}") - mlflow.spark.log_model(pipeline, "automl_pipeline", signature=signature) + self._log_pipeline(pipeline, "spark", "model", signature, run_id, estimator) else: # Add a log named "model" to fit default settings logger.debug(f"logging sklearn pipeline {estimator}") - mlflow.sklearn.log_model(pipeline, "automl_pipeline", signature=signature) - mlflow.sklearn.log_model(pipeline, "model", signature=signature) + self._log_pipeline(pipeline, "sklearn", "model", signature, run_id, estimator) + return f"Successfully pickle_and_log_automl_artifacts {estimator} to run_id {run_id}" + @time_it def record_state(self, automl, search_state, estimator): _st = time.time() automl_metric_name = ( automl._state.metric if isinstance(automl._state.metric, str) else automl._state.error_metric ) - if automl._state.error_metric.startswith("1-"): automl_metric_value = 1 - search_state.val_loss elif automl._state.error_metric.startswith("-"): @@ -425,6 +655,8 @@ class MLflowIntegration: else: config = search_state.config + self.automl_user_configurations = safe_json_dumps(automl._automl_user_configurations) + info = { "metrics": { "iter_counter": automl._track_iter, @@ -445,7 +677,7 @@ class MLflowIntegration: "flaml.meric": automl_metric_name, "flaml.run_source": "flaml-automl", "flaml.log_type": self.log_type, - "flaml.automl_user_configurations": safe_json_dumps(automl._automl_user_configurations), + "flaml.automl_user_configurations": self.automl_user_configurations, }, "params": { "sample_size": search_state.sample_size, @@ -472,37 +704,70 @@ class MLflowIntegration: run_name = f"{self.parent_run_name}_child_{self.child_counter}" else: run_name = None + _t1 = time.time() + wait(self.futures_log_model) + _t2 = time.time() - _t1 + logger.debug(f"wait futures_log_model in record_state took {_t2} seconds") with mlflow.start_run(nested=True, run_name=run_name) as child_run: - self._log_info_to_run(info, child_run.info.run_id, log_params=True) + future = executor.submit(lambda: self._log_info_to_run(info, child_run.info.run_id, log_params=True)) + self.futures[future] = f"iter_{automl._track_iter}_log_info_to_run" + future = executor.submit(lambda: self._log_automl_configurations(child_run.info.run_id)) + self.futures[future] = f"iter_{automl._track_iter}_log_automl_configurations" if automl._state.model_history: - self.log_model( - search_state.trained_estimator._model, estimator, signature=automl.estimator_signature - ) - self.pickle_and_log_automl_artifacts( - automl, search_state.trained_estimator, estimator, signature=automl.pipeline_signature - ) + if estimator.endswith("_spark"): + future = executor.submit( + lambda: self.log_model( + search_state.trained_estimator._model, + estimator, + automl.estimator_signature, + child_run.info.run_id, + ) + ) + self.futures_log_model[future] = f"record_state-log_model_{estimator}" + else: + future = executor.submit( + lambda: self.pickle_and_log_automl_artifacts( + automl, + search_state.trained_estimator, + estimator, + automl.pipeline_signature, + child_run.info.run_id, + ) + ) + self.futures_log_model[future] = f"record_state-pickle_and_log_automl_artifacts_{estimator}" self.manual_run_ids.append(child_run.info.run_id) self.child_counter += 1 + return f"Successfully record_state iteration {automl._track_iter}" + @time_it def log_automl(self, automl): self.set_best_iter(automl) if self.autolog: if self.parent_run_id is not None: mlflow.start_run(run_id=self.parent_run_id, experiment_id=self.experiment_id) - mlflow.log_metric("best_validation_loss", automl._state.best_loss) - mlflow.log_metric("best_iteration", automl._best_iteration) - mlflow.log_metric("num_child_runs", len(self.infos)) + mlflow.log_metrics( + { + "best_validation_loss": automl._state.best_loss, + "best_iteration": automl._best_iteration, + "num_child_runs": len(self.infos), + } + ) if ( automl._trained_estimator is not None and not self.has_model and automl._trained_estimator._model is not None ): - self.log_model( - automl._trained_estimator._model, automl.best_estimator, signature=automl.estimator_signature - ) - self.pickle_and_log_automl_artifacts( - automl, automl.model, automl.best_estimator, signature=automl.pipeline_signature - ) + if automl.best_estimator.endswith("_spark"): + self.log_model( + automl._trained_estimator._model, + automl.best_estimator, + automl.estimator_signature, + self.parent_run_id, + ) + else: + self.pickle_and_log_automl_artifacts( + automl, automl.model, automl.best_estimator, automl.pipeline_signature, self.parent_run_id + ) self.has_model = True self.adopt_children(automl) @@ -519,34 +784,65 @@ class MLflowIntegration: if "ml" in conf.keys(): conf = conf["ml"] - mlflow.log_params(conf) - mlflow.log_param("best_learner", automl._best_estimator) + mlflow.log_params({**conf, "best_learner": automl._best_estimator}, run_id=self.parent_run_id) if not self.has_summary: logger.info(f"logging best model {automl.best_estimator}") - self.copy_mlflow_run(best_mlflow_run_id, self.parent_run_id) + future = executor.submit(lambda: self.copy_mlflow_run(best_mlflow_run_id, self.parent_run_id)) + self.futures[future] = "log_automl_copy_mlflow_run" + future = executor.submit(lambda: self._log_automl_configurations(self.parent_run_id)) + self.futures[future] = "log_automl_log_automl_configurations" self.has_summary = True + _t1 = time.time() + wait(self.futures_log_model) + _t2 = time.time() - _t1 + logger.debug(f"wait futures_log_model in log_automl took {_t2} seconds") if ( automl._trained_estimator is not None and not self.has_model and automl._trained_estimator._model is not None ): - self.log_model( - automl._trained_estimator._model, - automl.best_estimator, - signature=automl.estimator_signature, - ) - self.pickle_and_log_automl_artifacts( - automl, automl.model, automl.best_estimator, signature=automl.pipeline_signature - ) + if automl.best_estimator.endswith("_spark"): + future = executor.submit( + lambda: self.log_model( + automl._trained_estimator._model, + automl.best_estimator, + signature=automl.estimator_signature, + run_id=self.parent_run_id, + ) + ) + self.futures_log_model[future] = f"log_automl-log_model_{automl.best_estimator}" + else: + future = executor.submit( + lambda: self.pickle_and_log_automl_artifacts( + automl, + automl.model, + automl.best_estimator, + signature=automl.pipeline_signature, + run_id=self.parent_run_id, + ) + ) + self.futures_log_model[ + future + ] = f"log_automl-pickle_and_log_automl_artifacts_{automl.best_estimator}" self.has_model = True def resume_mlflow(self): if len(self.resume_params) > 0: mlflow.autolog(**self.resume_params) + def _log_automl_configurations(self, run_id): + self.mlflow_client.log_text( + run_id=run_id, + text=self.automl_user_configurations, + artifact_file="automl_configurations/automl_user_configurations.json", + ) + return f"Successfully _log_automl_configurations to run_id {run_id}" + def _log_info_to_run(self, info, run_id, log_params=False): _metrics = [Metric(key, value, int(time.time() * 1000), 0) for key, value in info["metrics"].items()] - _tags = [RunTag(key, str(value)) for key, value in info["tags"].items()] + _tags = [ + RunTag(key, str(value)[:5000]) for key, value in info["tags"].items() + ] # AML will raise error if value length > 5000 _params = [ Param(key, str(value)) for key, value in info["params"].items() @@ -562,6 +858,7 @@ class MLflowIntegration: _tags = [RunTag("mlflow.parentRunId", run_id)] self.mlflow_client.log_batch(run_id=run.info.run_id, metrics=_metrics, params=[], tags=_tags) del info["submetrics"]["values"] + return f"Successfully _log_info_to_run to run_id {run_id}" def adopt_children(self, result=None): """ diff --git a/flaml/tune/logger.py b/flaml/tune/logger.py new file mode 100644 index 000000000..959337b25 --- /dev/null +++ b/flaml/tune/logger.py @@ -0,0 +1,37 @@ +import logging +import os + + +class ColoredFormatter(logging.Formatter): + # ANSI escape codes for colors + COLORS = { + # logging.DEBUG: "\033[36m", # Cyan + # logging.INFO: "\033[32m", # Green + logging.WARNING: "\033[33m", # Yellow + logging.ERROR: "\033[31m", # Red + logging.CRITICAL: "\033[1;31m", # Bright Red + } + RESET = "\033[0m" # Reset to default + + def __init__(self, fmt, datefmt, use_color=True): + super().__init__(fmt, datefmt) + self.use_color = use_color + + def format(self, record): + formatted = super().format(record) + if self.use_color: + color = self.COLORS.get(record.levelno, "") + if color: + return f"{color}{formatted}{self.RESET}" + return formatted + + +logger = logging.getLogger(__name__) +use_color = True +if os.getenv("FLAML_LOG_NO_COLOR"): + use_color = False + +logger_formatter = ColoredFormatter( + "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", "%m-%d %H:%M:%S", use_color +) +logger.propagate = False diff --git a/flaml/tune/spark/utils.py b/flaml/tune/spark/utils.py index 5113549c1..959fb373f 100644 --- a/flaml/tune/spark/utils.py +++ b/flaml/tune/spark/utils.py @@ -162,6 +162,10 @@ def broadcast_code(custom_code="", file_name="mylearner"): assert isinstance(MyLargeLGBM(), LGBMEstimator) ``` """ + # Check if Spark is available + spark_available, _ = check_spark() + + # Write to local driver file system flaml_path = os.path.dirname(os.path.abspath(__file__)) custom_code = textwrap.dedent(custom_code) custom_path = os.path.join(flaml_path, file_name + ".py") @@ -169,6 +173,24 @@ def broadcast_code(custom_code="", file_name="mylearner"): with open(custom_path, "w") as f: f.write(custom_code) + # If using Spark, broadcast the code content to executors + if spark_available: + spark = SparkSession.builder.getOrCreate() + bc_code = spark.sparkContext.broadcast(custom_code) + + # Execute a job to ensure the code is distributed to all executors + def _write_code(bc): + code = bc.value + import os + + module_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_name + ".py") + os.makedirs(os.path.dirname(module_path), exist_ok=True) + with open(module_path, "w") as f: + f.write(code) + return True + + spark.sparkContext.parallelize(range(1)).map(lambda _: _write_code(bc_code)).collect() + return custom_path diff --git a/flaml/tune/tune.py b/flaml/tune/tune.py index 84c113349..08a45c72c 100644 --- a/flaml/tune/tune.py +++ b/flaml/tune/tune.py @@ -21,11 +21,11 @@ except (ImportError, AssertionError): from .analysis import ExperimentAnalysis as EA else: ray_available = True - import logging from flaml.tune.spark.utils import PySparkOvertimeMonitor, check_spark +from .logger import logger, logger_formatter from .result import DEFAULT_METRIC from .trial import Trial @@ -41,8 +41,6 @@ except ImportError: internal_mlflow = False -logger = logging.getLogger(__name__) -logger.propagate = False _use_ray = True _runner = None _verbose = 0 @@ -521,10 +519,6 @@ def run( elif not logger.hasHandlers(): # Add the console handler. _ch = logging.StreamHandler(stream=sys.stdout) - logger_formatter = logging.Formatter( - "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", - "%m-%d %H:%M:%S", - ) _ch.setFormatter(logger_formatter) logger.addHandler(_ch) if verbose <= 2: @@ -752,10 +746,16 @@ def run( max_concurrent = max(1, search_alg.max_concurrent) else: max_concurrent = max(1, max_spark_parallelism) + passed_in_n_concurrent_trials = max(n_concurrent_trials, max_concurrent) n_concurrent_trials = min( n_concurrent_trials if n_concurrent_trials > 0 else num_executors, max_concurrent, ) + if n_concurrent_trials < passed_in_n_concurrent_trials: + logger.warning( + f"The actual concurrent trials is {n_concurrent_trials}. You can set the environment " + f"variable `FLAML_MAX_CONCURRENT` to '{passed_in_n_concurrent_trials}' to override the detected num of executors." + ) with parallel_backend("spark"): with Parallel(n_jobs=n_concurrent_trials, verbose=max(0, (verbose - 1) * 50)) as parallel: try: @@ -777,7 +777,7 @@ def run( and num_failures < upperbound_num_failures ): if automl_info and automl_info[0] > 0 and time_budget_s < np.inf: - time_budget_s -= automl_info[0] + time_budget_s -= automl_info[0] * n_concurrent_trials logger.debug(f"Remaining time budget with mlflow log latency: {time_budget_s} seconds.") while len(_runner.running_trials) < n_concurrent_trials: # suggest trials for spark diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 000000000..2b1db9c48 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +markers = + spark: mark a test as requiring Spark diff --git a/setup.py b/setup.py index e30db68a2..31cc56372 100644 --- a/setup.py +++ b/setup.py @@ -170,10 +170,9 @@ setuptools.setup( "Operating System :: OS Independent", # Specify the Python versions you support here. "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", ], - python_requires=">=3.8", + python_requires=">=3.9", ) diff --git a/test/automl/test_extra_models.py b/test/automl/test_extra_models.py index 6c5cac099..e19f2cb20 100644 --- a/test/automl/test_extra_models.py +++ b/test/automl/test_extra_models.py @@ -17,6 +17,8 @@ from flaml import AutoML from flaml.automl.ml import sklearn_metric_loss_score from flaml.tune.spark.utils import check_spark +pytestmark = pytest.mark.spark + leaderboard = defaultdict(dict) warnings.simplefilter(action="ignore") diff --git a/test/automl/test_forecast.py b/test/automl/test_forecast.py index 6e5d97d4f..21e8bacfe 100644 --- a/test/automl/test_forecast.py +++ b/test/automl/test_forecast.py @@ -477,7 +477,10 @@ def test_forecast_classification(budget=5): def get_stalliion_data(): from pytorch_forecasting.data.examples import get_stallion_data - data = get_stallion_data() + # data = get_stallion_data() + data = pd.read_parquet( + "https://raw.githubusercontent.com/sktime/pytorch-forecasting/refs/heads/main/examples/data/stallion.parquet" + ) # add time index - For datasets with no missing values, FLAML will automate this process data["time_idx"] = data["date"].dt.year * 12 + data["date"].dt.month data["time_idx"] -= data["time_idx"].min() diff --git a/test/automl/test_mlflow.py b/test/automl/test_mlflow.py index 58d749d37..5185da9c8 100644 --- a/test/automl/test_mlflow.py +++ b/test/automl/test_mlflow.py @@ -10,6 +10,18 @@ from flaml import AutoML class TestMLFlowLoggingParam: + def test_update_and_install_requirements(self): + import mlflow + from sklearn import tree + + from flaml.fabric.mlflow import update_and_install_requirements + + with mlflow.start_run(run_name="test") as run: + sk_model = tree.DecisionTreeClassifier() + mlflow.sklearn.log_model(sk_model, "model", registered_model_name="test") + + update_and_install_requirements(run_id=run.info.run_id) + def test_should_start_new_run_by_default(self, automl_settings): with mlflow.start_run() as parent_run: automl = AutoML() diff --git a/test/nlp/test_autohf_classificationhead.py b/test/nlp/test_autohf_classificationhead.py index 9ce023f96..325e4aeb0 100644 --- a/test/nlp/test_autohf_classificationhead.py +++ b/test/nlp/test_autohf_classificationhead.py @@ -24,6 +24,8 @@ model_path_list = [ if sys.platform.startswith("darwin") and sys.version_info[0] == 3 and sys.version_info[1] == 11: pytest.skip("skipping Python 3.11 on MacOS", allow_module_level=True) +pytestmark = pytest.mark.spark # set to spark as parallel testing raised RuntimeError + def test_switch_1_1(): data_idx, model_path_idx = 0, 0 diff --git a/test/nlp/test_autohf_cv.py b/test/nlp/test_autohf_cv.py index e5429a7f5..4ec611307 100644 --- a/test/nlp/test_autohf_cv.py +++ b/test/nlp/test_autohf_cv.py @@ -5,6 +5,8 @@ import sys import pytest from utils import get_automl_settings, get_toy_data_seqclassification +pytestmark = pytest.mark.spark # set to spark as parallel testing raised MlflowException of changing parameter + @pytest.mark.skipif(sys.platform in ["darwin", "win32"], reason="do not run on mac os or windows") def test_cv(): diff --git a/test/nlp/test_default.py b/test/nlp/test_default.py index b3c901648..239fce227 100644 --- a/test/nlp/test_default.py +++ b/test/nlp/test_default.py @@ -10,6 +10,10 @@ from flaml.default import portfolio if sys.platform.startswith("darwin") and sys.version_info[0] == 3 and sys.version_info[1] == 11: pytest.skip("skipping Python 3.11 on MacOS", allow_module_level=True) +pytestmark = ( + pytest.mark.spark +) # set to spark as parallel testing raised ValueError: Feature NonExisting not implemented. + def pop_args(fit_kwargs): fit_kwargs.pop("max_iter", None) diff --git a/test/spark/test_0sparkml.py b/test/spark/test_0sparkml.py index 3f2198241..cb51b292f 100644 --- a/test/spark/test_0sparkml.py +++ b/test/spark/test_0sparkml.py @@ -3,11 +3,13 @@ import sys import warnings import mlflow +import numpy as np import pytest import sklearn.datasets as skds from packaging.version import Version from flaml import AutoML +from flaml.automl.data import auto_convert_dtypes_pandas, auto_convert_dtypes_spark, get_random_dataframe from flaml.tune.spark.utils import check_spark warnings.simplefilter(action="ignore") @@ -58,7 +60,7 @@ if sys.version_info >= (3, 11): else: skip_py311 = False -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] def _test_spark_synapseml_lightgbm(spark=None, task="classification"): @@ -296,11 +298,88 @@ def _test_spark_large_df(): print("time cost in minutes: ", (end_time - start_time) / 60) +def test_get_random_dataframe(): + # Test with default parameters + df = get_random_dataframe(n_rows=50, ratio_none=0.2, seed=123) + assert df.shape == (50, 14) # Default is 200 rows and 14 columns + + # Test column types + assert "timestamp" in df.columns and np.issubdtype(df["timestamp"].dtype, np.datetime64) + assert "id" in df.columns and np.issubdtype(df["id"].dtype, np.integer) + assert "score" in df.columns and np.issubdtype(df["score"].dtype, np.floating) + assert "category" in df.columns and df["category"].dtype.name == "category" + + +def test_auto_convert_dtypes_pandas(): + # Create a test DataFrame with various types + import pandas as pd + + test_df = pd.DataFrame( + { + "int_col": ["1", "2", "3", "4", "5", "6", "6"], + "float_col": ["1.1", "2.2", "3.3", "NULL", "5.5", "6.6", "6.6"], + "date_col": ["2021-01-01", "2021-02-01", "NA", "2021-04-01", "2021-05-01", "2021-06-01", "2021-06-01"], + "cat_col": ["A", "B", "A", "A", "B", "A", "B"], + "string_col": ["text1", "text2", "text3", "text4", "text5", "text6", "text7"], + } + ) + + # Convert dtypes + converted_df, schema = auto_convert_dtypes_pandas(test_df) + + # Check conversions + assert schema["int_col"] == "int" + assert schema["float_col"] == "double" + assert schema["date_col"] == "timestamp" + assert schema["cat_col"] == "category" + assert schema["string_col"] == "string" + + +def test_auto_convert_dtypes_spark(): + """Test auto_convert_dtypes_spark function with various data types.""" + import pandas as pd + + # Create a test DataFrame with various types + test_pdf = pd.DataFrame( + { + "int_col": ["1", "2", "3", "4", "NA"], + "float_col": ["1.1", "2.2", "3.3", "NULL", "5.5"], + "date_col": ["2021-01-01", "2021-02-01", "NA", "2021-04-01", "2021-05-01"], + "cat_col": ["A", "B", "A", "C", "B"], + "string_col": ["text1", "text2", "text3", "text4", "text5"], + } + ) + + # Convert pandas DataFrame to Spark DataFrame + test_df = spark.createDataFrame(test_pdf) + + # Convert dtypes + converted_df, schema = auto_convert_dtypes_spark(test_df) + + # Check conversions + assert schema["int_col"] == "int" + assert schema["float_col"] == "double" + assert schema["date_col"] == "timestamp" + assert schema["cat_col"] == "string" # Conceptual category in schema + assert schema["string_col"] == "string" + + # Verify the actual data types from the Spark DataFrame + spark_dtypes = dict(converted_df.dtypes) + assert spark_dtypes["int_col"] == "int" + assert spark_dtypes["float_col"] == "double" + assert spark_dtypes["date_col"] == "timestamp" + assert spark_dtypes["cat_col"] == "string" # In Spark, categories are still strings + assert spark_dtypes["string_col"] == "string" + + if __name__ == "__main__": test_spark_synapseml_classification() test_spark_synapseml_regression() test_spark_synapseml_rank() test_spark_input_df() + test_get_random_dataframe() + test_auto_convert_dtypes_pandas() + test_auto_convert_dtypes_spark() # import cProfile # import pstats diff --git a/test/spark/test_automl.py b/test/spark/test_automl.py index 269161711..8785a8bfd 100644 --- a/test/spark/test_automl.py +++ b/test/spark/test_automl.py @@ -25,7 +25,7 @@ os.environ["FLAML_MAX_CONCURRENT"] = "2" spark_available, _ = check_spark() skip_spark = not spark_available -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] def test_parallel_xgboost(hpo_method=None, data_size=1000): diff --git a/test/spark/test_ensemble.py b/test/spark/test_ensemble.py index 74dd17fd4..9278791e8 100644 --- a/test/spark/test_ensemble.py +++ b/test/spark/test_ensemble.py @@ -1,6 +1,7 @@ import os import unittest +import pytest from sklearn.datasets import load_wine from flaml import AutoML @@ -24,6 +25,8 @@ if os.path.exists(os.path.join(os.getcwd(), "test", "spark", "custom_mylearner.p else: skip_my_learner = True +pytestmark = pytest.mark.spark + class TestEnsemble(unittest.TestCase): def setUp(self) -> None: diff --git a/test/spark/test_exceptions.py b/test/spark/test_exceptions.py index a20de7fc2..13c265d37 100644 --- a/test/spark/test_exceptions.py +++ b/test/spark/test_exceptions.py @@ -9,7 +9,7 @@ from flaml.tune.spark.utils import check_spark spark_available, _ = check_spark() skip_spark = not spark_available -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] os.environ["FLAML_MAX_CONCURRENT"] = "2" diff --git a/test/spark/test_mlflow.py b/test/spark/test_mlflow.py index 5a809d5ac..b61b97814 100644 --- a/test/spark/test_mlflow.py +++ b/test/spark/test_mlflow.py @@ -21,6 +21,7 @@ try: from pyspark.ml.feature import VectorAssembler except ImportError: pass +pytestmark = pytest.mark.spark warnings.filterwarnings("ignore") skip_spark = importlib.util.find_spec("pyspark") is None diff --git a/test/spark/test_multiclass.py b/test/spark/test_multiclass.py index c3cf579e1..45f6e5b45 100644 --- a/test/spark/test_multiclass.py +++ b/test/spark/test_multiclass.py @@ -2,6 +2,7 @@ import os import unittest import numpy as np +import pytest import scipy.sparse from sklearn.datasets import load_iris, load_wine @@ -12,6 +13,7 @@ from flaml.tune.spark.utils import check_spark spark_available, _ = check_spark() skip_spark = not spark_available +pytestmark = pytest.mark.spark os.environ["FLAML_MAX_CONCURRENT"] = "2" diff --git a/test/spark/test_notebook.py b/test/spark/test_notebook.py index 88650e423..31071bec9 100644 --- a/test/spark/test_notebook.py +++ b/test/spark/test_notebook.py @@ -9,7 +9,7 @@ from flaml.tune.spark.utils import check_spark spark_available, _ = check_spark() skip_spark = not spark_available -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] here = os.path.abspath(os.path.dirname(__file__)) os.environ["FLAML_MAX_CONCURRENT"] = "2" diff --git a/test/spark/test_overtime.py b/test/spark/test_overtime.py index 13fec32e9..82d1aa81f 100644 --- a/test/spark/test_overtime.py +++ b/test/spark/test_overtime.py @@ -25,7 +25,7 @@ try: except ImportError: skip_spark = True -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] def test_overtime(): diff --git a/test/spark/test_performance.py b/test/spark/test_performance.py index 8e4da7efd..febd1a4b8 100644 --- a/test/spark/test_performance.py +++ b/test/spark/test_performance.py @@ -11,7 +11,7 @@ from flaml.tune.spark.utils import check_spark spark_available, _ = check_spark() skip_spark = not spark_available -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] os.environ["FLAML_MAX_CONCURRENT"] = "2" diff --git a/test/spark/test_tune.py b/test/spark/test_tune.py index 117a4ad80..0f9b5a9a2 100644 --- a/test/spark/test_tune.py +++ b/test/spark/test_tune.py @@ -14,7 +14,7 @@ from flaml.tune.spark.utils import check_spark spark_available, _ = check_spark() skip_spark = not spark_available -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] os.environ["FLAML_MAX_CONCURRENT"] = "2" X, y = load_breast_cancer(return_X_y=True) diff --git a/test/spark/test_utils.py b/test/spark/test_utils.py index 0f772c7c4..1141972be 100644 --- a/test/spark/test_utils.py +++ b/test/spark/test_utils.py @@ -36,7 +36,7 @@ except ImportError: print("Spark is not installed. Skip all spark tests.") skip_spark = True -pytestmark = pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests.") +pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark] def test_with_parameters_spark():