스마트 포인터로 처리할 수 없을까?
template<typename T>
class LockFreeStack
{
struct Node
{
Node(const T& value) : data(make_shared<T>(value)), next(nullptr)
{
}
// data와 next Node를 shared_ptr로 관리
shared_ptr<T> data;
shared_ptr<Node> next;
};
public:
void Push(const T& value)
{
// Node를 넣을때 make_shared한다
shared_ptr<Node> node = make_shared<Node>(value);
// 그냥 head를 읽어올 경우 atomic하지 않기에 std::atomic_load통해 읽어온다
node->next = std::atomic_load(&_head);
// 일반 atomic_compare_exchange_weak와 동작이 같음.
// atomic 하게 _head == node->next 인지 확인 후 _head = node 하라
while (std::atomic_compare_exchange_weak(&_head, &node->next, node) == false)
{
}
}
shared_ptr<T> TryPop()
{
shared_ptr<Node> oldHead = std::atomic_load(&_head);
while (oldHead && std::atomic_compare_exchange_weak(&_head, &oldHead, oldHead->next) == false)
{
}
if (oldHead == nullptr)
return shared_ptr<T>();
return oldHead->data;
}
// 이렇게 하면 delete를 굳이 하지 않아도 된다.
/*
그런데 shared_ptr자체가 lock_free한가???
-> 만약 shared_ptr자체에서 lock을하고 있다면 이게 무슨 의미인가???
shared_ptr<int32> ptr;
bool value = atomic_is_lock_free(&ptr); // false가 나옴, share_ptr자체에서 lock을 걸게된다.
*/
private:
shared_ptr<Node> _head;
};
실질적 reference counting으로 lockfree를 구현해보자
아래 내용이 엄청어려운데, 나는 왜 이런코드를 못 짜지 실망말자.
아래코드는 논문에서 나온내용을 코드로 구현, 오래된 학술의 결과이다.(보통사람은 아래처럼 잘 못짠다는 말)
template<typename T>
class LockFreeStack
{
struct Node;
struct CountedNodePtr
{
int32 externalCount = 0; // 포인터 참조 횟수
Node* ptr = nullptr;
};
struct Node
{
Node(const T& value) : data(make_shared<T>(value))
{
}
shared_ptr<T> data;
atomic<int32> internalCount = 0;
CountedNodePtr next;
};
public:
// [][][][][][][]
// [head]
void Push(const T& value)
{
CountedNodePtr node;
node.ptr = new Node(value);
node.externalCount = 1;
// [!]
node.ptr->next = _head;
while (_head.compare_exchange_weak(node.ptr->next, node) == false)
{
}
}
// [][][][][][][]
// [head]
shared_ptr<T> TryPop()
{
CountedNodePtr oldHead = _head;
while (true)
{
// 참조권 획득 (externalCount를 현 시점 기준 +1 한 애가 이김)
// externalCount이 기본 1이고 이함수를 지나면 기본 2가 된다.
// 만약 여러 스레드에서 동시 진입이 되었다면 2이상이 되겠지?
IncreaseHeadCount(oldHead);
// 최소한 externalCount >= 2 일테니 삭제X (안전하게 접근할 수 있는)
// 여기까지오면 externalCount을 증가 시켰기에 접근권(참조권)은 있는것.
Node* ptr = oldHead.ptr;
// 데이터 없음
if (ptr == nullptr)
return shared_ptr<T>();
// 소유권 획득 (ptr->next로 head를 바꿔치기 한 애가 이김)
if (_head.compare_exchange_strong(oldHead, ptr->next))
{
// 참조권을 획득한 첫 번째 Thread가 여기로 들어옴
shared_ptr<T> res;
res.swap(ptr->data);
// 데이터를 누가 쓰고있나 확인용
// external : 1 -> 2(나+1) -> 4(나+1 남+2)
// internal : 1 -> 0
const int32 countIncrease = oldHead.externalCount - 2;
// externalCount - 2을 하면 몇 명(스레드)이 참조권을 갖고 있는지 나온다.
// fetch_add에 얼마를 넣든 원래 값이 리턴됨
// 예를들어 internalCount = 0이고 fetch_add(2)를하면 리턴값은 0이고 이후 2로 변경
if (ptr->internalCount.fetch_add(countIncrease) == -countIncrease)
delete ptr;
return res;
}
else if (ptr->internalCount.fetch_sub(1) == 1)
{
// 참조권은 있으나 소유권이 없는, 그러니깐 첫 번째 Thread이후의 Thread는 모두 여기로 온다.
// 참조권은 얻었으나, 소유권은 실패 -> 뒷수습은 내가 한다
delete ptr;
}
}
}
private:
void IncreaseHeadCount(CountedNodePtr& oldCounter)
{
while (true)
{
CountedNodePtr newCounter = oldCounter;
newCounter.externalCount++;
if (_head.compare_exchange_strong(oldCounter, newCounter))
{
oldCounter.externalCount = newCounter.externalCount;
break;
}
}
}
private:
atomic<CountedNodePtr> _head;
};