(C++ : Concurrency) 3. promise & future

Posted by : at

Category : Cpp


promise

새로운 thread의 결과를 쉽게 공유하는 방법
주의할 점은 promise 객체를 참조(&, &&)로 전달해야한다.

#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;

//void add(std::promise<int>&& p, int a, int b)
void add(std::promise<int>& p, int a, int b)
{
    int s = a + b;
    std::this_thread::sleep_for(1s);
    p.set_value(s);                     // 지금 바로 값을 넣어주세요
    // p.set_value_at_thread_exit(s);   // 스레드가 끝날때 값을 넣어주세요
}

int main()
{
    std::promise<int> pm;
    std::future<int> ft = pm.get_future();
    //std::thread t(add, std::move(pm), 10, 20);
    std::thread t(add, std::ref(pm), 10, 20);

    // ...

    int ret = ft.get();
    std::cout << ret << std::endl;
    t.join();
}
  • promise에 값을 set
  • future에서 get

exception이 발생한다면?

#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;

void divide(std::promise<int>&& p, int a, int b)
{
    try
    {
        if(b == 0) throw std::runtime_error("divide by zero");
        p.set_value(a/b);
    }
    catch(...)
    {
        p.set_exception(std::current_exception());  // 주 thread로 exception을 보낸다
    }
}

int main()
{
    std::promise<int> pm;
    std::future<int> ft = pm.get_future();

    std::thread t(divide, std::move(pm), 10, 0);
    try
    {
        int ret = ft.get();
    }
    catch(std::exception& e)
    {
        std::cout << e.what() << std::endl;
    }

    t.join();
}
#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <numeric>

int main()
{
    std::vector<int> v1 = {1,2,3,4,5,6,7,8,9,10};
    std::vector<int> v2(10);

    std::partial_sum(v1.begin(), v1.end(), v2.begin());
    // v1의 부분합을 구해서 v2에 넣어라

    int s = std::accumulate(v2.begin(), v2.end(), 0);
    // v2를 모두 더해라

    // 위 두 작업이 시간이 많이 걸린다면? 
    // thread로 따로 빼서 작업하는게 효율적일 것이다.

    for(auto n : v2)
        std::cout << n << ", ";

    std::cout << "\n" << s << std::endl;
}
int main()
{
    std::vector<int> v1 = {1,2,3,4,5,6,7,8,9,10};
    std::vector<int> v2(10);

    // 이렇게 하면될까?
    std::thread t([&]{
        std::partial_sum(v1.begin(), v1.end(), v2.begin());
        int s = std::accumulate(v2.begin(), v2.end(), 0);
    });

    // Nope! : t의 연산이 끝나기전 for문을 돌게된다
    for(auto n : v2)
        std::cout << n << ", ";

    std::cout << "\n" << s << std::endl;
    t.join();
}

Example

int main()
{
    std::vector<int> v1 = {1,2,3,4,5,6,7,8,9,10};
    std::vector<int> v2(10);

    std::promise<void> pm1;
    std::future<void> ft1 = pm1.get_future();

    std::promise<int> pm2;
    std::future<int> ft2 = pm2.get_future();
    
    std::thread t([&]{
        std::partial_sum(v1.begin(), v1.end(), v2.begin());
        pm1.set_value();    // 첫 번째 연산이 끝남을 알린다.
        int s = std::accumulate(v2.begin(), v2.end(), 0);
        pm2.set_value(s);
    });

    ft1.get();  // 첫 번째 연산대기
    for(auto n : v2)
        std::cout << n << ", ";

    int ret = ft2.get();
    std::cout << "\n" << s << std::endl;
    t.join();
}

future time-out

#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;

void add(std::promise<int>&& p, int a, int b)
{
    std::this_thread::sleep_for(3s);
    p.set_value(a+b);
}

int main()
{
    std::promise<int> pm;
    std::future<int> ft = pm.get_future();

    std::thread t(add, std::move(pm), 10, 20);
    
    //int n1 = ft.get();  // get은 무한정 대기 -> 특정시간만 대기하고 싶다면?
    std::future_status ret = ft.wait_for(2s);

    if(ret == std::future_status::ready)
        std::cout << "ready!" << std::endl;
    else if(ret == std::future_status::timeout)
        std::cout << "timeout!" << std::endl;
    else
        std::cout << "deferred!" << std::endl;

    t.join();
}

shared future

set_value를 한 값을 여러 thread에서 get하고 싶다

#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;

void add(std::promise<int>&& p, int a, int b)
{
    std::this_thread::sleep_for(3s);
    p.set_value(a+b);
}

void consume(std::shared_future<int> sf)
{
    sf.get();
    std::cout << "finish foo" << std::endl;
}

int main()
{
    std::promise<int> pm;
    std::future<int> ft = pm.get_future();
    //std::future<int> ft2 = ft;  // Error future는 복사가 안된다.
    std::shared_future<int> ft2 = ft.share();

    std::thread t(add, std::move(pm), 10, 20);

    std::thread t1(consume, ft2);
    std::thread t2(consume, ft2);

    t.join();
    t1.join();
    t2.join();
}

주의사항

#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;

void add(std::promise<int>&& p, int a, int b)
{
    std::this_thread::sleep_for(3s);
    p.set_value(a+b);   // 1. set_value는 한 번만 할 수 있다.
}

int main()
{
    std::promise<int> pm;
    std::future<int> ft = pm.get_future();  // 2. future는 한 번만 할 수 있다(여러개 꺼내고 싶다면 shared_future)
    std::thread t(add, std::move(pm), 10, 20);

    ft.get();   // 3. get도 한 번만 할 수 있다(get을 했는지 여부는 valid()로 확인)
    t.join();
}

std::packaged_task

#include <iostream>
#include <thread>
#include <future>

// promise를 썼다는게 이미 멀티 스레드를 염두에 둔 함수이다.
void add(std::promise<int>&& p, int a, int b)
{
    p.set_value(a+b);
}

// 만약 기존함수를 쓰고싶다면? -> packaged_task
int add(int a, int b)
{
    return a+b;
}

int main()
{
    std::promise<int> pm;
    std::future<int> ft = pm.get_future();
    std::thread t(add, std::move(pm), 10, 20);

    ft.get();
    t.join();

    // ----------------

    // packaged_task

    std::packaged_task<int(int, int)> task(add);
    std::future<int> ft = task.get_future();
    task(10, 20);   // 함수가 호출된다.
    std::thread t(std::move(task), 10, 20); // 멀티쓰레드를 쓰겠다면
    int ret = ft.get();
    t.join();
}

About Taehyung Kim

안녕하세요? 8년차 현업 C++ 개발자 김태형이라고 합니다. 😁 C/C++을 사랑하며 다양한 사람과의 협업을 즐깁니다. ☕ 꾸준한 자기개발을 미덕이라 생각하며 노력중이며, 제가 얻은 지식을 홈페이지에 정리 중입니다. 좀 더 상세한 제 이력서 혹은 Private 프로젝트 접근 권한을 원하신다면 메일주세요. 😎

Star
Useful Links