Киба автор здесь. Спасибо, что попробовали!
В настоящее время лучший способ реализовать это — создать то, что я бы назвал «пунктом назначения буферизации». (Версия этого, вероятно, в какой-то момент окажется в Kiba Common).
(Пожалуйста, проверьте тщательно, я только что написал это для вас сегодня утром, вообще не запускал его, хотя в прошлом я использовал менее общие версии. Также имейте в виду, что эта версия использует буфер в памяти для ваших 10 000 строк, поэтому увеличение числа до гораздо большего потребует памяти. Однако также можно создать версию с наименьшим потреблением памяти, которая будет записывать строки в файл по мере их получения)
class BufferingDestination
def initialize(buffer_size:, on_flush:)
@buffer = []
@buffer_size
@on_flush = on_flush
@batch_index = 0
end
def write(row)
@buffer << row
flush if @buffer.size >= buffer_size
end
def flush
on_flush.call(batch_index: @batch_index, rows: @buffer)
@batch_index += 1
@buffer.clear
end
def close
flush
end
end
Это то, что вы затем можете использовать следующим образом, например, повторно используя место назначения Kiba Common CSV (хотя можно и свой написать):
require 'kiba-common/destinations/csv'
destination BufferingDestination,
buffer_size: 10_000,
on_flush: -> { |batch_index, rows|
filename = File.join("output-#{sprintf("%08d", batch_index)}")
csv = Kiba::Common::Destinations::CSV.new(
filename: filename,
csv_options: { ... },
headers: %w(my fields here)
)
rows.each { |r| csv.write(r) }
csv.close
}
Затем вы можете активировать COPY
прямо в блоке on_flush
после создания файла (если вы хотите, чтобы загрузка началась сразу) или в блоке post_process
(но это начнется только после того, как все CSV будут готовы, что может быть функцией чтобы обеспечить некоторую форму транзакционной глобальной загрузки, если хотите).
Вы можете проявить фантазию и запустить очередь потоков, чтобы фактически обрабатывать загрузку параллельно, если вам это действительно нужно (но тогда будьте осторожны с потоками зомби и т. д.).
Другой способ — иметь «многошаговые» процессы ETL, когда один скрипт генерирует CSV, а другой выбирает их для загрузки, выполняя их одновременно (это то, что я объяснял в своем выступлении на RubyKaigi 2018, например).
Дайте мне знать, как все работает для вас!
15.06.2018