Что такое асинхронное выполнение? Прежде чем углубляться в модель асинхронного выполнения, давайте разберемся с моделью синхронного программирования.
В модели синхронного программирования поток выполнения выполняет задачи блокирующим образом. Это означает, что поток 1 начинает выполнение Задачи- 1 и не выбирает другую задачу до завершения выполнения Задачи-1 в своем контексте.
В асинхронной модели программирования поток1 может оставить выполнение задачи-1 посередине и передать его другому потоку (поток2) для выполнения следующей последовательности инструкций задачи-1. Thread2 снова может выполнить небольшую часть Task-1 и снова передать его другому потоку (thread3) и так далее. Под термином «поток» здесь я подразумеваю только потоки пользовательского уровня.
Такое чередование задачи может происходить и в однопользовательском потоковом приложении. В этой ситуации потоки ядра помогают в выполнении чередующихся задач.
Схематично это работает.
T1 - это основной поток, который выбирает одну задачу, выполняет ее частично, а затем задача выполняется в другом потоке. T1 не ждет завершения задачи Task-1.
Здесь подойдет реальный пример. Например, у вас есть сетевое приложение (сервер) с одним потоком выполнения. Он прослушивает один порт на предмет входящих событий (или сообщений). При синхронной обработке этот основной поток будет получать события и обрабатывать их. Любые другие сообщения, которые могут поступать в эту точку, будут поставлены в очередь просто потому, что нет потока для обслуживания этих событий или сообщений.
В асинхронном режиме принимающий поток просто получит сообщения в сети (возможно, частично обработает их, если требуется), а затем отправит сообщение в другой поток. Сразу после этого основной поток переходит к получению следующего события из сети. В многоядерной системе такой режим выполнения определенно приведет к повышению пропускной способности и производительности.
Это просто сетевой пример, и есть примеры из разных областей информатики.
Основным методом достижения такой асинхронности является пул потоков. Пул потоков - это пул потоков, ожидающих в очереди сообщений. Сообщения помещаются в очередь, и любой поток может выбирать сообщения из нее и выполнять их независимо. Если в очереди нет работы (или сообщений или событий), то все потоки в пуле потоков ожидают в пустой очереди.
Вы можете кодировать пул потоков, немного разбираясь в потоках и блокировках. И хорошо реализовать это с нуля, прежде чем использовать какую-либо существующую библиотеку.
В более поздней части этого сообщения в блоге я расскажу о библиотеке REST-API, названной cpprestsdk или Casablanca, разработанной Microsoft. Написан на C ++. В основном он используется для связи REST (HTTP) между веб-службами или микросервисами. Я не собираюсь объяснять, как вы можете использовать эту библиотеку для HTTP-запросов и связанной с ними асинхронной обработки, скорее я объясню, как вы можете использовать эту библиотеку для асинхронных задач общего назначения.
Прежде чем переходить к следующим разделам, вам необходимо установить последнюю версию Casablanca в вашей системе. Все следующие коды будут работать в системе на базе Linux (я не тестировал в Windows или Mac). Я тестировал их с Ubuntu 18.04. Я предоставлю инструкции по установке cpprestsdk в конце этого сообщения в блоге. Я предоставил файл сборки CMake для сборки этих кодов и создания исполняемых файлов.
Вы можете найти все следующие коды + CMakeFiles по следующей ссылке
Https://github.com/ddeka0/cppLearn/tree/master/cpprest/examples
Следуя наиболее распространенному синтаксису, вы увидите при использовании Casablanca.
pplx::task< return_type_x >( // a lambda function return someting_of_type "return_type_x" ) .then( // catch the return values from the previous lambda function // another lambda function ) .wait(); // wait for the chain of two lambda functions // to execute and finish
В этом примере первый блок (// лямбда-функция) будет выполняться в другом контексте потока. И когда эта лямбда-функция будет завершена, блок [.then] будет выполнен тем же или другим потоком.
Как вы могли догадаться, продолжение [.then] дает нам мощный способ обработки функций обратного вызова в модели асинхронного программирования.
За кулисами в Casablanca есть пул потоков, размер пула по умолчанию - 40. Следующий конкретный пример развеет многие ваши сомнения относительно правильного использования синтаксиса.
#include "unistd.h" #include "bits/stdc++.h" #include "cpprest/asyncrt_utils.h" using namespace std; using namespace utility; int main() { std::string x("Hello"); auto t1 = pplx::task<std::string>([&x]() // catch x { std::cout <<"Entry1 with TID : "<<std::this_thread::get_id() <<std::endl; sleep(2); // some dummy work return (x + std::string(" World !")); // return a std::string }) .then([](string x) // catch the return value of the prevTask or prevLambda function { sleep(1); // some dummy work std::cout << "Entry2 with TID : " <<std::this_thread::get_id() <<" : "<< x << std::endl; }); t1.wait(); }
Еще один пример, чтобы прояснить основы,
#include "unistd.h" #include "bits/stdc++.h" #include "cpprest/asyncrt_utils.h" using namespace std; using namespace utility; int main() { // This is one more way of submitting tasks to thread pool pplx::task_from_result() .then([]() { std::cout <<"Entry3 with TID : "<<std::this_thread::get_id() <<std::endl; return pplx::task_from_result<std::string>("Hello"); // return as task }) .then([](string x) // capture the return value from prevTask (as simple string) { std::cout << "Entry4 with TID : "<<std::this_thread::get_id()<<std::endl; std::vector<string> ret = {x,"World !"}; return ret; }) .then([](pplx::task<std::vector<string>> prevTask) // capturing as task (works) { std::vector<string> v_strings; try { v_strings = prevTask.get(); } catch (const std::exception& e) { std::cout << e.what() << std::endl; } for(auto &str:v_strings) std::cout << str <<" "; std::cout << std::endl; }).wait(); // please wait main thread, for these chain to complete }
Переходим к еще нескольким примерам.
В этом примере поток отправляет 20 задач в очередь и ожидает их завершения.
#include <bits/stdc++.h> #include "cpprest/asyncrt_utils.h" using namespace std; using namespace utility; #define NUM_ELEMENTS 20 /*This function takes a vector as input and process each element of the vector independently using the thread pool This function returns a vector of all the task handles (or threads) This vector will be used later to merge all the thread (using .wait()) */ pplx::task<std::vector<pplx::task<void>>> Process(std::vector<int> &v) { std::vector<pplx::task<void>> tids; for(auto &x:v) { auto t = pplx::task<void>([&x]() { x = rand()%100; // dummy processing int cnt = 1000000; while(cnt-- > 0) { rand(); } }); tids.push_back(t); } cout <<"return from Process()"<< endl; return pplx::task_from_result<std::vector<pplx::task<void>>>(tids); } int main() { srand(time(NULL)); std::vector<int> v; for(int i = 0;i<NUM_ELEMENTS;i++) { v.push_back(i); } Process(v) .then([](pplx::task<std::vector<pplx::task<void>>> prevTask) // see line number 50, here we capture the return value { std::vector<pplx::task<void>> tids; try { tids = prevTask.get(); } catch(const std::exception& e) { std::cout << e.what() << std::endl; } cout <<"waiting for worker threads to finish"<<endl; for(auto &t : tids) { t.wait(); } }) .wait(); cout << "All worker threads finished task"<< endl; for(auto &x:v) { std::cout << x <<" "; } cout << endl; }
Другой пример
Нам не нужно писать все только внутри блока [.then], мы также можем вызывать другие внешние функции. При необходимости мы можем передать некоторые аргументы этой внешней функции, как показано в примере ниже (std :: string («He»)).
#include <unistd.h> #include <bits/stdc++.h> #include "cpprest/asyncrt_utils.h" using namespace std; using namespace utility; /*Actual code starts from this point ......................... */ std::string f() { return std::string(" World !"); } int main() { auto g = [](std::string x) { return std::string(x + "llo"); }; pplx::task_from_result<std::string>("He") .then([g](std::string x) { auto ret = g(x) + f(); return ret; }) .then([](pplx::task<std::string> prevTask) { std::cout << prevTask.get() << std::endl; }).wait(); }
Последний пример. Это пример, показывающий, как вы можете использовать пул потоков для отправки событий.
#include <unistd.h> #include <bits/stdc++.h> #include "cpprest/asyncrt_utils.h" using namespace std; using namespace utility; /*Actual code starts from this point ......................... */ /*This file implements use case of thread pool using calablanca library */ void TaskA() { sleep(rand()%5); cout <<"taskA executing .." << endl; } void TaskB() { sleep(rand()%5); cout <<"taskB executing .." << endl; } std::map<int,std::function<void(void)>> taskIdMap = { {0, TaskA}, {1, TaskB} }; int main() { srand(time(NULL)); int cnt = 100; // 100 task to dispatch while(cnt--) { int taskId = rand()%2; // simulates an receipt of an event or task auto func = taskIdMap[taskId]; // select what to execute pplx::task<int>([func,taskId]() // check int return type { func(); return taskId; }) .then([](pplx::task<int> prevTask) // capture return value here from prevTask { auto id = prevTask.get(); cout <<(id == 0?"TaskA is done !":"TaskB is done !") <<endl; }); } cout <<"No more task to dispatch. Main thread is free now" <<endl; getchar(); // to wait main thread for all worker thread to finish their tasks }
Инструкции по установке:
Установка по умолчанию (менеджер пакетов apt) идет со старой версией cpprestsdk. В этой версии вы не можете изменить размер threadPool. Могут быть и другие отличия от новой версии. Но ключевым моментом для меня был размер пула потоков. Поэтому я предпочитаю использовать последнюю версию от Git.
Однако, если вы хотите быстро протестировать примеры кодов. Вот команда для установки cpprestsdk.
sudo apt-get install libcpprest-dev
— — — — — — — —
Следующие инструкции предназначены для новой версии cpprestsdk.
Если у вас уже установлены пакеты apt для cpprestsdk, удалите их с помощью следующих команд.
sudo apt-get purge libcpprest-dev sudo apt-get remove libcpprest-dev sudo apt autoremove
— — — — — — —
1. sudo apt-get install g++ git libboost-atomic-dev libboost-thread-dev libboost-system-dev libboost-date-time-dev libboost-regex-dev libboost-filesystem-dev libboost-random-dev libboost-chrono-dev libboost-serialization-dev libwebsocketpp-dev openssl libssl-dev ninja-build 2. git clone https://github.com/Microsoft/cpprestsdk.git casablanca 3. cd casablanca mkdir build.debug cd build.debug cmake -G Ninja .. -DCMAKE_BUILD_TYPE=Debug ninja 4. (optional test check, needs an internet connection) cd Release/Binaries ./test_runner *_test.so 5. sudo ninja install (inside /casablanca/build.debug/) 6. echo $LD_LIBRARY_PATH (optional, setting this would work for only this shell session) (if the above output does not include Casablanca Release/Binaries, then do the following) export LD_LIBRARY_PATH=$LD_LIBRARY_PATH: (for example path could be /home/deka/Desktop/casablanca/build.debug/Release/Binaries ) 7. Install created binaries in /usr/lib/ 1. unset LD_LIBRARY_PATH (if you have done step 6) 2. sudo ldconfig 3. sudo /home/deka/Desktop/casablanca/build.debug/Release/Binaries/* /usr/lib (replace the path, with your path to binaries)
— — — — — — —
После успешной установки мы можем использовать следующий фрагмент кода, чтобы установить количество рабочих потоков Casablanca.
crossplat :: threadpool :: initialize_with_threads (200 или 300 и т. д.);
Ниже приведен пример программы для тестирования установки новой библиотеки с переменным размером пула потоков.
#include <bits/stdc++.h> #include <pplx/threadpool.h> #include <cpprest/asyncrt_utils.h> #define SLEEP_WORK 1 #define LOOP_WORK 0 #define THREAD_LOG 0 using namespace std; using namespace utility; using namespace crossplat; // required for threadpool::initialize_with_threads to work using namespace std::chrono; std::mutex mtx; int main() { // SET THREAD_LOG to 0 while doing these test //threadpool::initialize_with_threads(2); // takes 50 seconds in 4 cores with SLEEP_WORK //threadpool::initialize_with_threads(2); // takes 67 seconds in 4 cores with LOOP_WORK //threadpool::initialize_with_threads(4); // takes 25 seconds in 4 cores with SLEEP_WORK //threadpool::initialize_with_threads(4); // takes 21 seconds in 4 cores with LOOP_WORK threadpool::initialize_with_threads(25); // takes 4 seconds in 4 cores SLEEP_WORK //threadpool::initialize_with_threads(25); // takes 21 seconds in 4 cores LOOP_WORK //threadpool::initialize_with_threads(50); // takes 2 seconds in 4 cores SLEEP_WORK //threadpool::initialize_with_threads(50); // takes 21 seconds in 4 cores LOOP_WORK //threadpool::initialize_with_threads(100); // takes 1 seconds in 4 cores SLEEP_WORK //threadpool::initialize_with_threads(100); // takes 21 seconds in 4 cores LOOP_WORK // default is 40 threads // takes 3 seconds in 4 cores with SLEEP_WORK // takes 21 seconds in 4 cores with LOOP_WORK std::vector<pplx::task<void>> all_tasks; // to wait on tasks std::set<std::thread::id> Set; // to count the distinct threads got involved in tasks auto start = high_resolution_clock::now(); for(int i = 0;i<100;i++) { auto t = pplx::task<void>([&Set]() { #if SLEEP_WORK sleep(1); #endif #if LOOP_WORK int cnt = 1000000; // almost takes 1 sec while(cnt--) { rand(); } #endif #if THREAD_LOG mtx.lock(); std::cout <<"Hello from " <<std::this_thread::get_id()<<endl; /*cout could be outside lock also*/ Set.insert(std::this_thread::get_id()); mtx.unlock(); #endif }); all_tasks.push_back(t); } // wait for all task to finish // otherwise main thread would exit before casablanca // worker threads finish their tasks for(auto &tsk:all_tasks) { tsk.wait(); } auto stop = high_resolution_clock::now(); auto duration = duration_cast<seconds>(stop - start); cout <<"Total time taken = "<<duration.count()<<endl; #if THREAD_LOG cout <<Set.size()<<" distinct threads got involed in all the tasks and " <<"tota time taken = "<<duration.count()<<endl; /*Set.size() output should be 6 or less than 6 (less than 6), because sometimes same thread may pick up the work */ #endif }