Введение
Фабричный шаблон проектирования является частью семейства шаблонов проектирования, называемых «Creational Design Patterns». Эти шаблоны проектирования используются для создания и конструирования объектов ООП наилучшим образом для конкретного сценария. .
В центре внимания этой статьи будет шаблон проектирования factory, который предоставляет интерфейс для создания объектов в суперклассе, но позволяет подклассам изменять тип создаваемых объектов.
Мы должны реализовать шаблон проектирования factory, когда хотим создавать объекты, не указывая точный класс, который будет создан. Это может повысить гибкость кода за счет добавления еще одного уровня абстракции и инкапсуляции создания объектов. Это означает более удобочитаемость, возможность повторного использования кода, упрощение обслуживания и многое другое.
Пример — Фабрика конвейеров данных
Допустим, наша цель — создать несколько конвейеров данных, по одному для каждой базы данных, с которой работает наша компания.
В настоящее время мы не уверены, какие базы данных будут задействованы в этом процессе, но мы знаем, что все они выполняют одни и те же шаги Извлечение и Преобразование. Единственное отличие заключается в шаге Загрузить в целевую базу данных.
Мы могли бы решить эту проблему, создав общий интерфейс, который будет принимать целевую базу данных в качестве входных данных и возвращать объект конвейера, специально оптимизированный для обработки передачи данных в эту конкретную базу данных.
Мы можем сделать все это, следуя заводскому шаблону проектирования.
Класс конвейера
Первый шаг при кодировании шаблона проектирования фабрики — создание общего класса, от которого будут наследоваться все остальные классы. Попробуйте закодировать повторяющееся поведение всех классов в этом классе и освободить место для изменений.
В нашем случае повторяющееся поведение заключается в том, что все конвейеры имеют одни и те же шаги Extract и Transform, поэтому давайте начнем с этого:
from abc import ABC, abstractmethod class Pipeline(ABC): # Extract Step - loading customer data @staticmethod def extract(): return pd.read_csv("test.csv") # Transform Step - sorting, adding moving average, cleaning null values def transform(self): df = Pipeline.extract() df.sort_values(by = 'age', ascending=False, inplace=True) df['moving_average_3'] = round(df['age'].rolling(3).mean(), 2) df['moving_average_3'].replace(np.nan, 0, inplace=True) return df
Далее я добавлю метод run_etl() и абстрактный метод load(). Это означает, что мне не нужно реализовывать эту функцию прямо сейчас, но все классы, которые унаследованы от этого класса, должны будут реализовать функцию с именем load.
from abc import ABC, abstractmethod import pandas as pd import numpy as np class Pipeline(ABC): # Extract Step - loading customer data @staticmethod def extract(): return pd.read_csv("test.csv") # Transform Step - sorting, adding moving average, cleaning null values def transform(self): df = Pipeline.extract() df.sort_values(by = 'age', ascending=False, inplace=True) df['moving_average_3'] = round(df['age'].rolling(3).mean(), 2) df['moving_average_3'].replace(np.nan, 0, inplace=True) return df # Load Step - abstract method @abstractmethod def load(self): pass # Activate the Pipline def run_etl(self): self.load()
Классы MySQL_Pipeline и Postgres_Pipeline
Итак, у нас есть общий класс Pipeline, давайте создадим более конкретные классы конвейера. Начну с MySQL:
class MySQL_Pipeline(Pipeline): # Load data to MySQL def load(self): # Grab transformed data data_to_load = self.transform() # Connect to MySQL and insert the data try: customers_db = pymysql.connect(host='***.***.***.***', port=3306, user = 'Bar', passwd = '****', database='customers') cursor = customers_db.cursor() cursor.executemany("INSERT INTO customers (first_name, age, moving_average_3) VALUES (%s, %s, %s)", data_to_load.values.tolist()) customers_db.commit() cursor.close() except Exception as e: print(e) finally: customers_db.close()
Как мы видим, этот класс MySQL_Pipeline наследуется от общего класса Pipeline. Поэтому он должен реализовать функцию load. Эта функция загрузки специально используется для загрузки данных в MySQL, она не будет работать ни с какой другой базой данных.
Далее, давайте сделаем то же самое для PostgreSQL:
class Postgres_Pipeline(Pipeline): # Load data to Postgres def load(self): # Grab transformed data data_to_load = self.transform() # Connect to Postgres and insert the data try: customers_db = psycopg2.connect(host='***.***.***.***', port=5432, user = 'postgres', password = '****', database='customers') cursor = customers_db.cursor() cursor.executemany("INSERT INTO customers (first_name, age, moving_average_3) VALUES (%s, %s, %s)", data_to_load.values.tolist()) customers_db.commit() cursor.close() except Exception as e: print(e) finally: customers_db.close()
Класс PipelineFactory
Теперь, чтобы реализовать шаблон проектирования фабрики, нам нужно добавить класс, который принимает целевую базу данных в качестве входных данных и возвращает один из подклассов конвейера, MySQL или Postgres.
# Factory for generating pipelines class PipelineFactory(): def create_pipeline(destination): if destination == 'mysql': return MySQL_Pipeline() elif destination == 'postgres': return Postgres_Pipeline() else: raise ValueError("Pipeline type not supported")
Чтобы использовать эту фабрику конвейеров, все, что нам нужно сделать, это указать место назначения, и мы легко создадим столько конвейеров для этого места назначения, сколько захотим, например:
# Generate two pipelines, one for MySQL, one for Postgres mysql_pipline = PipelineFactory.create_pipeline('mysql') postgres_pipline = PipelineFactory.create_pipeline('postgres')
Затем для их запуска нам просто нужно развернуть метод run_etl():
# Deploy pipelines mysql_pipline.run_etl() postgres_pipline.run_etl()
В результате получаются чистые и преобразованные данные в обеих базах данных.
MySQL:
PostgreSQL:
Полный код:
Посмотрите код этой статьи:
from abc import ABC, abstractmethod import pandas as pd import numpy as np import pymysql import psycopg2 class Pipeline(ABC): # Extract Step - loading customer data @staticmethod def extract(): return pd.read_csv("test.csv") # Transform Step - sorting, adding moving average, cleaning null values def transform(self): df = Pipeline.extract() df.sort_values(by = 'age', ascending=False, inplace=True) df['moving_average_3'] = round(df['age'].rolling(3).mean(), 2) df['moving_average_3'].replace(np.nan, 0, inplace=True) return df # Load Step - abstract method @abstractmethod def load(self): pass # Activate the Pipline def run_etl(self): self.load() class MySQL_Pipeline(Pipeline): # Load data to MySQL def load(self): # Grab transformed data data_to_load = self.transform() # Connect to MySQL and insert the data try: customers_db = pymysql.connect(host='***.***.***.***', port=3306, user = 'Bar', passwd = '****', database='customers') cursor = customers_db.cursor() cursor.executemany("INSERT INTO customers (first_name, age, moving_average_3) VALUES (%s, %s, %s)", data_to_load.values.tolist()) customers_db.commit() cursor.close() except Exception as e: print(e) finally: customers_db.close() class Postgres_Pipeline(Pipeline): # Load data to Postgres def load(self): # Grab transformed data data_to_load = self.transform() # Connect to Postgres and insert the data try: customers_db = psycopg2.connect(host='***.***.***.***', port=5432, user = 'postgres', password = '****', database='customers') cursor = customers_db.cursor() cursor.executemany("INSERT INTO customers (first_name, age, moving_average_3) VALUES (%s, %s, %s)", data_to_load.values.tolist()) customers_db.commit() cursor.close() except Exception as e: print(e) finally: customers_db.close() # Factory for generating pipelines class PipelineFactory(): def create_pipeline(destination): if destination == 'mysql': return MySQL_Pipeline() elif destination == 'postgres': return Postgres_Pipeline() else: raise ValueError("Pipeline type not supported") # Generate two pipelines, one for MySQL, one for Postgres mysql_pipline = PipelineFactory.create_pipeline('mysql') postgres_pipline = PipelineFactory.create_pipeline('postgres') # Deploy pipelines mysql_pipline.run_etl() postgres_pipline.run_etl()
Это для статьи. Следите за новостями из серии Python.