프로그래밍 지식/C++

C++문법 / 생산자 소비자 문제, Producer and Consumer Problem

게임이 더 좋아 2021. 12. 3. 21:54
반응형
728x170

Mutex로 deadlock을 걸리지 않게 하는 법을 배웠으니

왜 deadlock인 상황이 발생하고

왜 mutex를 걸어줘야하고

왜 thread를 이용해서 작업을 하는지 

몸소 느껴보자

 


 

먼저 코드를 볼까?

 

#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
#include <mutex>  
#include <queue>
#include <string>


using namespace std;
using namespace chrono;


// 생산자 함수
//string을 담는 큐, 뮤텍스 객체, index(스레드 번호)를 매개변수로받음
void producer(queue<string>* downloaded_pages, mutex* m, int index) {
    for (int i = 0; i < 5; i++) {
        // 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
        // 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다. -> 실제로 다른 것이 아니라 다르다고 가정
        //100ms, 200ms, 300ms, 400ms, 500ms 라고 가정
        this_thread::sleep_for(milliseconds(100 * index));
        string content = "웹사이트 : " + to_string(i) + " from thread(" + to_string(index) + ")\n";

        // data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
        // 해당 큐는 다른 스레드도 참조해서 lock이 필요함
        m->lock();
        downloaded_pages->push(content);
        m->unlock();
    }
}
// string을 담는 큐 ( producer와 같음), 뮤텍스 객체, 처리량
void consumer(queue<string>* downloaded_pages, mutex* m, int* num_processed) {
    // 25개가 될 때까지 진행
    while (*num_processed < 25) {
        m->lock();

        // 큐가 비어있으면 대기해야함
        if (downloaded_pages->empty()) {
            m->unlock();  // 큐에 접근하기 전 lock을 걸었으므로 큐에 걸린 락을 풀고 대기해야함

            // 10 밀리초 뒤에 다시 확인
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            continue;
        }

        // 큐에 무엇인가 들어있으면 해당 페이지를 꺼냄
        string content = downloaded_pages->front();
        downloaded_pages->pop();

        (*num_processed)++;
        m->unlock();//큐에서 빼냈으면 생산자가 일할 수 있도록 언락킹

        // content 를 처리한다. (80ms 정도 걸린다고 가정)
        cout << content;
        this_thread::sleep_for(std::chrono::milliseconds(80));
    }
}


int main() {


    // 현재 다운로드한 페이지들 리스트
    queue<string> downloaded_pages;
    //뮤텍스 객체
    mutex m;

    //생산자 스레드 
    vector<thread> producers;

    int num_processed = 0;// 처리한 페이지 수
     //소비자 스레드
    std::vector<thread> consumers;

    steady_clock::time_point start = steady_clock::now();
   
    
    for (int i = 0; i < 5; i++) {
        producers.push_back(thread(producer, &downloaded_pages, &m, i + 1));
    }

    for (int i = 0; i < 3; i++) {
        consumers.push_back(thread(consumer, &downloaded_pages, &m, &num_processed));
    }


    //join 하고 종료
    for (int i = 0; i < 5; i++) {
        producers[i].join();
    }
    for (int i = 0; i < 3; i++) {
        consumers[i].join();
    }

    steady_clock::time_point end = steady_clock::now();


    duration<double> sec = end - start;
    cout << "elapsed time " << sec.count();

}

 

결과는???

 

 

 

2.6초 걸렸다.

무조건 150ms 가 5개 쉬는데.. 각각이 쉬어서 그런지 그렇게 별로 안걸렸다.

그리고 producer thread 를 5개 만들었는데

critical section인 download_pages에 대해선

해당 큐에 lock을 걸어서 lock을 건 스레드만이 작동할 수 있도록 하였다.

 

Consumer 입장에선 25개를 처리할 때까지 계속 스레드가 while문을 돌린다.

하지만 큐에 무엇인가 있어야만 처리할 수 있다.

아쉽지만 큐에 무엇인가 추가될 떄까지 해당 스레드는 계속 while문만 돌려야 한다.

**비효율적이긴 하다.

 


 

프로그래머라면 

모름지기 비효율적인 부분을 알고서는 지나갈 수 없는 법.

그래서 고쳐보기로 했다.

큐를 굳이 계속 들여다보는 것은 CPU 낭비이므로

해당 스레드는 큐가 비어있을 때까지 sleep하는 것이 어떨까?

 

이렇게 효율적인 작업을 해줄 수 있는 변수가 있으니

바로 "조건 변수", "condition_variable 이다.

 

큐에 무엇이 있을 때만 Consumer 스레드를 동작하게 해보자

 

바뀐 부분만 살짝씩 보자

void producer(queue<string>* downloaded_pages, mutex* m, int index, condition_variable* cv) {
    for (int i = 0; i < 5; i++) {
        ...

        //consumer thread를 깨움
        cv->notify_one();
    }
}


void consumer(queue<string>* downloaded_pages, mutex* m, int* num_processed, condition_variable* cv) {
    // 25개가 될 때까지 진행
    while (*num_processed < 25) {
        unique_lock<mutex> lk(*m);
        cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });
        
        //25개를 모두 처리하면 스레드 동작 종료
        if (*num_processed == 25) {
            lk.unlock();
            return;
        }

        
        // 큐에 무엇인가 들어있으면 해당 페이지를 꺼냄
        string content = downloaded_pages->front();
        downloaded_pages->pop();

        (*num_processed)++;
        lk.unlock();//큐에서 빼냈으면 생산자가 일할 수 있도록 언락킹
		
        ...
    }
}


int main() {

    ...
    //조건 변수
    condition_variable cv;
    ...


    steady_clock::time_point start = steady_clock::now();
   
    
    for (int i = 0; i < 5; i++) {
        producers.push_back(thread(producer, &downloaded_pages, &m, i + 1, &cv));
    }

    for (int i = 0; i < 3; i++) {
        consumers.push_back(thread(consumer, &downloaded_pages, &m, &num_processed, &cv));
    }


    //join 하고 종료
    for (int i = 0; i < 5; i++) {
        producers[i].join();
    }

    //깬 스레드가 작업을 다하고 종료해도
    //나머지 자고 있는 thread 반환받아야함
    cv.notify_all();


    for (int i = 0; i < 3; i++) {
        consumers[i].join();
    }
    ...



}

 

결과는???

 

우리가 의도한대로 바뀌었다.

하지만 시간은 줄지 않았네??

우리가 효율성을 증대시켰는데..?

우리는 CPU의 load를 줄인 것이지

해당 하는 일의 시간 효율성을 올리지는 않았다.

다만 낭비되는 CPU자원을 줄였다는 것이다.

반대로 그렇다면 CPU자원을 아꼈으니.. push 하는 thread를 하나 더 늘리면 어떨까?

당연히 빨리 집어넣으니 최종적으로 시간이 줄 것이다.

 

즉, 우리가 지금 당장 CPU의 자원을 아꼈다고 해서 바로 표가 나지는 않지만

어떻게든 이익으로 이어지므로 우리는 이렇게 설계하는 방향으로 나아가야한다.

 

아무튼 다시 돌아와서

 

무엇이 변하게 했는가?를 살펴보자

 

새로 생긴 것 중에는

unique_lock<mutex> lk(*m);

가 있다.

 

**unique_lock으로 선언한 이유는 그저 cv의 wait함수가 unique_lock을 매개변수로 받기 때문이다.

그리고 unique_lock은 lock_guard가 생성자에 의해서만 lock되는 것과 달리

unlock후 다시 lock을 걸 수도 있다.

 

그 아래는

cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });

 

cv->wait의 역할은

뒤의 조건이 true가 될 때까지 wait을 하는 것이다.

 

다시 말하면 큐에 원소들이 있거나, 전리된 페이지의 개수가 25개 일 때 대기, wait를 끝낸다.

-> wait을 끝내면 해당 아래의 라인을 순차적으로 수행한다.

 

만약 false 라면 lk 를 unlock 한 뒤에, 영원히 sleep 한다.

-> 해당 스레드의 작업 중단이라고 생각하면 된다.

 

즉, 처음에 큐를 들여봐서 원소가 있거나 페이지 개수가 25개라면

우선 해당스레드는 sleep을 하게 만드는 것이다.

-> 아까 말대로 굳이 깨서 주기적으로 큐를 확인할 필요는 없다.

 

Producer에서는 하나가 추가되었다.

 

cv->notify_one();

 

cv에 notify라는 함수가 있나보다.

 

이 함수는 자고 있는 쓰레드 중 하나 (Consumer 겠지??) 를 깨워서 조건을 다시 검사하게 한다.

만약 큐에 원소가 있거나 25개라면 wait 함수에서 다시 sleep 하지 않고 나머지 동작을 할 것이다.

 

결국 우리는 Mutex를 실제 적용해보았고

Mutex에서 굳이 CPU를 점유하면서 낭비되는 연산을 막아보았다.

이게 멀티 스레딩 중에 가장 쉬운 것이다.

 

우선 나는 눈이 아프다.

컴퓨터를 너무 오래봤다.

잠깐 쉬다오자

 

 

728x90
반응형
그리드형