WedX - журнал о программировании и компьютерных науках

ETL в файлы csv, разделенные, а затем помещенные в s3 для использования красным смещением

Только начал работать с Кибой, не нашел ничего очевидного, но я мог просто направить своего внутреннего ребенка (который ищет свою обувь, глядя в потолок).

Я хочу сбросить очень большую таблицу в Amazon Redshift. Кажется, что самый быстрый способ сделать это — записать кучу CSV-файлов в корзину S3, а затем попросить Redshift (с помощью команды COPY) загрузить их. Остальное сделают гремлины с магическим масштабированием.

Итак, я думаю, что я хочу, чтобы Киба писал CSV-файл для каждых 10 000 строк данных, затем помещал его в s3, а затем начинал запись в новый файл. В конце сделайте вызов постобработки COPY

Итак, могу ли я «конвейеризировать» работу или это должен быть большой вложенный класс назначения?

i.e.

source -> transform -> transform ... -> [ csv -> s3 ]{every 10000}; post-process


Ответы:


1

Я не уверен, что здесь точный вопрос. Но я думаю, что ваше решение кажется правильным в целом, но мало предложений.

  1. Вы также можете подумать о том, чтобы иметь более 10 000 записей на CSV-файл и gzip их при отправке на S3.
  2. Вы хотите увидеть создание menifest, содержащее список нескольких файлов, а затем запустить команду copy, предоставляющую menifest файл в качестве входных данных.
15.06.2018

2

Киба автор здесь. Спасибо, что попробовали!

В настоящее время лучший способ реализовать это — создать то, что я бы назвал «пунктом назначения буферизации». (Версия этого, вероятно, в какой-то момент окажется в Kiba Common).

(Пожалуйста, проверьте тщательно, я только что написал это для вас сегодня утром, вообще не запускал его, хотя в прошлом я использовал менее общие версии. Также имейте в виду, что эта версия использует буфер в памяти для ваших 10 000 строк, поэтому увеличение числа до гораздо большего потребует памяти. Однако также можно создать версию с наименьшим потреблением памяти, которая будет записывать строки в файл по мере их получения)

class BufferingDestination
  def initialize(buffer_size:, on_flush:)
    @buffer = []
    @buffer_size
    @on_flush = on_flush
    @batch_index = 0
  end

  def write(row)
    @buffer << row
    flush if @buffer.size >= buffer_size
  end

  def flush
    on_flush.call(batch_index: @batch_index, rows: @buffer)
    @batch_index += 1
    @buffer.clear
  end

  def close
    flush
  end
end

Это то, что вы затем можете использовать следующим образом, например, повторно используя место назначения Kiba Common CSV (хотя можно и свой написать):

require 'kiba-common/destinations/csv'

destination BufferingDestination,
  buffer_size: 10_000,
  on_flush: -> { |batch_index, rows|
    filename = File.join("output-#{sprintf("%08d", batch_index)}")
    csv = Kiba::Common::Destinations::CSV.new(
      filename: filename,
      csv_options: { ... },
      headers: %w(my fields here)
    )
    rows.each { |r| csv.write(r) }
    csv.close
  }

Затем вы можете активировать COPY прямо в блоке on_flush после создания файла (если вы хотите, чтобы загрузка началась сразу) или в блоке post_process (но это начнется только после того, как все CSV будут готовы, что может быть функцией чтобы обеспечить некоторую форму транзакционной глобальной загрузки, если хотите).

Вы можете проявить фантазию и запустить очередь потоков, чтобы фактически обрабатывать загрузку параллельно, если вам это действительно нужно (но тогда будьте осторожны с потоками зомби и т. д.).

Другой способ — иметь «многошаговые» процессы ETL, когда один скрипт генерирует CSV, а другой выбирает их для загрузки, выполняя их одновременно (это то, что я объяснял в своем выступлении на RubyKaigi 2018, например).

Дайте мне знать, как все работает для вас!

15.06.2018
  • Мой другой способ заключался в том, чтобы использовать преобразование для агрегирования набора строк, а затем возвращать имя файла на следующий шаг в конвейере. Однако я еще не пробовал этого. 15.06.2018
  • Это отличная идея, но вы не можете сделать это на уровне преобразования! Я планирую внести изменения в Kiba, которые позволят именно это, потому что я думаю, что это было бы здорово для таких сценариев — вы можете прокомментировать это здесь github.com/thbar/kiba/issues/53 или подпишитесь, чтобы получать обновления. Спасибо за заметку! 16.06.2018
  • @KenMayer первая реализация, которая в конечном итоге позволит то, что вы упомянули, доступна здесь github.com/thbar/kiba /pull/57 (на данный момент считается экспериментальным/нестабильным, но я все же решил упомянуть об этом). 17.06.2018

  • 3

    Thibaut, я сделал что-то подобное, за исключением того, что я загрузил это во временный файл, я думаю...

    require 'csv'
    
    # @param limit [Integer, 1_000] Number of rows per csv file
    # @param callback [Proc] Proc taking one argument [CSV/io], that can be used after
    #        each csv file is finished
    module PacerPro
      class CSVDestination
        def initialize(limit: 1_000, callback: ->(obj) { })
          @limit = limit
          @callback = callback
    
          @csv = nil
          @row_count = 0
        end
    
        # @param row [Hash] returned from transforms
        def write(row)
          csv << row.values
          @row_count += 1
          return if row_count < limit
    
          self.close
        end
    
        # Called by Kiba when the transform pipeline is finished
        def close
          csv.close
    
          callback.call(csv)
    
          tempfile.unlink
    
          @csv = nil
          @row_count = 0
        end
    
        private
    
        attr_reader :limit, :callback
        attr_reader :row_count, :tempfile
    
        def csv
          @csv ||= begin
            @tempfile = Tempfile.new('csv')
            CSV.open(@tempfile, 'w')
          end
        end
      end
    end
    
    15.06.2018
  • Выглядит хорошо - спасибо, что поделились своим решением! Надеюсь, у вас будет хороший опыт :-) 16.06.2018
  • Новые материалы

    Как проанализировать работу вашего классификатора?
    Не всегда просто знать, какие показатели использовать С развитием глубокого обучения все больше и больше людей учатся обучать свой первый классификатор. Но как только вы закончите..

    Работа с цепями Маркова, часть 4 (Машинное обучение)
    Нелинейные цепи Маркова с агрегатором и их приложения (arXiv) Автор : Бар Лайт Аннотация: Изучаются свойства подкласса случайных процессов, называемых дискретными нелинейными цепями Маркова..

    Crazy Laravel Livewire упростил мне создание электронной коммерции (панель администратора и API) [Часть 3]
    Как вы сегодня, ребята? В этой части мы создадим CRUD для данных о продукте. Думаю, в этой части я не буду слишком много делиться теорией, но чаще буду делиться своим кодом. Потому что..

    Использование машинного обучения и Python для классификации 1000 сезонов новичков MLB Hitter
    Чему может научиться машина, глядя на сезоны новичков 1000 игроков MLB? Это то, что исследует это приложение. В этом процессе мы будем использовать неконтролируемое обучение, чтобы..

    Учебные заметки: создание моего первого пакета Node.js
    Это мои обучающие заметки, когда я научился создавать свой самый первый пакет Node.js, распространяемый через npm. Оглавление Глоссарий I. Новый пакет 1.1 советы по инициализации..

    Забудьте о Matplotlib: улучшите визуализацию данных с помощью умопомрачительных функций Seaborn!
    Примечание. Эта запись в блоге предполагает базовое знакомство с Python и концепциями анализа данных. Привет, энтузиасты данных! Добро пожаловать в мой блог, где я расскажу о невероятных..

    ИИ в аэрокосмической отрасли
    Каждый полет – это шаг вперед к великой мечте. Чтобы это происходило в их собственном темпе, необходима команда астронавтов для погони за космосом и команда технического обслуживания..


    Для любых предложений по сайту: wedx@cp9.ru