В этом посте демонстрируется подход к обработке большого файла AWS S3 (вероятно, миллионов записей) в управляемые фрагменты, работающие параллельно, с помощью AWS S3 Select.
В моем последнем посте мы обсуждали достижение эффективности обработки большого файла AWS S3 с помощью S3 select. Обработка была последовательной, и для большого файла могла потребоваться целая вечность. Так как же нам распараллелить обработку нескольких модулей? 🤔 Что ж, в этом посте мы реализуем и посмотрим, как это работает!
📝 Настоятельно рекомендую ознакомиться с моим последним сообщением Потоковая передача файлов S3 через S3-Select, чтобы установить контекст для этого сообщения.
Мне всегда нравится разбивать проблему на более мелкие части, необходимые для ее решения (аналитический подход). Давайте попробуем решить это за 3 простых шага:
1. Найдите общее количество байтов в файле S3.
Очень похоже на 1-й шаг нашего последнего поста, здесь мы также пытаемся сначала найти размер файла. Следующий фрагмент кода демонстрирует функцию, которая выполняет запрос HEAD к нашему файлу S3 и определяет размер файла в байтах.
# core/utils.py
def get_s3_file_size(bucket: str, key: str) -> int:
"""Gets the file size of S3 object by a HEAD request
Args:
bucket (str): S3 bucket
key (str): S3 object path
Returns:
int: File size in bytes. Defaults to 0 if any error.
"""
aws_profile = current_app.config.get('AWS_PROFILE_NAME')
s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
file_size = 0
try:
response = s3_client.head_object(Bucket=bucket, Key=key)
if response:
file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
except ClientError:
logger.exception(f'Client error reading S3 file {bucket} : {key}')
return file_size
2. Создайте задачу сельдерея для обработки куска
Здесь мы бы определили задачу celery для обработки фрагмента файла (который будет выполняться параллельно позже). Общая обработка здесь будет выглядеть так:
- Получите
start
иend bytes
этого чанка в качестве аргумента - Извлеките эту часть файла S3 с помощью S3-Select и сохраните ее локально во временном файле (как CSV в этом примере).
- Прочтите этот временный файл и выполните любую необходимую обработку.
- Удалить этот временный файл
📝 Я называю эту задачу обработчиком фрагментов файлов. Он обрабатывает кусок из файла. Выполнение нескольких таких задач завершает обработку всего файла.
# core/tasks.py
@celery.task(name='core.tasks.chunk_file_processor', bind=True)
def chunk_file_processor(self, **kwargs):
""" Creates and process a single file chunk based on S3 Select ScanRange start and end bytes
"""
bucket = kwargs.get('bucket')
key = kwargs.get('key')
filename = kwargs.get('filename')
start_byte_range = kwargs.get('start_byte_range')
end_byte_range = kwargs.get('end_byte_range')
header_row_str = kwargs.get('header_row_str')
local_file = filename.replace('.csv', f'.{start_byte_range}.csv')
file_path = path.join(current_app.config.get('BASE_DIR'), 'temp', local_file)
logger.info(f'Processing {filename} chunk range {start_byte_range} -> {end_byte_range}')
try:
# 1. fetch data from S3 and store it in a file
store_scrm_file_s3_content_in_local_file(
bucket=bucket, key=key, file_path=file_path, start_range=start_byte_range,
end_range=end_byte_range, delimiter=S3_FILE_DELIMITER, header_row=header_row_str)
# 2. Process the chunk file in temp folder
id_set = set()
with open(file_path) as csv_file:
csv_reader = csv.DictReader(csv_file, delimiter=S3_FILE_DELIMITER)
for row in csv_reader:
# perform any other processing here
id_set.add(int(row.get('id')))
logger.info(f'{min(id_set)} --> {max(id_set)}')
# 3. delete local file
if path.exists(file_path):
unlink(file_path)
except Exception:
logger.exception(f'Error in file processor: {filename}')
3. Выполняйте несколько задач сельдерея параллельно
Это самый интересный шаг в этом потоке. Мы создадим несколько задач сельдерея для параллельного запуска через Группу сельдерея.
Как только мы узнаем общее количество байтов файла в S3 (из шага 1), мы вычисляем start
и end bytes
для чанка и вызываем задачу, которую мы созданный на шаге 2 через группу celery. Диапазон start
и end bytes
представляет собой непрерывный диапазон размера файла. При желании мы также можем вызвать задачу обратного вызова (результата) после завершения всех наших задач обработки.
# core/tasks.py @celery.task(name='core.tasks.s3_parallel_file_processing', bind=True) def s3_parallel_file_processing_task(self, **kwargs): """ Creates celery tasks to process chunks of file in parallel """ bucket = kwargs.get('bucket') key = kwargs.get('key') try: filename = key # 1. Check file headers for validity -> if failed, stop processing desired_row_headers = ( 'id', 'name', 'age', 'latitude', 'longitude', 'monthly_income', 'experienced' ) is_headers_valid, header_row_str = validate_scrm_file_headers_via_s3_select( bucket=bucket, key=key, delimiter=S3_FILE_DELIMITER, desired_headers=desired_row_headers) if not is_headers_valid: logger.error(f'{filename} file headers validation failed') return False logger.info(f'{filename} file headers validation successful') # 2. fetch file size via S3 HEAD file_size = get_s3_file_size(bucket=bucket, key=key) if not file_size: logger.error(f'{filename} file size invalid {file_size}') return False logger.info(f'We are processing {filename} file about {file_size} bytes :-o') # 2. Create celery group tasks for chunk of this file size for parallel processing start_range = 0 end_range = min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size) tasks = [] while start_range < file_size: tasks.append( chunk_file_processor.signature( kwargs={ 'bucket': bucket, 'key': key, 'filename': filename, 'start_byte_range': start_range, 'end_byte_range': end_range, 'header_row_str': header_row_str } ) ) start_range = end_range end_range = end_range + min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size - end_range) job = (group(tasks) | chunk_file_processor_callback.s(data={'filename': filename})) _ = job.apply_async() except Exception: logger.exception(f'Error processing file: {filename}') @celery.task(name='core.tasks.chunk_file_processor_callback', bind=True, ignore_result=False) def chunk_file_processor_callback(self, *args, **kwargs): """ Callback task called post chunk_file_processor() """ logger.info('Callback called')
# core/utils.py def store_scrm_file_s3_content_in_local_file(bucket: str, key: str, file_path: str, start_range: int, end_range: int, delimiter: str, header_row: str): """Retrieves S3 file content via S3 Select ScanRange and store it in a local file. Make sure the header validation is done before calling this. Args: bucket (str): S3 bucket key (str): S3 key file_path (str): Local file path to store the contents start_range (int): Start range of ScanRange parameter of S3 Select end_range (int): End range of ScanRange parameter of S3 Select delimiter (str): S3 file delimiter header_row (str): Header row of the local file. This will be inserted as first line in local file. """ aws_profile = current_app.config.get('AWS_PROFILE_NAME') s3_client = boto3.session.Session(profile_name=aws_profile).client('s3') expression = 'SELECT * FROM S3Object' try: response = s3_client.select_object_content( Bucket=bucket, Key=key, ExpressionType='SQL', Expression=expression, InputSerialization={ 'CSV': { 'FileHeaderInfo': 'USE', 'FieldDelimiter': delimiter, 'RecordDelimiter': '\n' } }, OutputSerialization={ 'CSV': { 'FieldDelimiter': delimiter, 'RecordDelimiter': '\n', }, }, ScanRange={ 'Start': start_range, 'End': end_range }, ) """ select_object_content() response is an event stream that can be looped to concatenate the overall result set """ f = open(file_path, 'wb') # we receive data in bytes and hence opening file in bytes f.write(header_row.encode()) f.write('\n'.encode()) for event in response['Payload']: if records := event.get('Records'): f.write(records['Payload']) f.close() except ClientError: logger.exception(f'Client error reading S3 file {bucket} : {key}') except Exception: logger.exception(f'Error reading S3 file {bucket} : {key}')
Вот и все! 😎 Теперь вместо потоковой передачи файла S3 байт за байтом мы распараллеливаем обработку, параллельно обрабатывая чанки. Это было не так сложно, не так ли? 😅
📌 Вы можете проверить мой репозиторий GitHub для полного рабочего примера этого подхода.
🔍 Сравнение времени обработки
Если мы сравним время обработки того же файла, который мы обрабатывали в нашем предыдущем посте, с этим подходом, обработка выполняется примерно на 68% быстрее (с тем же оборудованием и конфигурацией). 😆
╔═════════════════╦═══════════════════╦════════════════════════════╗ ║ ║ Streaming S3 File ║ Parallel Processing S3 File║ ╠═════════════════╬═══════════════════╬════════════════════════════╣ ║ File size ║ 4.8MB ║ 4.8MB ║ ║ Processing time ║ ~37 seconds ║ ~12 seconds ║ ╚═════════════════╩═══════════════════╩════════════════════════════╝
✔️ Преимущества такого подхода
- Очень большой файл, содержащий миллионы записей, можно обработать за считанные минуты. Я использую этот подход в производственной среде некоторое время, и это очень приятно
- Вычисления и обработка распределены между распределенными работниками
- Скорость обработки может быть изменена за счет наличия рабочих пулов.
- Больше никаких проблем с памятью
📑 Ресурсы
- Мой репозиторий GitHub, демонстрирующий описанный выше подход
- Ссылка на AWS S3 Select boto3
- Руководство пользователя AWS S3 Select
Первоначально опубликовано на https://dev.to 22 января 2019 г.