ТЕХНОЛОГИЯ ЭКСПЕДИА ГРУПП - ПРОГРАММНОЕ ОБЕСПЕЧЕНИЕ
Не пропускать очередь
Попался в настройке Java ThreadPoolExecutor
Недавно мы обнаружили, что если вы неправильно настроите программу-исполнитель Java, он будет отклонять запросы, оставаясь при этом бездействующим!
При интеграции двух используемых здесь в Expedia Group ™ сервисов большого объема мы реализовали обработку запросов с выделенным пулом потоков, чтобы ограничить параллелизм и повысить устойчивость. Мы установили минимальный размер пула на 20 потоков, максимальный - на 300. Мы также хотели избежать очереди на стороне клиента - вместо этого мы предпочли бы изящную деградацию - поэтому мы настроили workQueue
на размер 1.
Это работало нормально, но во время нагрузочного тестирования мы заметили, что мы получаем несколько неудачных запросов из-заRejectedExecutionException
:
java.util.concurrent.RejectedExecutionException: Task someInternalTaskName rejected from java.util.concurrent.ThreadPoolExecutor@2e95c205[Running, pool size = 300, active threads = 3, queued tasks = 0, completed tasks = 24301742]
! at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
! at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
! at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
Это не совсем неожиданно - об этом говорится в документации ThreadPoolExecutor:
Новые задачи, отправленные в методе
execute(java.lang.Runnable)
будут отклонены ... , когда Исполнитель использует конечные границы как для максимального количества потоков, так и для емкости рабочей очереди и является насыщенным.
Но исключение конкретно говорит о том, что нет задач в очереди, и, глядя на наши метрики и дампы потоков, большинство наших рабочих потоков просто сидели около TIMED_WAITING.
Дампы потоков подтвердили, что все потоки были припаркованы в ожидании выполнения работы, а не заблокированы в ожидании ввода-вывода или любого другого очевидного ресурса. Итак, если все потоки в пуле ждут, почему он все еще отклоняет запросы?
threadPool.execute (фейспалм)
Ответ появился в методе ThreadPoolExecutor.execute(Runnable)
, который выглядит так (с некоторыми упрощениями и комментариями для ясности). Обратите внимание, что RejectedExecutionException
происходит из последней показанной здесь строки.
public void execute(Runnable command) { int poolState = ctl.get(); //If we have less than corePoolSize threads, always add a thread if (workerCountOf(poolState) < corePoolSize) { if (addWorker(command, true)) return; poolState = ctl.get(); } //Otherwise enqueue the item if (workQueue.offer(command)) { int recheck = ctl.get(); if (workerCountOf(recheck) == 0) addWorker(null, false); } //If we're below max threadpool size, add a worker and exit //If we're at max size, reject the command else if (!addWorker(command, false)) { reject(command); } }
Теперь вы видите проблему?
…
…
…
Команда никогда не передается напрямую потоку для выполнения. Вместо этого команда всегда попадает в очередь через workQueue.offer(command)
, и оттуда рабочие потоки удаляются из очереди. Если есть много потоков-производителей, вызывающих execute()
, а workQueue
ограничен (особенно, если он очень маленький), то возможно, что производители заполняют рабочую очередь до того, как какие-либо незанятые рабочие потоки смогут исключить из очереди. В этом случае RejectedExecutionExceptions
будет выброшено , даже если есть много незанятых рабочих потоков.
Избежать проблемы можно несколькими способами.
- Сделать
workQueue
достаточно большим для буферизации рабочих элементов, чтобы у потребителей было время вывести их из очереди, прежде чем придут новые рабочие элементы (это вариант, который мы выбрали для этой службы) - Вместо этого используйте
SynchronousQueue
, но учтите, что это может привести к блокировкеExecutor.execute(command)
- Использование неограниченной очереди - но это означает, что элементы могут ждать в очереди сколь угодно долго, что может привести к исчерпанию ресурсов.
Если вы создаете пул с помощью конструктора ThreadPoolExecutor
, вы можете выбрать именно ту стратегию, которую хотите. Исполнители, созданные из java.util.concurrent.Executors
заводских методов, выбирают значения по умолчанию, но, что интересно, они не все одинаковы.
newFixedThreadPool()
иnewSingleThreadExecutor()
используют неограниченную рабочую очередьnewCachedThreadPool()
используетSynchronousQueue
newWorkStealingPool()
использует совершенно другую реализацию пула потоков,ForkJoinPool
, которая имеет реализацию внутренней рабочей очереди, которая ограничена 64 миллионами элементов.
Общий случай
Наша конкретная проблема была в Java, но это всего лишь конкретный пример общей проблемы: если вы используете ограниченную очередь для подключения производителя к потребителю, то она должна быть достаточно большой, чтобы допускать задержку потребителя. В противном случае производители будут бороться за слот в очереди, даже если имеется много потребительской емкости.
И, наконец, стоит упомянуть, что ограничение ваших очередей - и всех ваших структур - является наилучшей практикой.