Использование MLFlow и Apache Spark для изоляции логических выводов и зависимостей обучения.
Вы когда-нибудь пытались развернуть модель машинного обучения только для того, чтобы получить сообщение об ошибке, похожее на следующее?
RuntimeError: Running pandas version ('1.5.3') is incompatible with min ('1.1.0'} and max ('1.2.5') versions
Если вы развернули и получили ошибку, это ваш счастливый день. Лучше ошибка, чем модельная зависимость, которая дает немного разные результаты для немного разных версий.
Каждый инженер, работающий с MLOps, рано или поздно столкнется с проблемой синхронизации между зависимостями, используемыми во время обучения модели, и теми, которые используются во время обслуживания модели.
Если вы развернули достаточное количество моделей машинного обучения, мне не нужно доказывать вам, что эта история реальна, но просто для протокола вот еще несколько голосов, рассказывающих вам похожие истории:
- проблема, обсуждаемая на TensorFlow GitHub и вопрос в StackOverflow
- отличный пост на LinkedIn с обсуждением проблемы
Решение? Измените версию зависимости логического вывода, чтобы она была совместима с той же версией, которая использовалась во время обучения, верно? Может быть, но поскольку это довольно громоздко и может довести кого угодно до безумия, сообщество разработало несколько решений.
Давайте рассмотрим, как MLFlow помогает нам достичь полной изоляции между обучением и зависимостями вывода для пакетной оценки с помощью Apache Spark.
Проблема совместимости — как решить?
Давайте создадим модель с проблемой совместимости, с которой мы можем поиграть:
class MyModel(): NAME = "my_model" def __init__(self): # compatibility limitation self._min_compatible_pandas_version: Version = version.parse("1.1.0") self._max_compatible_pandas_version: Version = version.parse("1.2.5") def _check_compatibility(self): inference_pandas_version: Version = version.parse(pd.__version__) logger.info(f"running pandas version: {inference_pandas_version}") if inference_pandas_version < self._min_compatible_pandas_version \ or inference_pandas_version > self._max_compatible_pandas_version: raise RuntimeError( "Running pandas version ('%s') is incompatible with min ('%s'} and max ('%s') versions." % (inference_pandas_version, self._min_compatible_pandas_version, self._max_compatible_pandas_version) ) def predict(self, context, model_input: pd.DataFrame): self._check_compatibility() return model_input.apply(lambda column: column * 10)
Поскольку приведенная выше модель может работать только на Pandas>=1.1.0,<=1.2.5
, если мы хотим реализовать конвейер пакетного вывода, который по какой-то причине нуждается в Pandas==1.5.3
, мы столкнемся с задачей обучения синхронизации и зависимостей вывода.
Давайте попробуем запустить его в любом случае:
» poetry run python blog_post_mlflow_spark_udf/batch_predict.py "local" RuntimeError: Running pandas version ('1.5.3') is incompatible with min ('1.1.0'} and max ('1.2.5') versions.
Хорошо. Мы ожидали этого. 😃
Запуск в изолированной среде — шаг за шагом
Были бы вы счастливы, если бы модель машинного обучения могла работать полностью изолированно от вашей службы логического вывода, в данном случае от среды Spark? Если модель и служба логических выводов работают вместе, но полностью изолированы, это означает, что нам не нужно решать проблему синхронизации обучения и зависимостей логических выводов. Таким образом, модель машинного обучения может иметь любую зависимость, которая ей нужна, и служба логического вывода может иметь любую зависимость, в которой она нуждается. Окончание конфликтов. Мир на Земле. Давайте попробуем.
1. Определим нашу модель как модель Python, совместимую с MLFlow.
Для этого наша упрощенная модель просто наследуется от mlflow.pyfunc.PythonModel
:
class MyModel(mlflow.pyfunc.PythonModel): NAME = "my_model" ...
Обратите внимание, что я использую PythonModel
общее решение, но MLFlow имеет множество встроенных разновидностей моделей, которые вы можете использовать из коробки для регистрации обученной модели в реестре моделей MLFlow.
2. Определите среду обучения модели в файле yaml.
name: my-model-env channels: - conda-forge dependencies: - python >= 3.8,<=3.10 - pandas = 1.2
Назовем его model-env.yml
.
Вы можете узнать больше о файлах среды conda в официальной документации conda.
3. Зарегистрируйте модель в реестре MLFlow.
def register(): my_model = MyModel() model_info: ModelInfo = mlflow.pyfunc.log_model(artifact_path=my_model.NAME, registered_model_name=my_model.NAME, python_model=my_model, code_path=[str(Path("blog_post_mlflow_spark_udf"))], conda_env=str(Path("blog_post_mlflow_spark_udf", "my_model", "model-env.yml"))) return model_info if __name__ == '__main__': register()
Обратите внимание, как мы регистрируем нашу модель вместе с ее определением среды обучения в параметре conda_env
.
» poetry run python blog_post_mlflow_spark_udf/register_model.py Created version '1' of model 'my_model'.
В этой статье мы не будем подробно рассматривать реестр моделей MLFlow. В текущем примере модель регистрируется в локальной файловой системе вместо удаленного реестра MLFlow, хотя код практически такой же, за исключением конфигурации. Подробнее о жизненном цикле моделей машинного обучения и о том, где вписывается реестр, вы можете прочитать в этой статье.
4. Выполнение пакетного вывода в изолированной среде.
def batch_predict(env_manager: str): spark = SparkSession \ .builder \ .getOrCreate() input_data = generate_dummy_input_data(spark) # for more on URIs format for loading models see: # https://www.mlflow.org/docs/2.2.2/concepts.html#referencing-artifacts model_uri = f"models:/{MyModel.NAME}/latest" predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri, env_manager=env_manager) return input_data.withColumn("prediction", predict_udf(struct("value"))) if __name__ == '__main__': result = batch_predict(env_manager=(sys.argv[1])) result.show()
Обратите внимание на параметр env_manager
. Это позволит нам выполнять вывод изолированно, как мы хотим. Мы передадим его в mlflow.pyfunc.spark_udf
. Чтобы запустить вывод в режиме изоляции, мы должны передать "conda"
в env_manager
.
Эта УДФ будет:
- получить нашу модель из реестра MLFlow на основе заданного
model_uri
; - распаковать полученную модель и подготовить ее к выводу
- запустить функцию вывода модели, в нашем случае
my_model.predict
Когда Spark UDF будет готов, мы будем использовать его как любую другую UDF, передавая соответствующие столбцы в качестве входных данных, чтобы позволить Spark использовать его для обработки и вычисления результатов распределенным образом. И, поскольку это просто Spark UDF, мы можем добавить больше логики до (предварительная обработка) и после (постобработка) по своему желанию.
Давайте попробуем и запустим с conda
:
» poetry run python blog_post_mlflow_spark_udf/batch_predict.py "conda" +----+-----+----------+ | id|value|prediction| +----+-----+----------+ |id_1| 1| 10.0| |id_2| 2| 20.0| |id_3| 3| 30.0| +----+-----+----------+
Вуаля! Оно работает!
Счастливый (ML) конвейер!
____
Примечания:
- Весь код можно найти здесь: https://github.com/Yerachmiel-Feltzman/blog-post_mlflow-spark-udf
- Если вы хотите понять, как это волшебство работает под капотом, следите за моими следующими сообщениями, чтобы получать уведомления, когда я опубликую следующую статью на тему «Раскрытие UDF Spark MLFlow».