Введение
Фабричный шаблон проектирования является частью семейства шаблонов проектирования, называемых «Creational Design Patterns». Эти шаблоны проектирования используются для создания и конструирования объектов ООП наилучшим образом для конкретного сценария. .
В центре внимания этой статьи будет шаблон проектирования factory, который предоставляет интерфейс для создания объектов в суперклассе, но позволяет подклассам изменять тип создаваемых объектов.
Мы должны реализовать шаблон проектирования factory, когда хотим создавать объекты, не указывая точный класс, который будет создан. Это может повысить гибкость кода за счет добавления еще одного уровня абстракции и инкапсуляции создания объектов. Это означает более удобочитаемость, возможность повторного использования кода, упрощение обслуживания и многое другое.
Пример — Фабрика конвейеров данных
Допустим, наша цель — создать несколько конвейеров данных, по одному для каждой базы данных, с которой работает наша компания.
В настоящее время мы не уверены, какие базы данных будут задействованы в этом процессе, но мы знаем, что все они выполняют одни и те же шаги Извлечение и Преобразование. Единственное отличие заключается в шаге Загрузить в целевую базу данных.
Мы могли бы решить эту проблему, создав общий интерфейс, который будет принимать целевую базу данных в качестве входных данных и возвращать объект конвейера, специально оптимизированный для обработки передачи данных в эту конкретную базу данных.
Мы можем сделать все это, следуя заводскому шаблону проектирования.
Класс конвейера
Первый шаг при кодировании шаблона проектирования фабрики — создание общего класса, от которого будут наследоваться все остальные классы. Попробуйте закодировать повторяющееся поведение всех классов в этом классе и освободить место для изменений.
В нашем случае повторяющееся поведение заключается в том, что все конвейеры имеют одни и те же шаги Extract и Transform, поэтому давайте начнем с этого:
from abc import ABC, abstractmethod
class Pipeline(ABC):
# Extract Step - loading customer data
@staticmethod
def extract():
return pd.read_csv("test.csv")
# Transform Step - sorting, adding moving average, cleaning null values
def transform(self):
df = Pipeline.extract()
df.sort_values(by = 'age', ascending=False, inplace=True)
df['moving_average_3'] = round(df['age'].rolling(3).mean(), 2)
df['moving_average_3'].replace(np.nan, 0, inplace=True)
return df
Далее я добавлю метод run_etl() и абстрактный метод load(). Это означает, что мне не нужно реализовывать эту функцию прямо сейчас, но все классы, которые унаследованы от этого класса, должны будут реализовать функцию с именем load.
from abc import ABC, abstractmethod
import pandas as pd
import numpy as np
class Pipeline(ABC):
# Extract Step - loading customer data
@staticmethod
def extract():
return pd.read_csv("test.csv")
# Transform Step - sorting, adding moving average, cleaning null values
def transform(self):
df = Pipeline.extract()
df.sort_values(by = 'age', ascending=False, inplace=True)
df['moving_average_3'] = round(df['age'].rolling(3).mean(), 2)
df['moving_average_3'].replace(np.nan, 0, inplace=True)
return df
# Load Step - abstract method
@abstractmethod
def load(self):
pass
# Activate the Pipline
def run_etl(self):
self.load()
Классы MySQL_Pipeline и Postgres_Pipeline
Итак, у нас есть общий класс Pipeline, давайте создадим более конкретные классы конвейера. Начну с MySQL:
class MySQL_Pipeline(Pipeline):
# Load data to MySQL
def load(self):
# Grab transformed data
data_to_load = self.transform()
# Connect to MySQL and insert the data
try:
customers_db = pymysql.connect(host='***.***.***.***', port=3306,
user = 'Bar', passwd = '****', database='customers')
cursor = customers_db.cursor()
cursor.executemany("INSERT INTO customers (first_name, age, moving_average_3) VALUES (%s, %s, %s)", data_to_load.values.tolist())
customers_db.commit()
cursor.close()
except Exception as e:
print(e)
finally:
customers_db.close()
Как мы видим, этот класс MySQL_Pipeline наследуется от общего класса Pipeline. Поэтому он должен реализовать функцию load. Эта функция загрузки специально используется для загрузки данных в MySQL, она не будет работать ни с какой другой базой данных.
Далее, давайте сделаем то же самое для PostgreSQL:
class Postgres_Pipeline(Pipeline):
# Load data to Postgres
def load(self):
# Grab transformed data
data_to_load = self.transform()
# Connect to Postgres and insert the data
try:
customers_db = psycopg2.connect(host='***.***.***.***', port=5432,
user = 'postgres', password = '****', database='customers')
cursor = customers_db.cursor()
cursor.executemany("INSERT INTO customers (first_name, age, moving_average_3) VALUES (%s, %s, %s)", data_to_load.values.tolist())
customers_db.commit()
cursor.close()
except Exception as e:
print(e)
finally:
customers_db.close()
Класс PipelineFactory
Теперь, чтобы реализовать шаблон проектирования фабрики, нам нужно добавить класс, который принимает целевую базу данных в качестве входных данных и возвращает один из подклассов конвейера, MySQL или Postgres.
# Factory for generating pipelines
class PipelineFactory():
def create_pipeline(destination):
if destination == 'mysql':
return MySQL_Pipeline()
elif destination == 'postgres':
return Postgres_Pipeline()
else:
raise ValueError("Pipeline type not supported")
Чтобы использовать эту фабрику конвейеров, все, что нам нужно сделать, это указать место назначения, и мы легко создадим столько конвейеров для этого места назначения, сколько захотим, например:
# Generate two pipelines, one for MySQL, one for Postgres
mysql_pipline = PipelineFactory.create_pipeline('mysql')
postgres_pipline = PipelineFactory.create_pipeline('postgres')
Затем для их запуска нам просто нужно развернуть метод run_etl():
# Deploy pipelines mysql_pipline.run_etl() postgres_pipline.run_etl()
В результате получаются чистые и преобразованные данные в обеих базах данных.
MySQL:

PostgreSQL:

Полный код:
Посмотрите код этой статьи:
from abc import ABC, abstractmethod
import pandas as pd
import numpy as np
import pymysql
import psycopg2
class Pipeline(ABC):
# Extract Step - loading customer data
@staticmethod
def extract():
return pd.read_csv("test.csv")
# Transform Step - sorting, adding moving average, cleaning null values
def transform(self):
df = Pipeline.extract()
df.sort_values(by = 'age', ascending=False, inplace=True)
df['moving_average_3'] = round(df['age'].rolling(3).mean(), 2)
df['moving_average_3'].replace(np.nan, 0, inplace=True)
return df
# Load Step - abstract method
@abstractmethod
def load(self):
pass
# Activate the Pipline
def run_etl(self):
self.load()
class MySQL_Pipeline(Pipeline):
# Load data to MySQL
def load(self):
# Grab transformed data
data_to_load = self.transform()
# Connect to MySQL and insert the data
try:
customers_db = pymysql.connect(host='***.***.***.***', port=3306,
user = 'Bar', passwd = '****', database='customers')
cursor = customers_db.cursor()
cursor.executemany("INSERT INTO customers (first_name, age, moving_average_3) VALUES (%s, %s, %s)", data_to_load.values.tolist())
customers_db.commit()
cursor.close()
except Exception as e:
print(e)
finally:
customers_db.close()
class Postgres_Pipeline(Pipeline):
# Load data to Postgres
def load(self):
# Grab transformed data
data_to_load = self.transform()
# Connect to Postgres and insert the data
try:
customers_db = psycopg2.connect(host='***.***.***.***', port=5432,
user = 'postgres', password = '****', database='customers')
cursor = customers_db.cursor()
cursor.executemany("INSERT INTO customers (first_name, age, moving_average_3) VALUES (%s, %s, %s)", data_to_load.values.tolist())
customers_db.commit()
cursor.close()
except Exception as e:
print(e)
finally:
customers_db.close()
# Factory for generating pipelines
class PipelineFactory():
def create_pipeline(destination):
if destination == 'mysql':
return MySQL_Pipeline()
elif destination == 'postgres':
return Postgres_Pipeline()
else:
raise ValueError("Pipeline type not supported")
# Generate two pipelines, one for MySQL, one for Postgres
mysql_pipline = PipelineFactory.create_pipeline('mysql')
postgres_pipline = PipelineFactory.create_pipeline('postgres')
# Deploy pipelines
mysql_pipline.run_etl()
postgres_pipline.run_etl()
Это для статьи. Следите за новостями из серии Python.