В Java 8 представлена ​​концепция параллельных потоков, которая позволяет использовать мощность многоядерных машин для повышения производительности. Параллельные потоки могут быть отличным инструментом для более эффективного выполнения задач, интенсивно использующих ЦП.

Итак, мы должны использовать параллельные потоки везде, верно?

Не обязательно.

При использовании параллельных потоков важно учитывать различные факторы и ограничения и взвешивать преимущества производительности, которые они могут предложить. В этой статье мы рассмотрим различные соображения и ограничения, которые следует учитывать при использовании параллельных потоков, а также потенциальный прирост производительности, который они могут обеспечить.

Последовательный поток против параллельного потока

Пакет Stream был представлен в Java 8 как мощный инструмент для обработки коллекций данных. Его не следует путать с потоками ввода-вывода Java, которые обрабатывают операции ввода и вывода.

Особенности потока включают в себя:

  • Потоки действуют как оболочка вокруг источника данных, такого как массив или коллекция, и предоставляют операции для работы с элементами источника данных.
  • Элементы потока можно посетить только один раз, после чего к ним нельзя получить доступ снова. Для повторного посещения тех же элементов необходимо создать новый поток.
  • Операции, выполняемые над потоком, не изменяют исходные данные, а вместо этого создают новый поток с результатом операции.
  • Потоки предоставляют широкий спектр функций, таких как фильтрация, сопоставление, поиск, сопоставление и т. д., которые помогают в обработке данных и манипулировании ими.

Последовательный поток:

Последовательные потоковые задачи выполняются последовательно на одном ядре, используя для обработки один поток. Поток, не указанный как параллельный, обрабатывается один за другим. Таким образом, последовательный поток не использует преимущества многоядерного процессора.

import java.util.Arrays;
import java.util.stream.IntStream;

public class StreamDemo {
 public static void main(String[] args) {
  StreamDemo streamDemo = new StreamDemo();

  streamDemo.sequentialStream();
 }

 public void sequentialStream() {
  System.out.println("Sequential Stream");
  System.out.println("-------------------------------------");
  this.createAnArrayStream(10)
    .forEach(this::processElement);
 }

 public void processElement(int i) {
  System.out.println("Element " + i + " Processing on " + Thread.currentThread().getName());
 }

 public IntStream createAnArrayStream(int size) {
  int[] result = new int[size];
  for (int i=0;i<size;i++){
   result[i]=i;
  }
  return Arrays.stream(result);
 }
}

Вывод для вышеуказанной программы:

Sequential Stream
-------------------------------------
Element 0 Processing on main
Element 1 Processing on main
Element 2 Processing on main
Element 3 Processing on main
Element 4 Processing on main
Element 5 Processing on main
Element 6 Processing on main
Element 7 Processing on main
Element 8 Processing on main
Element 9 Processing on main

Параллельный поток:

Параллельные потоки делят задачу на несколько подзадач и используют несколько ядер для их одновременной обработки. Эти потоки задаются вызовом метода parallelStream() на интерфейсе Collections или метода parallel() на интерфейсе BaseStream».

Порядок работы не гарантируется в paralleStream, но конечный результат будет таким же, как в sequentialStream.

import java.util.Arrays;
import java.util.stream.IntStream;

public class StreamDemo {
 public static void main(String[] args) {
  StreamDemo streamDemo = new StreamDemo();

  streamDemo.parallelStream();
 }

 public void parallelStream() {
  System.out.println("Parallel Stream");
  System.out.println("-------------------------------------");
  this.createAnArrayStream(10)
    .parallel()
    .forEach(this::processElement);
 }

 public void processElement(int i) {
  System.out.println("Element " + i + " Processing on " + Thread.currentThread().getName());
 }

 public IntStream createAnArrayStream(int size) {
  int[] result = new int[size];
  for (int i=0;i<size;i++){
   result[i]=i;
  }
  return Arrays.stream(result);
 }
}

Вывод для вышеуказанной программы:

Parallel Stream
-------------------------------------
Element 6 Processing on main
Element 5 Processing on main
Element 8 Processing on main
Element 9 Processing on main
Element 7 Processing on ForkJoinPool.commonPool-worker-2
Element 2 Processing on ForkJoinPool.commonPool-worker-1
Element 4 Processing on ForkJoinPool.commonPool-worker-2
Element 1 Processing on main
Element 0 Processing on ForkJoinPool.commonPool-worker-4
Element 3 Processing on ForkJoinPool.commonPool-worker-1

ForkJoinPool в параллельном потоке

Параллельные потоки используют инфраструктуру ForkJoinPool для разделения задачи на подзадачи и их одновременной обработки. Метод commonPool() используется для создания статического экземпляра ForkJoinPool, который используется внутри параллельного потока для управления подзадачами.

Фреймворк ForkJoinPool разбивает поток на несколько подзадач и обрабатывает их одновременно на разных ядрах. Затем он объединяет результаты подзадач для получения окончательного результата.

Сравнение производительности

Меньше увидеть сравнение производительности

Последовательный поток

Это пример кода, который мы будем использовать для оценки времени, необходимого для обработки последовательного потока.

import java.util.Arrays;
import java.util.stream.IntStream;

public class StreamDemo {
 public static void main(String[] args) {
  StreamDemo streamDemo = new StreamDemo();

  streamDemo.sequentialStream();
 }

 public void sequentialStream() {
  System.out.println("Sequential Stream");
  System.out.println("-------------------------------------");
  long startTime = System.currentTimeMillis();

  this.createAnArrayStream(1000)
    .forEach(this::processElement);

  long endTime = System.currentTimeMillis();
  System.out.println("Time took to process parallel stream: " +
    convertMilliSecToMinute(endTime - startTime) + " minutes");
 }

 public long convertMilliSecToMinute(long millis) {
  return millis/(1000 * 60);
 }

 public void processElement(int i) {
  try {
   Thread.sleep(1000);
  } catch (Exception exception) {
   System.out.println("Exception while processing element " + i + " exception " + exception);
  }
 }

 public IntStream createAnArrayStream(int size) {
  int[] result = new int[size];
  for (int i=0;i<size;i++){
   result[i]=i;
  }
  return Arrays.stream(result);
 }
}

Это вывод, который мы получаем

Sequential Stream
-------------------------------------
Time took to process parallel stream: 16 minutes

Параллельный поток

Это пример кода, который мы будем использовать для оценки времени, необходимого для обработки параллельного потока.

import java.util.Arrays;
import java.util.stream.IntStream;

public class StreamDemo {
 public static void main(String[] args) {
  StreamDemo streamDemo = new StreamDemo();

  streamDemo.parallelStream();
 }

 public void parallelStream() {
  System.out.println("Parallel Stream");
  System.out.println("-------------------------------------");
  long startTime = System.currentTimeMillis();

  this.createAnArrayStream(1000)
    .parallel()
    .forEach(this::processElement);

  long endTime = System.currentTimeMillis();
  System.out.println("Time took to process parallel stream: " +
    convertMilliSecToMinute(endTime - startTime) + " minutes");
 }

 public long convertMilliSecToMinute(long millis) {
  return millis/(1000 * 60);
 }

 public void processElement(int i) {
  try {
   Thread.sleep(1000);
  } catch (Exception exception) {
   System.out.println("Exception while processing element " + i + " exception " + exception);
  }
 }

 public IntStream createAnArrayStream(int size) {
  int[] result = new int[size];
  for (int i=0;i<size;i++){
   result[i]=i;
  }
  return Arrays.stream(result);
 }
}

Это вывод, который мы получаем

Parallel Stream
-------------------------------------
Time took to process parallel stream: 2 minutes

Здесь вы можете видеть, что параллельный поток работает на 14 минут быстрее, чем последовательный.

Несогласованное исключение в параллельном потоке

ForkJoinPool рекурсивно делит задачи на подзадачи и обрабатывает их одновременно. Если для подзадачи создается исключение, оно будет заключено в новое исключение, а исходное исключение будет установлено в качестве причины. Если в основном потоке возникнет исключение, оно будет вызвано как обычно.

См. Официальный комментарий.

Когда не использовать параллельный поток?

Хотя параллельные потоки потенциально могут повысить производительность, это не всегда лучший выбор для каждой задачи. Факторы, которые следует учитывать при принятии решения об использовании параллельного потока, включают:

  • Когда порядок важен: параллельные потоки могут не сохранять порядок элементов в исходных данных, поэтому, если порядок элементов важен для задачи, вместо этого следует использовать последовательный поток.
  • Когда время, необходимое для обработки одного элемента, очень мало: в этом случае накладные расходы на управление параллельным выполнением могут перевесить любой выигрыш в производительности.
  • Когда стоимость разделения потока высока: если источник данных неэффективен для разделения, например связанный список, стоимость разделения на подзадачи может перевесить любой прирост производительности.
  • Когда произведение NQ невелико: где N обозначает количество элементов исходных данных, а Q представляет объем вычислений, выполняемых для каждого элемента данных. Чем больше произведение N*Q, тем больше вероятность, что мы получим прирост производительности за счет распараллеливания. Эмпирическое правило состоит в том, что N должно быть больше 10 000 для задач с тривиально малым Q, таких как суммирование чисел.
  • Когда в системе недостаточно памяти или ресурсов для выполнения параллельной обработки.

Важно измерить и протестировать производительность кода, использующего параллельные потоки, и сравнить ее с производительностью последовательных потоков, прежде чем решить, какой из них использовать.

Использование общего пула в инфраструктуре ForkJoinPool параллельными потоками может привести к тому, что долго выполняющиеся задачи монополизируют доступные потоки, в результате чего другие операции параллельного потока будут заблокированы и будут ожидать доступности потоков. Это связано с тем, что общий пул является общим ресурсом, и все параллельные задачи используют этот пул для выполнения своих операций. Если одна длительная задача использует все потоки в пуле, никакая другая задача не может использовать эти потоки до тех пор, пока -беговая задача выполнена.

Например:
Допустим, у вас есть параллельная потоковая операция, обрабатывающая большой объем данных, например фильтрация и сопоставление большого списка элементов. Эта операция занимает много времени и использует большую часть доступных потоков в общем пуле.

Во время выполнения этой операции запускается другая операция параллельного потока, например сортировка меньшего списка элементов. Однако, поскольку общий пул уже интенсивно используется первой операцией, второй операции приходится ждать, пока потоки станут доступными, прежде чем она сможет начать обработку. В результате выполнение второй операции может занять больше времени, чем если бы она выполнялась отдельным пулом потоков.

Стоит отметить, что мы также можем создать собственный пул потоков, вызвав конструктор ForkJoinPool(int parallelism) вместо использования общего пула. Это позволит нам ограничить количество потоков, используемых нашими параллельными потоковыми операциями, чтобы избежать блокировки других операций, использующих общий пул.

Это все о «Java Parallel Stream». Отправьте нам свой отзыв, используя кнопку сообщения ниже. Ваши отзывы помогают нам создавать лучший контент для вас и других. Спасибо за прочтение!

Если вам понравилась статья, нажмите несколько раз кнопку 👏🏻 ниже. Чтобы показать свою поддержку!