다시 기존의 문제를 지적하자면
Job을 넣는(Push) 쪽과 Flush하는 쪽이 달라서 문제가 발생한다
int main()
{
// ...
while(true)
{
// 현재는 main 스레드에서 Job을 Flush중
GRoom->FlushJob();
}
}
- 역시 효율적이지 못하다
- Flush 대상이 수천 수백만개라면?
- Flush 대상이 아에없다면?
- Push하는 애가 Flush까지 하게 만든다면?
class JobQueue : public enable_shared_from_this<JobQueue>
{
public:
// Job을 이런식으로 넣고
void DoAsync(CallbackType&& callback)
{
Push(ObjectPool<Job>::MakeShared(std::move(callback)));
}
template<typename T, typename Ret, typename... Args>
void DoAsync(Ret(T::*memFunc)(Args...), Args... args)
{
shared_ptr<T> owner = static_pointer_cast<T>(shared_from_this());
Push(ObjectPool<Job>::MakeShared(owner, memFunc, std::forward<Args>(args)...));
}
void ClearJobs() { _jobs.Clear(); }
private:
void Push(JobRef&& job);
void Execute();
protected:
LockQueue<JobRef> _jobs;
Atomic<int32> _jobCount = 0;
};
void JobQueue::Push(JobRef&& job)
{
const int32 prevCount = _jobCount.fetch_add(1);
_jobs.Push(job); // WRITE_LOCK
// 첫번째 Job을 넣은 쓰레드가 실행까지 담당
if (prevCount == 0)
{
Execute();
}
}
// 모두해결됐을까?? 아직은 아래와 같은 문제가 있음.
// 1) 일감이 너~무 몰리면?
// 2) DoAsync 타고 타고 가서~ 절대 끝나지 않는 상황 (일감이 한 쓰레드한테 몰림)
void JobQueue::Execute()
{
while (true)
{
Vector<JobRef> jobs;
_jobs.PopAll(OUT jobs);
const int32 jobCount = static_cast<int32>(jobs.size());
for (int32 i = 0; i < jobCount; i++)
jobs[i]->Execute();
// 남은 일감이 0개라면 종료
if (_jobCount.fetch_sub(jobCount) == jobCount)
{
return;
}
}
}
#pragma once
template<typename T>
class LockQueue
{
public:
void Push(JobRef item)
{
WRITE_LOCK;
_items.push(item);
}
T Pop()
{
WRITE_LOCK;
if (_items.empty())
return T();
T ret = _items.front();
_items.pop();
return ret;
}
void PopAll(OUT Vector<T>& items)
{
WRITE_LOCK;
while (T item = Pop())
items.push_back(item);
}
void Clear()
{
WRITE_LOCK;
_items = Queue<T>();
}
private:
USE_LOCK;
Queue<T> _items;
};