Я смотрю на потоковую передачу scala/slick и пытаюсь понять, как она работает. Вот мой тестовый код
val bigdata = TableQuery[BigData]
val x = db.stream(bigdata.result.transactionally.withStatementParameters(fetchSize = 100)).foreach {
(tuple: (Int, UUID)) =>
println(tuple._1 + " " + tuple._2)
Thread.sleep(50)//emulating slow consumer.
}
Await.result(x, 100000 seconds)
Пока код работает, я включил журнал запросов postgresql, чтобы понять, что происходит под капотом. Я вижу повторный запрос каждые 100 элементов
2015-11-06 15:03:24 IST [24379-3] postgres@scala_test ЖУРНАЛ: выполнить выборку из S_2/C_3: выбрать x2. "id", x2. "data" из "bigdata" x2 2015-11-06 15:03:29 IST [24379-4] postgres@scala_test ЖУРНАЛ: выполнить
fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:34 IST [24379-5] postgres@scala_test LOG: execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:39 IST [24379-6] postgres@scala_test LOG: execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:44 IST [24379-7] postgres@scala_test LOG: execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:49 IST [24379-8] postgres@scala_test LOG: execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
Однако похоже, что он извлекал весь набор данных. Я ожидал запроса со смещением.
ie SELECT * FROM bigdata LIMIT 100 OFFSET 500
Похоже, все опрошено, и данные отправки отправляются частично.
Затем, пока выполняется вышеуказанная потоковая передача, я вставил новый набор данных в ту же таблицу.
Перед трансляцией
SELECT count(*) FROM bigdata -> 500
Затем вставил несколько строк
SELECT count(*) FROM bigdata -> 700
Но потоковая передача останавливается на 500. Похоже, это указывает на то, что новые данные никогда не извлекаются и не передаются обратно. Любые идеи, как потоковая передача работает в slick.