В этом посте демонстрируется подход к обработке большого файла AWS S3 (вероятно, миллионов записей) в управляемые фрагменты, работающие параллельно, с помощью AWS S3 Select.

В моем последнем посте мы обсуждали достижение эффективности обработки большого файла AWS S3 с помощью S3 select. Обработка была последовательной, и для большого файла могла потребоваться целая вечность. Так как же нам распараллелить обработку нескольких модулей? 🤔 Что ж, в этом посте мы реализуем и посмотрим, как это работает!

📝 Настоятельно рекомендую ознакомиться с моим последним сообщением Потоковая передача файлов S3 через S3-Select, чтобы установить контекст для этого сообщения.

Мне всегда нравится разбивать проблему на более мелкие части, необходимые для ее решения (аналитический подход). Давайте попробуем решить это за 3 простых шага:

1. Найдите общее количество байтов в файле S3.

Очень похоже на 1-й шаг нашего последнего поста, здесь мы также пытаемся сначала найти размер файла. Следующий фрагмент кода демонстрирует функцию, которая выполняет запрос HEAD к нашему файлу S3 и определяет размер файла в байтах.

# core/utils.py

def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size

2. Создайте задачу сельдерея для обработки куска

Здесь мы бы определили задачу celery для обработки фрагмента файла (который будет выполняться параллельно позже). Общая обработка здесь будет выглядеть так:

  • Получите start и end bytes этого чанка в качестве аргумента
  • Извлеките эту часть файла S3 с помощью S3-Select и сохраните ее локально во временном файле (как CSV в этом примере).
  • Прочтите этот временный файл и выполните любую необходимую обработку.
  • Удалить этот временный файл

📝 Я называю эту задачу обработчиком фрагментов файлов. Он обрабатывает кусок из файла. Выполнение нескольких таких задач завершает обработку всего файла.

# core/tasks.py

@celery.task(name='core.tasks.chunk_file_processor', bind=True)
def chunk_file_processor(self, **kwargs):
    """ Creates and process a single file chunk based on S3 Select ScanRange start and end bytes
    """
    bucket = kwargs.get('bucket')
    key = kwargs.get('key')
    filename = kwargs.get('filename')
    start_byte_range = kwargs.get('start_byte_range')
    end_byte_range = kwargs.get('end_byte_range')
    header_row_str = kwargs.get('header_row_str')
    local_file = filename.replace('.csv', f'.{start_byte_range}.csv')
    file_path = path.join(current_app.config.get('BASE_DIR'), 'temp', local_file)

    logger.info(f'Processing {filename} chunk range {start_byte_range} -> {end_byte_range}')
    try:
        # 1. fetch data from S3 and store it in a file
        store_scrm_file_s3_content_in_local_file(
            bucket=bucket, key=key, file_path=file_path, start_range=start_byte_range,
            end_range=end_byte_range, delimiter=S3_FILE_DELIMITER, header_row=header_row_str)

        # 2. Process the chunk file in temp folder
        id_set = set()
        with open(file_path) as csv_file:
            csv_reader = csv.DictReader(csv_file, delimiter=S3_FILE_DELIMITER)
            for row in csv_reader:
                # perform any other processing here
                id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')

        # 3. delete local file
        if path.exists(file_path):
            unlink(file_path)
    except Exception:
        logger.exception(f'Error in file processor: {filename}')

3. Выполняйте несколько задач сельдерея параллельно

Это самый интересный шаг в этом потоке. Мы создадим несколько задач сельдерея для параллельного запуска через Группу сельдерея.
Как только мы узнаем общее количество байтов файла в S3 (из шага 1), мы вычисляем start и end bytes для чанка и вызываем задачу, которую мы созданный на шаге 2 через группу celery. Диапазон start и end bytes представляет собой непрерывный диапазон размера файла. При желании мы также можем вызвать задачу обратного вызова (результата) после завершения всех наших задач обработки.

# core/tasks.py

@celery.task(name='core.tasks.s3_parallel_file_processing', bind=True)
def s3_parallel_file_processing_task(self, **kwargs):
    """ Creates celery tasks to process chunks of file in parallel
    """
    bucket = kwargs.get('bucket')
    key = kwargs.get('key')
    try:
        filename = key
        # 1. Check file headers for validity -> if failed, stop processing
        desired_row_headers = (
            'id',
            'name',
            'age',
            'latitude',
            'longitude',
            'monthly_income',
            'experienced'
        )
        is_headers_valid, header_row_str = validate_scrm_file_headers_via_s3_select(
            bucket=bucket,
            key=key,
            delimiter=S3_FILE_DELIMITER,
            desired_headers=desired_row_headers)
        if not is_headers_valid:
            logger.error(f'{filename} file headers validation failed')
            return False
        logger.info(f'{filename} file headers validation successful')

        # 2. fetch file size via S3 HEAD
        file_size = get_s3_file_size(bucket=bucket, key=key)
        if not file_size:
            logger.error(f'{filename} file size invalid {file_size}')
            return False
        logger.info(f'We are processing {filename} file about {file_size} bytes :-o')

        # 2. Create celery group tasks for chunk of this file size for parallel processing
        start_range = 0
        end_range = min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size)
        tasks = []
        while start_range < file_size:
            tasks.append(
                chunk_file_processor.signature(
                    kwargs={
                        'bucket': bucket,
                        'key': key,
                        'filename': filename,
                        'start_byte_range': start_range,
                        'end_byte_range': end_range,
                        'header_row_str': header_row_str
                    }
                )
            )
            start_range = end_range
            end_range = end_range + min(S3_FILE_PROCESSING_CHUNK_SIZE, file_size - end_range)
        job = (group(tasks) | chunk_file_processor_callback.s(data={'filename': filename}))
        _ = job.apply_async()
    except Exception:
        logger.exception(f'Error processing file: {filename}')


@celery.task(name='core.tasks.chunk_file_processor_callback', bind=True, ignore_result=False)
def chunk_file_processor_callback(self, *args, **kwargs):
    """ Callback task called post chunk_file_processor()
    """
    logger.info('Callback called')

# core/utils.py

def store_scrm_file_s3_content_in_local_file(bucket: str, key: str, file_path: str, start_range: int, end_range: int,
                                             delimiter: str, header_row: str):
    """Retrieves S3 file content via S3 Select ScanRange and store it in a local file.
       Make sure the header validation is done before calling this.

    Args:
        bucket (str): S3 bucket
        key (str): S3 key
        file_path (str): Local file path to store the contents
        start_range (int): Start range of ScanRange parameter of S3 Select
        end_range (int): End range of ScanRange parameter of S3 Select
        delimiter (str): S3 file delimiter
        header_row (str): Header row of the local file. This will be inserted as first line in local file.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    try:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': delimiter,
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'CSV': {
                    'FieldDelimiter': delimiter,
                    'RecordDelimiter': '\n',
                },
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        """
        f = open(file_path, 'wb')  # we receive data in bytes and hence opening file in bytes
        f.write(header_row.encode())
        f.write('\n'.encode())
        for event in response['Payload']:
            if records := event.get('Records'):
                f.write(records['Payload'])
        f.close()
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    except Exception:
        logger.exception(f'Error reading S3 file {bucket} : {key}')

Вот и все! 😎 Теперь вместо потоковой передачи файла S3 байт за байтом мы распараллеливаем обработку, параллельно обрабатывая чанки. Это было не так сложно, не так ли? 😅

📌 Вы можете проверить мой репозиторий GitHub для полного рабочего примера этого подхода.

🔍 Сравнение времени обработки

Если мы сравним время обработки того же файла, который мы обрабатывали в нашем предыдущем посте, с этим подходом, обработка выполняется примерно на 68% быстрее (с тем же оборудованием и конфигурацией). 😆

╔═════════════════╦═══════════════════╦════════════════════════════╗
║                 ║ Streaming S3 File Parallel Processing S3 File║
╠═════════════════╬═══════════════════╬════════════════════════════╣
║ File size       ║ 4.8MB             ║ 4.8MB                      ║
║ Processing time ║ ~37 seconds       ║ ~12 seconds                ║
╚═════════════════╩═══════════════════╩════════════════════════════╝

✔️ Преимущества такого подхода

  • Очень большой файл, содержащий миллионы записей, можно обработать за считанные минуты. Я использую этот подход в производственной среде некоторое время, и это очень приятно
  • Вычисления и обработка распределены между распределенными работниками
  • Скорость обработки может быть изменена за счет наличия рабочих пулов.
  • Больше никаких проблем с памятью

📑 Ресурсы

Первоначально опубликовано на https://dev.to 22 января 2019 г.