ТЕХНОЛОГИЯ ЭКСПЕДИА ГРУПП - ПРОГРАММНОЕ ОБЕСПЕЧЕНИЕ
Не пропускать очередь
Попался в настройке Java ThreadPoolExecutor

Недавно мы обнаружили, что если вы неправильно настроите программу-исполнитель Java, он будет отклонять запросы, оставаясь при этом бездействующим!
При интеграции двух используемых здесь в Expedia Group ™ сервисов большого объема мы реализовали обработку запросов с выделенным пулом потоков, чтобы ограничить параллелизм и повысить устойчивость. Мы установили минимальный размер пула на 20 потоков, максимальный - на 300. Мы также хотели избежать очереди на стороне клиента - вместо этого мы предпочли бы изящную деградацию - поэтому мы настроили workQueue на размер 1.
Это работало нормально, но во время нагрузочного тестирования мы заметили, что мы получаем несколько неудачных запросов из-заRejectedExecutionException:
java.util.concurrent.RejectedExecutionException: Task someInternalTaskName rejected from java.util.concurrent.ThreadPoolExecutor@2e95c205[Running, pool size = 300, active threads = 3, queued tasks = 0, completed tasks = 24301742]
! at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
! at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
! at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
Это не совсем неожиданно - об этом говорится в документации ThreadPoolExecutor:
Новые задачи, отправленные в методе
execute(java.lang.Runnable)будут отклонены ... , когда Исполнитель использует конечные границы как для максимального количества потоков, так и для емкости рабочей очереди и является насыщенным.
Но исключение конкретно говорит о том, что нет задач в очереди, и, глядя на наши метрики и дампы потоков, большинство наших рабочих потоков просто сидели около TIMED_WAITING.

Дампы потоков подтвердили, что все потоки были припаркованы в ожидании выполнения работы, а не заблокированы в ожидании ввода-вывода или любого другого очевидного ресурса. Итак, если все потоки в пуле ждут, почему он все еще отклоняет запросы?

threadPool.execute (фейспалм)
Ответ появился в методе ThreadPoolExecutor.execute(Runnable), который выглядит так (с некоторыми упрощениями и комментариями для ясности). Обратите внимание, что RejectedExecutionException происходит из последней показанной здесь строки.
public void execute(Runnable command) {
int poolState = ctl.get();
//If we have less than corePoolSize threads, always add a thread
if (workerCountOf(poolState) < corePoolSize) {
if (addWorker(command, true))
return;
poolState = ctl.get();
}
//Otherwise enqueue the item
if (workQueue.offer(command)) {
int recheck = ctl.get();
if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//If we're below max threadpool size, add a worker and exit
//If we're at max size, reject the command
else if (!addWorker(command, false)) {
reject(command);
}
}
Теперь вы видите проблему?
…
…
…
Команда никогда не передается напрямую потоку для выполнения. Вместо этого команда всегда попадает в очередь через workQueue.offer(command), и оттуда рабочие потоки удаляются из очереди. Если есть много потоков-производителей, вызывающих execute(), а workQueue ограничен (особенно, если он очень маленький), то возможно, что производители заполняют рабочую очередь до того, как какие-либо незанятые рабочие потоки смогут исключить из очереди. В этом случае RejectedExecutionExceptions будет выброшено , даже если есть много незанятых рабочих потоков.
Избежать проблемы можно несколькими способами.
- Сделать
workQueueдостаточно большим для буферизации рабочих элементов, чтобы у потребителей было время вывести их из очереди, прежде чем придут новые рабочие элементы (это вариант, который мы выбрали для этой службы) - Вместо этого используйте
SynchronousQueue, но учтите, что это может привести к блокировкеExecutor.execute(command) - Использование неограниченной очереди - но это означает, что элементы могут ждать в очереди сколь угодно долго, что может привести к исчерпанию ресурсов.
Если вы создаете пул с помощью конструктора ThreadPoolExecutor, вы можете выбрать именно ту стратегию, которую хотите. Исполнители, созданные из java.util.concurrent.Executors заводских методов, выбирают значения по умолчанию, но, что интересно, они не все одинаковы.
newFixedThreadPool()иnewSingleThreadExecutor()используют неограниченную рабочую очередьnewCachedThreadPool()используетSynchronousQueuenewWorkStealingPool()использует совершенно другую реализацию пула потоков,ForkJoinPool, которая имеет реализацию внутренней рабочей очереди, которая ограничена 64 миллионами элементов.

Общий случай
Наша конкретная проблема была в Java, но это всего лишь конкретный пример общей проблемы: если вы используете ограниченную очередь для подключения производителя к потребителю, то она должна быть достаточно большой, чтобы допускать задержку потребителя. В противном случае производители будут бороться за слот в очереди, даже если имеется много потребительской емкости.
И, наконец, стоит упомянуть, что ограничение ваших очередей - и всех ваших структур - является наилучшей практикой.