Что такое асинхронное выполнение? Прежде чем углубляться в модель асинхронного выполнения, давайте разберемся с моделью синхронного программирования.

В модели синхронного программирования поток выполнения выполняет задачи блокирующим образом. Это означает, что поток 1 начинает выполнение Задачи- 1 и не выбирает другую задачу до завершения выполнения Задачи-1 в своем контексте.

В асинхронной модели программирования поток1 может оставить выполнение задачи-1 посередине и передать его другому потоку (поток2) для выполнения следующей последовательности инструкций задачи-1. Thread2 снова может выполнить небольшую часть Task-1 и снова передать его другому потоку (thread3) и так далее. Под термином «поток» здесь я подразумеваю только потоки пользовательского уровня.

Такое чередование задачи может происходить и в однопользовательском потоковом приложении. В этой ситуации потоки ядра помогают в выполнении чередующихся задач.

Схематично это работает.

T1 - это основной поток, который выбирает одну задачу, выполняет ее частично, а затем задача выполняется в другом потоке. T1 не ждет завершения задачи Task-1.

Здесь подойдет реальный пример. Например, у вас есть сетевое приложение (сервер) с одним потоком выполнения. Он прослушивает один порт на предмет входящих событий (или сообщений). При синхронной обработке этот основной поток будет получать события и обрабатывать их. Любые другие сообщения, которые могут поступать в эту точку, будут поставлены в очередь просто потому, что нет потока для обслуживания этих событий или сообщений.

В асинхронном режиме принимающий поток просто получит сообщения в сети (возможно, частично обработает их, если требуется), а затем отправит сообщение в другой поток. Сразу после этого основной поток переходит к получению следующего события из сети. В многоядерной системе такой режим выполнения определенно приведет к повышению пропускной способности и производительности.

Это просто сетевой пример, и есть примеры из разных областей информатики.

Основным методом достижения такой асинхронности является пул потоков. Пул потоков - это пул потоков, ожидающих в очереди сообщений. Сообщения помещаются в очередь, и любой поток может выбирать сообщения из нее и выполнять их независимо. Если в очереди нет работы (или сообщений или событий), то все потоки в пуле потоков ожидают в пустой очереди.

Вы можете кодировать пул потоков, немного разбираясь в потоках и блокировках. И хорошо реализовать это с нуля, прежде чем использовать какую-либо существующую библиотеку.

В более поздней части этого сообщения в блоге я расскажу о библиотеке REST-API, названной cpprestsdk или Casablanca, разработанной Microsoft. Написан на C ++. В основном он используется для связи REST (HTTP) между веб-службами или микросервисами. Я не собираюсь объяснять, как вы можете использовать эту библиотеку для HTTP-запросов и связанной с ними асинхронной обработки, скорее я объясню, как вы можете использовать эту библиотеку для асинхронных задач общего назначения.

Прежде чем переходить к следующим разделам, вам необходимо установить последнюю версию Casablanca в вашей системе. Все следующие коды будут работать в системе на базе Linux (я не тестировал в Windows или Mac). Я тестировал их с Ubuntu 18.04. Я предоставлю инструкции по установке cpprestsdk в конце этого сообщения в блоге. Я предоставил файл сборки CMake для сборки этих кодов и создания исполняемых файлов.

Вы можете найти все следующие коды + CMakeFiles по следующей ссылке

Https://github.com/ddeka0/cppLearn/tree/master/cpprest/examples

Следуя наиболее распространенному синтаксису, вы увидите при использовании Casablanca.

pplx::task< return_type_x >(
  
  // a lambda function
  
  return someting_of_type "return_type_x"
 
 )
 .then(

  // catch the return values from the previous lambda function
  
  // another lambda function
 
 )
 .wait(); // wait for the chain of two lambda functions 
   // to execute and finish

В этом примере первый блок (// лямбда-функция) будет выполняться в другом контексте потока. И когда эта лямбда-функция будет завершена, блок [.then] будет выполнен тем же или другим потоком.

Как вы могли догадаться, продолжение [.then] дает нам мощный способ обработки функций обратного вызова в модели асинхронного программирования.

За кулисами в Casablanca есть пул потоков, размер пула по умолчанию - 40. Следующий конкретный пример развеет многие ваши сомнения относительно правильного использования синтаксиса.

#include "unistd.h"
#include "bits/stdc++.h"
#include "cpprest/asyncrt_utils.h"
using namespace std;
using namespace utility;
int main() {

 std::string x("Hello");
 auto t1 = pplx::task<std::string>([&x]() // catch x
 {
     std::cout <<"Entry1 with TID : "<<std::this_thread::get_id()
      <<std::endl;
     sleep(2);  // some dummy work
     return (x + std::string(" World !")); // return a std::string
 })
 .then([](string x) // catch the return value of the prevTask or    prevLambda function
 { 
    sleep(1); // some dummy work
    std::cout << "Entry2 with TID : " <<std::this_thread::get_id()
    <<" : "<< x << std::endl;
 });
 t1.wait();
}

Еще один пример, чтобы прояснить основы,

#include "unistd.h"
#include "bits/stdc++.h"
#include "cpprest/asyncrt_utils.h"
using namespace std;
using namespace utility;
int main() {
 // This is one more way of submitting tasks to thread pool
pplx::task_from_result()
 .then([]()
 { 
     std::cout <<"Entry3 with TID : "<<std::this_thread::get_id()      <<std::endl;
     return pplx::task_from_result<std::string>("Hello"); // return as task  
 })
 .then([](string x) // capture the return value from prevTask (as simple string)
 { 
  std::cout << "Entry4 with TID : "<<std::this_thread::get_id()<<std::endl;
std::vector<string> ret = {x,"World !"};
  return ret;
 })
 .then([](pplx::task<std::vector<string>> prevTask) // capturing as task (works)
 {
  std::vector<string> v_strings; 
  try {
   v_strings = prevTask.get(); 
  }
  catch (const std::exception& e) { 
   std::cout << e.what() << std::endl;
  }
for(auto &str:v_strings)
   std::cout << str <<" ";
std::cout << std::endl;
}).wait(); // please wait main thread, for these chain to complete
}

Переходим к еще нескольким примерам.

В этом примере поток отправляет 20 задач в очередь и ожидает их завершения.

#include <bits/stdc++.h>
#include "cpprest/asyncrt_utils.h"
using namespace std;
using namespace utility;
#define NUM_ELEMENTS 20
/*This function takes a vector as input and process each element
of the vector independently using the thread pool 
This function returns a vector of all the task handles (or threads) 
This vector will be used later to merge all the thread (using .wait())
*/
pplx::task<std::vector<pplx::task<void>>> Process(std::vector<int> &v) {
 std::vector<pplx::task<void>> tids;
 for(auto &x:v) {
  auto t = pplx::task<void>([&x]()
  { 
   x = rand()%100; // dummy processing
   int cnt = 1000000;
   while(cnt-- > 0) {
    rand();
   }
  });
  tids.push_back(t);
 }
 cout <<"return from Process()"<< endl;
 return pplx::task_from_result<std::vector<pplx::task<void>>>(tids);
}
int main() { 
 srand(time(NULL));
 std::vector<int> v;
 for(int i = 0;i<NUM_ELEMENTS;i++) {
  v.push_back(i);
 }
 
 Process(v)
 .then([](pplx::task<std::vector<pplx::task<void>>> prevTask) // see line number 50, here we capture the return value
 {
  std::vector<pplx::task<void>> tids;
  try {
   tids = prevTask.get();
  }
  catch(const std::exception& e) {
   std::cout << e.what() << std::endl;
  }
  cout <<"waiting for worker threads to finish"<<endl;
  for(auto &t : tids) {
   t.wait();
  }
 })
 .wait();
 
 cout << "All worker threads finished task"<< endl;
 for(auto &x:v) {
  std::cout << x <<" ";
 }
 cout << endl;
}

Другой пример

Нам не нужно писать все только внутри блока [.then], мы также можем вызывать другие внешние функции. При необходимости мы можем передать некоторые аргументы этой внешней функции, как показано в примере ниже (std :: string («He»)).

#include <unistd.h>
#include <bits/stdc++.h>
#include "cpprest/asyncrt_utils.h"
using namespace std;
using namespace utility;
/*Actual code starts from this point ......................... */
std::string f() {
 return std::string(" World !");
}
int main() {
auto g = [](std::string x) {
  return std::string(x + "llo");
 };
pplx::task_from_result<std::string>("He")
 .then([g](std::string x)
 { 
  auto ret = g(x) + f();
  return ret;
 })
 .then([](pplx::task<std::string> prevTask)
 {
  std::cout << prevTask.get() << std::endl;
}).wait();
}

Последний пример. Это пример, показывающий, как вы можете использовать пул потоков для отправки событий.

#include <unistd.h>
#include <bits/stdc++.h>
#include "cpprest/asyncrt_utils.h"
using namespace std;
using namespace utility;
/*Actual code starts from this point ......................... */
/*This file implements use case of thread pool using calablanca library */
void TaskA() {
 sleep(rand()%5);
 cout <<"taskA executing .."  << endl;
}
void TaskB() {
 sleep(rand()%5);
 cout <<"taskB executing .."  << endl;
}
std::map<int,std::function<void(void)>> taskIdMap = {
 {0,  TaskA},
 {1,  TaskB}
};
int main() {
 srand(time(NULL));
 int cnt = 100; // 100 task to dispatch 
 while(cnt--) {
  int taskId = rand()%2; // simulates an receipt of an event or task
  auto func = taskIdMap[taskId]; // select what to execute
  pplx::task<int>([func,taskId]() // check int return type
  {
   func();
   return taskId;
})
  .then([](pplx::task<int> prevTask) // capture return value here from prevTask 
  {
   auto id = prevTask.get();
   cout  <<(id == 0?"TaskA is done !":"TaskB is done !")  <<endl;
  });
 }
 cout <<"No more task to dispatch. Main thread is free now" <<endl;
 getchar(); // to wait main thread for all worker thread to finish their tasks
}

Инструкции по установке:

Установка по умолчанию (менеджер пакетов apt) идет со старой версией cpprestsdk. В этой версии вы не можете изменить размер threadPool. Могут быть и другие отличия от новой версии. Но ключевым моментом для меня был размер пула потоков. Поэтому я предпочитаю использовать последнюю версию от Git.

Однако, если вы хотите быстро протестировать примеры кодов. Вот команда для установки cpprestsdk.

sudo apt-get install libcpprest-dev

— — — — — — — —

Следующие инструкции предназначены для новой версии cpprestsdk.

Если у вас уже установлены пакеты apt для cpprestsdk, удалите их с помощью следующих команд.

sudo apt-get purge libcpprest-dev
 
sudo apt-get remove libcpprest-dev
 
sudo apt autoremove

— — — — — — —

1.  sudo apt-get install g++ git libboost-atomic-dev libboost-thread-dev libboost-system-dev libboost-date-time-dev libboost-regex-dev libboost-filesystem-dev libboost-random-dev libboost-chrono-dev libboost-serialization-dev libwebsocketpp-dev openssl libssl-dev ninja-build
 
 
 
2.  git clone https://github.com/Microsoft/cpprestsdk.git casablanca
 
 
 
3.  cd casablanca
 
 mkdir build.debug
 
 cd build.debug
 
 cmake -G Ninja .. -DCMAKE_BUILD_TYPE=Debug
 
 ninja
 
 
 
4. (optional test check, needs an internet connection) 
 
 cd Release/Binaries
 
 ./test_runner *_test.so
 
 
 
5. sudo ninja install (inside /casablanca/build.debug/)
 
 
 
6.  echo $LD_LIBRARY_PATH (optional, setting this would work for only this shell session)
 
 (if the above output does not include Casablanca Release/Binaries, then do the following)
 
 
 
 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:
 
 (for example path could be /home/deka/Desktop/casablanca/build.debug/Release/Binaries )
 
 
 
7. Install created binaries in /usr/lib/
 
 
 
 1. unset LD_LIBRARY_PATH (if you have done step 6)
 
 2. sudo ldconfig
 
 3. sudo /home/deka/Desktop/casablanca/build.debug/Release/Binaries/* /usr/lib
 
 (replace the path, with your path to binaries)

— — — — — — —

После успешной установки мы можем использовать следующий фрагмент кода, чтобы установить количество рабочих потоков Casablanca.

crossplat :: threadpool :: initialize_with_threads (200 или 300 и т. д.);

Ниже приведен пример программы для тестирования установки новой библиотеки с переменным размером пула потоков.

#include <bits/stdc++.h>
#include <pplx/threadpool.h>
#include <cpprest/asyncrt_utils.h>
#define SLEEP_WORK  1
#define LOOP_WORK   0
#define THREAD_LOG  0
using namespace std;
using namespace utility;
using namespace crossplat;  // required for threadpool::initialize_with_threads to work
using namespace std::chrono;
std::mutex mtx;
int main() {
    
    // SET THREAD_LOG to 0 while doing these test 
    //threadpool::initialize_with_threads(2);   // takes 50 seconds in 4 cores with SLEEP_WORK
    //threadpool::initialize_with_threads(2);   // takes 67 seconds in 4 cores with LOOP_WORK
    
    //threadpool::initialize_with_threads(4);   // takes 25 seconds in 4 cores with SLEEP_WORK
    //threadpool::initialize_with_threads(4);   // takes 21 seconds in 4 cores with LOOP_WORK
    
    threadpool::initialize_with_threads(25);   // takes 4 seconds in 4 cores SLEEP_WORK
    //threadpool::initialize_with_threads(25);   // takes 21 seconds in 4 cores LOOP_WORK
//threadpool::initialize_with_threads(50);   // takes 2 seconds in 4 cores SLEEP_WORK
    //threadpool::initialize_with_threads(50);   // takes 21 seconds in 4 cores LOOP_WORK
    
    //threadpool::initialize_with_threads(100);   // takes 1 seconds in 4 cores SLEEP_WORK
    //threadpool::initialize_with_threads(100);   // takes 21 seconds in 4 cores LOOP_WORK
    
    // default is 40 threads                    // takes 3 seconds in 4 cores with SLEEP_WORK
                                                // takes 21 seconds in 4 cores with LOOP_WORK
    
    std::vector<pplx::task<void>> all_tasks;    // to wait on tasks
    std::set<std::thread::id> Set;              // to count the distinct threads got involved in tasks
    
    auto start = high_resolution_clock::now();
    
    for(int i = 0;i<100;i++) {
        auto t = pplx::task<void>([&Set]()
        {
#if SLEEP_WORK
            sleep(1);
            #endif
#if LOOP_WORK
            
            int cnt = 1000000;  // almost takes 1 sec
            while(cnt--) {
                rand();
            }            
            
            #endif
#if THREAD_LOG            
            mtx.lock();
            std::cout <<"Hello from "
                <<std::this_thread::get_id()<<endl;
            /*cout could be outside lock also*/
            Set.insert(std::this_thread::get_id());
            mtx.unlock();
            #endif
});
        all_tasks.push_back(t);
    }
// wait for all task to finish
    // otherwise main thread would exit before casablanca
    // worker threads finish their tasks
    
    for(auto &tsk:all_tasks) {
        tsk.wait();
    }
    
    auto stop = high_resolution_clock::now(); 
    auto duration = duration_cast<seconds>(stop - start);
    cout <<"Total time taken = "<<duration.count()<<endl;
#if THREAD_LOG
    cout <<Set.size()<<" distinct threads got involed in all the tasks and "
        <<"tota time taken = "<<duration.count()<<endl;
    /*Set.size() output should be 6 or less than 6
    (less than 6), because sometimes same thread 
    may pick up the work */
    #endif
}