Mirror of https://github.com/microsoft/FLAML.git, synced 2026-02-09 02:09:16 +08:00
* Merged PR 1686010: Bump version to 2.3.5.post2, Distribute source and wheel, Fix license-file, Only log better models
- Fix license-file
- Bump version to 2.3.5.post2
- Distribute source and wheel
- Log better models only
- Add artifact_path to register_automl_pipeline
- Improve logging of _automl_user_configurations
----
This pull request fixes the project’s packaging configuration by updating the license metadata for OSS compliance in FLAML 2.3.5.
The changes in `/pyproject.toml` update the project’s license and readme metadata by replacing deprecated keys with the new structured fields.
- `/pyproject.toml`: Replaced `license_file` with `license = { text = "MIT" }`.
- `/pyproject.toml`: Replaced `description-file` with `readme = "README.md"`.
Related work items: #4252053
* Merged PR 1688479: Handle feature_importances_ is None, Catch RuntimeError and wait for spark cluster to recover
- Add warning message when feature_importances_ is None (#3982120)
- Catch RuntimeError and wait for spark cluster to recover (#3982133)
----
Bug fix.
This pull request prevents an AttributeError in the feature importance plotting function by adding a check for a `None` value with an informative warning message (a sketch of such a guard follows this entry).
- `flaml/fabric/visualization.py`: Checks if `result.feature_importances_` is `None`, logs a warning with possible reasons, and returns early.
- `flaml/fabric/visualization.py`: Imports `logger` from `flaml.automl.logger` to support the warning message.
Related work items: #3982120, #3982133
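A minimal sketch of the kind of guard described above, assuming a hypothetical plotting helper; the function name, logger setup, and warning text are illustrative, not FLAML's actual implementation in `flaml/fabric/visualization.py`:

```python
import logging

logger = logging.getLogger(__name__)


def plot_feature_importance(result):
    """Hypothetical helper illustrating the None guard on feature_importances_."""
    importances = getattr(result, "feature_importances_", None)
    if importances is None:
        # Some estimators do not expose feature importances, or training may
        # have ended before a model was fitted; warn and skip the plot.
        logger.warning("feature_importances_ is None; skipping the feature importance plot.")
        return
    # ... plot `importances` here ...
```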
* Removed deprecated metadata section
* Fix log_params, log_artifact doesn't support run_id in mlflow 2.6.0
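In mlflow 2.6.0 the fluent `mlflow.log_params`/`mlflow.log_artifact` helpers only target the active run, so logging against an explicit run id goes through `MlflowClient` instead. A hedged sketch (the run id, parameter, and file path are placeholders):

```python
from mlflow.tracking import MlflowClient

client = MlflowClient()
run_id = "0123456789abcdef"  # placeholder: id of the target run

# Log a parameter and an artifact against an explicit run id,
# without requiring that run to be the currently active one.
client.log_param(run_id, "estimator", "lgbm")
client.log_artifact(run_id, "automl_settings.json")  # placeholder local file
```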
* Remove autogen
* Remove autogen
* Remove autogen
* Merged PR 1776547: Fix flaky test test_automl
Don't throw an error when the time budget is not enough
----
#### PR Classification
Bug fix addressing a failing test in the AutoML notebook example.
#### PR Summary
This PR fixes a flaky test by adding a conditional check in the AutoML test that prints a message and exits early if no best estimator is set, thereby preventing unpredictable test failures (a sketch follows this entry).
- `test/automl/test_notebook_example.py`: Introduced a check to print "Training budget is not sufficient" and return if `automl.best_estimator` is not found.
Related work items: #4573514
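A hedged sketch of the kind of early-exit check the PR describes, not the actual test; the dataset and budget are illustrative:

```python
from sklearn.datasets import load_iris

from flaml import AutoML

X, y = load_iris(return_X_y=True)
automl = AutoML()
automl.fit(X, y, task="classification", time_budget=1)  # deliberately tiny budget

if not automl.best_estimator:
    # With a very small budget no trial may finish, so skip the remaining
    # assertions instead of letting the test fail unpredictably.
    print("Training budget is not sufficient")
else:
    print("best estimator:", automl.best_estimator)
```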
* Merged PR 1777952: Fix unrecognized or malformed field 'license-file' when uploading wheel to feed
Try to fix InvalidDistribution: Invalid distribution metadata: unrecognized or malformed field 'license-file'
----
Bug fix addressing package metadata configuration.
This pull request fixes the error caused by an unrecognized or malformed license-file field during wheel uploads by updating the setup configuration (sketched below).
- In `setup.py`, added `license="MIT"` and `license_files=["LICENSE"]` to provide proper license metadata.
Related work items: #4560034
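A minimal `setup.py` sketch of the fields this change adds; every other argument is omitted for brevity:

```python
from setuptools import find_packages, setup

setup(
    name="flaml",
    packages=find_packages(include=["flaml*"]),
    license="MIT",  # plain license string expected by the package index
    license_files=["LICENSE"],  # include the license file in the distribution metadata
)
```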
* Cherry-pick Merged PR 1879296: Add support to python 3.12 and spark 4.0
* Cherry-pick Merged PR 1890869: Improve time_budget estimation for mlflow logging
* Cherry-pick Merged PR 1879296: Add support to python 3.12 and spark 4.0
* Disable openai workflow
* Add python 3.12 to test envs
* Manually trigger openai
* Support markdown files with underscore-prefixed file names
* Improve save dependencies
* SynapseML is not installed
* Fix syntax error: Module !flaml/autogen was never imported
* macOS 3.12 also hangs
* Fix syntax error
* Update python version in actions
* Install setuptools for using pkg_resources
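`pkg_resources` ships with setuptools, which is no longer pre-installed in Python 3.12 environments, hence the extra install step above. A hedged sketch of one possible fallback that avoids the hard dependency (illustrative only):

```python
try:
    import pkg_resources  # provided by setuptools

    flaml_version = pkg_resources.get_distribution("flaml").version
except ImportError:
    # setuptools absent (e.g. a bare Python 3.12 venv): use the standard library.
    from importlib.metadata import version

    flaml_version = version("flaml")

print(flaml_version)
```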
* Fix test_automl_performance in GitHub Actions
* Fix test_nested_run
455 lines
14 KiB
Python
import atexit
import os
from functools import partial
from timeit import timeit

import numpy as np
import pandas as pd
import pytest

try:
    os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
    import pyspark
    import pyspark.pandas as ps
    from pyspark.ml.linalg import Vectors
    from pyspark.sql import SparkSession

    from flaml.automl.ml import sklearn_metric_loss_score
    from flaml.automl.spark import disable_spark_ansi_mode, restore_spark_ansi_mode
    from flaml.automl.spark.metrics import spark_metric_loss_score
    from flaml.automl.spark.utils import (
        iloc_pandas_on_spark,
        len_labels,
        to_pandas_on_spark,
        train_test_split_pyspark,
        unique_pandas_on_spark,
        unique_value_first_index,
    )
    from flaml.tune.spark.utils import (
        _spark_major_minor_version,
        check_spark,
        get_broadcast_data,
        get_n_cpus,
        with_parameters,
    )

    spark_available, _ = check_spark()
    skip_spark = not spark_available
except ImportError:
    print("Spark is not installed. Skip all spark tests.")
    skip_spark = True
    _spark_major_minor_version = (0, 0)


pytestmark = [pytest.mark.skipif(skip_spark, reason="Spark is not installed. Skip all spark tests."), pytest.mark.spark]

@pytest.mark.skipif(_spark_major_minor_version[0] < 4, reason="Requires Spark 4.0+")
def test_to_pandas_on_spark_temp_override():
    import pyspark.pandas as ps
    from pyspark.sql import Row

    from flaml.automl.spark.utils import to_pandas_on_spark

    spark_session = SparkSession.builder.getOrCreate()
    spark, ansi_conf, adjusted = disable_spark_ansi_mode()
    atexit.register(restore_spark_ansi_mode, spark, ansi_conf, adjusted)

    # Ensure we can toggle options
    orig = ps.get_option("compute.fail_on_ansi_mode")

    try:
        spark_session.conf.set("spark.sql.ansi.enabled", "true")
        ps.set_option("compute.fail_on_ansi_mode", True)

        # create tiny spark df
        sdf = spark_session.createDataFrame([Row(a=1, b=2)])
        # Should not raise as our function temporarily disables fail_on_ansi_mode
        pds = to_pandas_on_spark(sdf)
        assert "a" in pds.columns
    finally:
        # restore test environment
        ps.set_option("compute.fail_on_ansi_mode", orig)
        spark_session.conf.set("spark.sql.ansi.enabled", "false")

def test_with_parameters_spark():
    def train(config, data=None):
        if isinstance(data, pyspark.broadcast.Broadcast):
            data = data.value
        print(config, len(data))

    data = ["a"] * 10**6

    # Compare broadcasting the data via with_parameters against capturing it in a plain partial.
    with_parameters_train = with_parameters(train, data=data)
    partial_train = partial(train, data=data)

    spark = SparkSession.builder.getOrCreate()
    rdd = spark.sparkContext.parallelize(list(range(2)))

    t_partial = timeit(lambda: rdd.map(lambda x: partial_train(config=x)).collect(), number=5)
    print("python_partial_train: " + str(t_partial))

    t_spark = timeit(
        lambda: rdd.map(lambda x: with_parameters_train(config=x)).collect(),
        number=5,
    )
    print("spark_with_parameters_train: " + str(t_spark))

    # assert t_spark < t_partial

def test_get_n_cpus_spark():
    n_cpus = get_n_cpus()
    assert isinstance(n_cpus, int)

def test_broadcast_code():
    from flaml.automl.model import LGBMEstimator
    from flaml.tune.spark.utils import broadcast_code

    custom_code = """
from flaml.automl.model import LGBMEstimator
from flaml import tune

class MyLargeLGBM(LGBMEstimator):
    @classmethod
    def search_space(cls, **params):
        return {
            "n_estimators": {
                "domain": tune.lograndint(lower=4, upper=32768),
                "init_value": 32768,
                "low_cost_init_value": 4,
            },
            "num_leaves": {
                "domain": tune.lograndint(lower=4, upper=32768),
                "init_value": 32768,
                "low_cost_init_value": 4,
            },
        }
"""

    _ = broadcast_code(custom_code=custom_code)
    from flaml.tune.spark.mylearner import MyLargeLGBM

    assert isinstance(MyLargeLGBM(), LGBMEstimator)

def test_get_broadcast_data():
    data = ["a"] * 10
    spark = SparkSession.builder.getOrCreate()
    bc_data = spark.sparkContext.broadcast(data)
    assert get_broadcast_data(bc_data) == data

def test_to_pandas_on_spark(capsys):
    pdf = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    psdf = to_pandas_on_spark(pdf)
    print(psdf)
    captured = capsys.readouterr()
    assert captured.out == "   a  b\n0  1  4\n1  2  5\n2  3  6\n"
    assert isinstance(psdf, ps.DataFrame)

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(pdf)
    psdf = to_pandas_on_spark(sdf)
    print(psdf)
    captured = capsys.readouterr()
    assert captured.out == "   a  b\n0  1  4\n1  2  5\n2  3  6\n"
    assert isinstance(psdf, ps.DataFrame)

    pds = pd.Series([1, 2, 3])
    pss = to_pandas_on_spark(pds)
    print(pss)
    captured = capsys.readouterr()
    assert captured.out == "0    1\n1    2\n2    3\ndtype: int64\n"
    assert isinstance(pss, ps.Series)

def test_train_test_split_pyspark():
    pdf = pd.DataFrame({"x": [1, 2, 3, 4], "y": [0, 1, 1, 0]})
    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(pdf).repartition(1)
    psdf = to_pandas_on_spark(sdf).spark.repartition(1)
    train_sdf, test_sdf = train_test_split_pyspark(sdf, test_fraction=0.5, to_pandas_spark=False, seed=1)
    train_psdf, test_psdf = train_test_split_pyspark(psdf, test_fraction=0.5, stratify_column="y", seed=1)
    assert isinstance(train_sdf, pyspark.sql.dataframe.DataFrame)
    assert isinstance(test_sdf, pyspark.sql.dataframe.DataFrame)
    assert isinstance(train_psdf, ps.DataFrame)
    assert isinstance(test_psdf, ps.DataFrame)
    assert train_sdf.count() == 2
    assert train_psdf.shape[0] == 2
    print(train_sdf.toPandas())
    print(test_sdf.toPandas())
    print(train_psdf.to_pandas())
    print(test_psdf.to_pandas())

def test_unique_pandas_on_spark():
    pdf = pd.DataFrame({"x": [1, 2, 2, 3], "y": [0, 1, 1, 0]})
    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(pdf)
    psdf = to_pandas_on_spark(sdf)
    label_set, counts = unique_pandas_on_spark(psdf)
    assert np.array_equal(label_set, np.array([2, 1, 3]))
    assert np.array_equal(counts, np.array([2, 1, 1]))

def test_len_labels():
    y1 = np.array([1, 2, 5, 4, 5])
    y2 = ps.Series([1, 2, 5, 4, 5])
    assert len_labels(y1) == 4
    ll, la = len_labels(y2, return_labels=True)
    assert ll == 4
    assert set(la.to_numpy()) == {1, 2, 5, 4}

def test_unique_value_first_index():
    y1 = np.array([1, 2, 5, 4, 5])
    y2 = ps.Series([1, 2, 5, 4, 5])
    l1, f1 = unique_value_first_index(y1)
    l2, f2 = unique_value_first_index(y2)
    assert np.array_equal(l1, np.array([1, 2, 4, 5]))
    assert np.array_equal(f1, np.array([0, 1, 3, 2]))
    assert np.array_equal(l2, np.array([1, 2, 5, 4]))
    assert np.array_equal(f2, np.array([0, 1, 2, 3]))

def test_n_current_trials():
    spark = SparkSession.builder.getOrCreate()
    sc = spark._jsc.sc()
    # Number of worker executors (the driver is excluded).
    num_executors = len([executor.host() for executor in sc.statusTracker().getExecutorInfos()]) - 1

    def get_n_current_trials(n_concurrent_trials=0, num_executors=num_executors):
        try:
            FLAML_MAX_CONCURRENT = int(os.getenv("FLAML_MAX_CONCURRENT", 0))
        except ValueError:
            FLAML_MAX_CONCURRENT = 0
        num_executors = max(num_executors, FLAML_MAX_CONCURRENT, 1)
        max_spark_parallelism = max(spark.sparkContext.defaultParallelism, FLAML_MAX_CONCURRENT)
        max_concurrent = max(1, max_spark_parallelism)
        n_concurrent_trials = min(
            n_concurrent_trials if n_concurrent_trials > 0 else num_executors,
            max_concurrent,
        )
        print("n_concurrent_trials:", n_concurrent_trials)
        return n_concurrent_trials

    os.environ["FLAML_MAX_CONCURRENT"] = "invalid"  # intentionally non-numeric to exercise the ValueError branch
    assert get_n_current_trials() == max(num_executors, 1)
    tmp_max = spark.sparkContext.defaultParallelism
    assert get_n_current_trials(1) == 1
    assert get_n_current_trials(2) == min(2, tmp_max)
    assert get_n_current_trials(50) == min(50, tmp_max)
    assert get_n_current_trials(200) == min(200, tmp_max)
    os.environ["FLAML_MAX_CONCURRENT"] = "0"
    assert get_n_current_trials() == max(num_executors, 1)
    os.environ["FLAML_MAX_CONCURRENT"] = "4"
    tmp_max = max(4, spark.sparkContext.defaultParallelism)
    assert get_n_current_trials() == min(4, tmp_max)
    os.environ["FLAML_MAX_CONCURRENT"] = "9999999"
    assert get_n_current_trials() == 9999999
    os.environ["FLAML_MAX_CONCURRENT"] = "100"
    tmp_max = max(100, spark.sparkContext.defaultParallelism)
    assert get_n_current_trials(1) == 1
    assert get_n_current_trials(2) == min(2, tmp_max)
    assert get_n_current_trials(50) == min(50, tmp_max)
    assert get_n_current_trials(200) == min(200, tmp_max)
    del os.environ["FLAML_MAX_CONCURRENT"]

def test_iloc_pandas_on_spark():
    psdf = ps.DataFrame({"x": [1, 2, 2, 3], "y": [0, 1, 1, 0]}, index=[0, 1, 2, 3])
    psds = ps.Series([1, 2, 2, 3], index=[0, 1, 2, 3])
    assert iloc_pandas_on_spark(psdf, 0).tolist() == [1, 0]
    d1 = iloc_pandas_on_spark(psdf, slice(1, 3)).to_pandas()
    d2 = pd.DataFrame({"x": [2, 2], "y": [1, 1]}, index=[1, 2])
    assert d1.equals(d2)
    d1 = iloc_pandas_on_spark(psdf, [1, 3]).to_pandas()
    d2 = pd.DataFrame({"x": [2, 3], "y": [1, 0]}, index=[0, 1])
    assert d1.equals(d2)
    assert iloc_pandas_on_spark(psds, 0) == 1
    assert iloc_pandas_on_spark(psds, slice(1, 3)).tolist() == [2, 2]
    assert iloc_pandas_on_spark(psds, [0, 3]).tolist() == [1, 3]

def test_spark_metric_loss_score():
    spark = SparkSession.builder.getOrCreate()
    scoreAndLabels = map(
        lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1]),
        [
            (0.1, 0.0),
            (0.1, 1.0),
            (0.4, 0.0),
            (0.6, 0.0),
            (0.6, 1.0),
            (0.6, 1.0),
            (0.8, 1.0),
        ],
    )
    dataset = spark.createDataFrame(scoreAndLabels, ["raw", "label"])
    dataset = to_pandas_on_spark(dataset)
    # test pr_auc
    metric = spark_metric_loss_score(
        "pr_auc",
        dataset["raw"],
        dataset["label"],
    )
    print("pr_auc: ", metric)
    assert str(metric)[:5] == "0.166"
    # test roc_auc
    metric = spark_metric_loss_score(
        "roc_auc",
        dataset["raw"],
        dataset["label"],
    )
    print("roc_auc: ", metric)
    assert str(metric)[:5] == "0.291"

    scoreAndLabels = [
        (-28.98343821, -27.0),
        (20.21491975, 21.5),
        (-25.98418959, -22.0),
        (30.69731842, 33.0),
        (74.69283752, 71.0),
    ]
    dataset = spark.createDataFrame(scoreAndLabels, ["raw", "label"])
    dataset = to_pandas_on_spark(dataset)
    # test rmse
    metric = spark_metric_loss_score(
        "rmse",
        dataset["raw"],
        dataset["label"],
    )
    print("rmse: ", metric)
    assert str(metric)[:5] == "2.842"
    # test mae
    metric = spark_metric_loss_score(
        "mae",
        dataset["raw"],
        dataset["label"],
    )
    print("mae: ", metric)
    assert str(metric)[:5] == "2.649"
    # test r2
    metric = spark_metric_loss_score(
        "r2",
        dataset["raw"],
        dataset["label"],
    )
    print("r2: ", metric)
    assert str(metric)[:5] == "0.006"
    # test mse
    metric = spark_metric_loss_score(
        "mse",
        dataset["raw"],
        dataset["label"],
    )
    print("mse: ", metric)
    assert str(metric)[:5] == "8.079"
    # test var
    metric = spark_metric_loss_score(
        "var",
        dataset["raw"],
        dataset["label"],
    )
    print("var: ", metric)
    assert str(metric)[:5] == "-1489"

    predictionAndLabelsWithProbabilities = [
        (1.0, 1.0, 1.0, [0.1, 0.8, 0.1]),
        (0.0, 2.0, 1.0, [0.9, 0.05, 0.05]),
        (0.0, 0.0, 1.0, [0.8, 0.2, 0.0]),
        (1.0, 1.0, 1.0, [0.3, 0.65, 0.05]),
    ]
    dataset = spark.createDataFrame(
        predictionAndLabelsWithProbabilities,
        ["prediction", "label", "weight", "probability"],
    )
    dataset = to_pandas_on_spark(dataset)
    # test logloss
    metric = spark_metric_loss_score(
        "log_loss",
        dataset["probability"],
        dataset["label"],
    )
    print("log_loss: ", metric)
    assert str(metric)[:5] == "0.968"
    # test accuracy
    metric = spark_metric_loss_score(
        "accuracy",
        dataset["prediction"],
        dataset["label"],
    )
    print("accuracy: ", metric)
    assert str(metric)[:5] == "0.25"
    # test f1
    metric = spark_metric_loss_score(
        "f1",
        dataset["prediction"],
        dataset["label"],
    )
    print("f1: ", metric)
    assert str(metric)[:5] == "0.333"

    scoreAndLabels = [
        ([0.0, 1.0], [0.0, 2.0]),
        ([0.0, 2.0], [0.0, 1.0]),
        ([], [0.0]),
        ([2.0], [2.0]),
        ([2.0, 0.0], [2.0, 0.0]),
        ([0.0, 1.0, 2.0], [0.0, 1.0]),
        ([1.0], [1.0, 2.0]),
    ]
    dataset = spark.createDataFrame(scoreAndLabels, ["prediction", "label"])
    dataset = to_pandas_on_spark(dataset)
    # test micro_f1
    metric = spark_metric_loss_score(
        "micro_f1",
        dataset["prediction"],
        dataset["label"],
    )
    print("micro_f1: ", metric)
    assert str(metric)[:5] == "0.304"
    # test macro_f1
    metric = spark_metric_loss_score(
        "macro_f1",
        dataset["prediction"],
        dataset["label"],
    )
    print("macro_f1: ", metric)
    assert str(metric)[:5] == "0.111"

    scoreAndLabels = [
        (
            [1.0, 6.0, 2.0, 7.0, 8.0, 3.0, 9.0, 10.0, 4.0, 5.0],
            [1.0, 2.0, 3.0, 4.0, 5.0],
        ),
        ([4.0, 1.0, 5.0, 6.0, 2.0, 7.0, 3.0, 8.0, 9.0, 10.0], [1.0, 2.0, 3.0]),
        ([1.0, 2.0, 3.0, 4.0, 5.0], []),
    ]
    dataset = spark.createDataFrame(scoreAndLabels, ["prediction", "label"])
    dataset = to_pandas_on_spark(dataset)
    # test ap
    metric = spark_metric_loss_score(
        "ap",
        dataset["prediction"],
        dataset["label"],
    )
    print("ap: ", metric)
    assert str(metric)[:5] == "0.644"
    # test ndcg
    # ndcg is tested in synapseML rank tests, so we don't need to test it here

if __name__ == "__main__":
    # test_with_parameters_spark()
    # test_get_n_cpus_spark()
    # test_broadcast_code()
    # test_get_broadcast_data()
    # test_train_test_split_pyspark()
    test_n_current_trials()
    # test_len_labels()
    # test_iloc_pandas_on_spark()
    test_spark_metric_loss_score()