2018年10月25日 星期四

C++ 學習筆記:async, future

Example:


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#include <future>
#include <thread>
#include <chrono>
#include <random>
#include <iostream>
#include <exception>
#include <list>

int randomSleepAndOutput(char c)
{
    // random-number generator (use c as seed to get different sequences)
    std::default_random_engine dre(c);
    std::uniform_int_distribution<int> id(10, 1000);

    for (int i = 0; i < 10; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(id(dre)));
        std::cout.put(c).flush();
    }

    return c;
}

void parallelExecution()
{
    std::cout << "Starting lambda1 in background and lambda2 in foreground:" << std::endl;

    auto lambda1 = []{ return randomSleepAndOutput('b'); };
    auto lambda2 = []{ return randomSleepAndOutput('f'); };

    // Execute callable object 1 asynchronously (now or later or never):
    std::future<int> result1(std::async(lambda1));

    // Execute callable object 2 synchronously (here and now)
    // If callable 1 is started, now we have a parallel execution
    int result2 = lambda2();

    // Wait for lambda1 to finish and add its result to result2
    int result = result1.get() + result2;

    std::cout << "\nlambda1() + lambda2() = " << result << std::endl;
}

void lazyEvaluation()
{
    auto lambda1 = []{ return randomSleepAndOutput('.'); };
    auto lambda2 = []{ return randomSleepAndOutput('+'); };

    std::future<int> result1(std::async(std::launch::deferred, lambda1));
    std::future<int> result2(std::async(std::launch::deferred, lambda2));

    // Do something heavy and then evaluation
    std::default_random_engine dre(12345);
    std::uniform_int_distribution<int> id(1, 1000);
    auto ms = id(dre);
    std::this_thread::sleep_for(std::chrono::milliseconds(ms));

    // Lazy evaluation
    auto result = ms % 2 == 0 ? result1.get() : result2.get();
}

void throwExceptionTask()
{
    std::list<int> bigList;

    while (true) {
        for (int i = 0; i < 1000000; ++i) {
            bigList.push_back(i);
        }
        std::cout.put('.').flush();
    }
}

void parallelExecutionWithException()
{
    std::cout << "Starting 2 tasks" << std::endl;
    std::cout << " - task1: process endless loop of memory consumption" << std::endl;
    std::cout << " - task2: wait for <return> and then for throwExceptionTask" << std::endl;

    // Start throwExceptionTask asynchronously (now or later or never)
    auto f1 = std::async(throwExceptionTask);  

    std::cin.get();

    std::cout << "\nWait for the end of throwExceptionTask" << std::endl;

    try {
        // Wait for throwExceptionTask() to finish (raises exception if any)
        f1.get();  
    } catch (const std::exception& e) {
        std::cout << "EXCEPTION: " << e.what() << std::endl;
    }
}

void waitingForTwoTasks()
{
    std::cout << "Starting 2 operations asynchronously:" << std::endl;

    auto f1 = std::async([]{ randomSleepAndOutput('.'); });
    auto f2 = std::async([]{ randomSleepAndOutput('+'); });
    auto isOneOfThemRunning = [&f1, &f2]() {
        return f1.wait_for(std::chrono::seconds(0)) != std::future_status::deferred ||
               f2.wait_for(std::chrono::seconds(0)) != std::future_status::deferred;
    };
    auto isOneOfThemReady = [&f1, &f2]() {
        return f1.wait_for(std::chrono::seconds(0)) == std::future_status::ready ||
               f2.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    };

    if (isOneOfThemRunning()) {
        while (!isOneOfThemReady()) {
        //...;
        std::this_thread::yield();  // hint to reschedule to the next thread
        }
    }
    std::cout.put('\n').flush();

    try {
        f1.get();
        f2.get();
    } catch (const std::exception& e) {
        std::cout << "\nEXCEPTION: " << e.what() << std::endl;
    }
    std::cout << "\ndone" << std::endl;
}

int main()
{
    parallelExecution();
    lazyEvaluation();
    parallelExecutionWithException();
    waitingForTwoTasks();
}


std::future<int> result1(std::async(lambda1));
  • async(): 提供一個介面,讓程式在允許情況下,使用一個獨立執行緒在背景執行 callable object
    • async 會試著啟動 thread 來執行 callable object
    • 不一定總是可在背景用獨立執行緒執行,可能不會被執行,例如單執行緒環境下
    • callable object 後的參數可以放欲傳入的引數,例如:std::async(someLambda, argx, argy);  or std::async(&X::mem, x , 42); // try to call x.mem(42)
  • future(): 提供介面讓使用者等待 thread 完成執行,並且可以存取執行結果(回傳值或例外)
    • 藉由 callable object 的 return type 來 Specialize
    • 若 callable object 僅背景執行任務,則為 std::future<void>
    • 透過 get() or wait(),可強迫 callable object 被執行
    • future 與 async callable object 交換 data 的機制,是透過共同存取一個 "shared state"


int result = result1.get() + result2;
  • get(): 
    • 呼叫 get 時,以下三種可能情況會發生:
      • callable 已經透過 async() 在另一個 thread 啟動,且已經完成執行,會立即得到結果
      • callable 已經透過 async() 在另一個 thread 啟動,但尚未執行完畢,get() 會 block 直到執行結束並產生結果
      • callable 尚未開始執行,呼叫 get 會強制 callable 開始執行(如同 synchronous 呼叫),get 會 block 直到執行結束並產生結果
    • 以上行為直接保證在單執行緒的環境下,也可得到正確的執行結果
    • 若沒有呼叫 get(),async 亦沒有執行 callable,則 main() 結束時此函式會完全沒有被執行
    • get 只可以被呼叫一次。呼叫後,future 進入 invalid 狀態
  • 平行化程式的撰寫方法(使其亦相容於單執行緒環境):
    1. include <future>
    2. 可平行化執行的 callable 傳給 async
    3. async 結果指派給 future<ReturnType>
    4. 當想需要 callable 結果或是想要確保被啟動的 callable 在此處完成時,呼叫 future 的 get()

std::future<int> result1(std::async(lambda1));
int result = lambda2() + result1.get();
  • 上述寫法,看似可以透過 async 啟動 lambda1,再透過第二行 synchronous 執行 lambda2,但其實有可能完全得不到平行化的好處。因為右側 evaluate 的順序並沒有規定,有可能先執行 get() 進而 synchronous 執行 lambda1,再執行 lambda2
  • Call early and return late
    • 即最大化呼叫 async 與 get 之間的距離
    • 可得到平行化最好的效果

std::future<int> result1(std::async(std::launch::async, lambda1));
  • Launch policy
    • std::launch::async
      • 強迫立即 asynchronously 啟動或丟出 std::system_error 的例外
      • 不需要再呼叫 get(),因為若 future 的 lifetime 結束時,程式會等待 lambda1 執行完成
      • std::async(std::launch::async, ...) 沒有指派給 future,則變成 synchronous 呼叫,會 block 直到 lambda1 執行結束
    • std::launch::deferred
      • defer lambda1 直到 get 被呼叫
      • 可用來實作 lazy evaluation
    • 是 scoped enumeration,所以需要 std::launch or launch



// Try to call lambda1 asynchronously
std::future<int> result(std::async(lambda1));

...;

// Wait for lambda1 to be done(might start it)
result.wait();
  • wait()
    • 相較於 get 只能執行一次,wait 提供一個介面可以等待 callable 執行完成,同時不會去存取執行結果
    • 效果: 強迫 future 對應的 thread 啟動,且等待至其執行結束
    • wait 系列介面,可被多次呼叫
  • wait_for(), wait_until()
    • 若 callable 尚未啟動,不會強迫 future 對應的 thread 啟動
    • wait_for 可帶入 duration
    • wait_until 可帶入 timepoint
    • 兩者皆會回傳以下其一:
      • std::future_status::deferred: async 延遲執行,且沒有 wait 或 get 被呼叫而強迫啟動
      • std::future_status::timeout: 已啟動執行,但尚未執行完畢,且等待的人超過 timeout
      • std::future_status::ready: 執行完成
    • 可達到 Speculative Execution [推測執行]
      • 是一種最佳化技術,可利用空轉的時間去做其他事
    • 可以用來 "poll" 判斷背景任務是否已啟動或執行中:
      • f.wait_for(chrono::seconds(0)) != future_status::ready


share_future<int> f = async(CallableObj);
  • std::future 只可以呼叫一次 get,呼叫第二次會產生 undefined behavior(C++標準實作鼓勵丟出 std::future_error)
  • 需要多次存取結果值的情況,可用 std::shared_future
  • share_future<int> f = async(CallableObj).share();
  • 比較介面:
    • T future<T>::get();
    • const T& shared_future<T>::get()



Reference:

  1. The C++ Standard Library: A Tutorial and Reference




沒有留言:

張貼留言