List
낙천적인 동기화는 만족할 만한 결과가 나왔었다. 하지만 한번 더 리스트를 순회하여 오버헤드가 발생하는 문제점이 있었다.
이번에는 게으른 동기화를 알아보자
게으른 동기화
- 낙천적인 동기화는 lock의 횟수가 비약적으로 감소했으나, 리스트를 두 번 순회해야 한다는 눈에 보이는 오버헤드가 있다.
- 이를 극복하여 다시 순회하지 않는 알고리즘을 작성하였다.
- validate()가 노드를 처음부터 순회하지 않고 validation을 수행한다.
- pred와 curr의 잠금은 여전히 필요하다.
낙천적인 동기화의 단점을 보안하는 방법은 정말 좋은 것 같다. 그런데 어떻게 처음부터 순회하지 않고 판단을 할까?
- 게으른 동기화
- 각 노드에서 marked필드를 추가하여 그 노드가 집합에서 제거되어 있는지 표시한다.
- marked가 ture이면 제거되었다는 표시
- marking을 실제 제거 보다 반드시 먼저 수행한다.
- 순회를 할 때 대상 노드를 잠글 필요가 없고 노드가 head에서 접근할 수 있는지 확인하기 위해 전체 리스트를 다시 순회하지 않아도 된다.
- 각 노드에서 marked필드를 추가하여 그 노드가 집합에서 제거되어 있는지 표시한다.
간단히 말해서 삭제를 할때 삭제를 시키는게 아닌 마크를 시키고 삭제가 되었다고 생각하는 것이다. 이러면 확실히 삭제를 하는 도중에 추가를 해도 아무런 문제 없이 추가가 되고 스레드 간에 간섭이 확 줄것이다.
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#define MAX 10000
using namespace std;
using namespace std::chrono;
class Node
{
public:
int key;
Node* next;
mutex m;
bool is_removed;
Node() { key = 0; next = nullptr; is_removed = false; }
Node(int key)
{
this->key = key;
next = nullptr;
is_removed = false;
}
~Node() = default;
void lock()
{
m.lock();
}
void unlock()
{
m.unlock();
}
};
class List
{
public:
Node head, tail;
mutex lock;
List()
{
head.key = 0x80000000;
tail.key = 0x7FFFFFFF;
head.next = &tail;
}
~List() {}
void Clear()
{
Node* ptr;
while (head.next != &tail)
{
ptr = head.next;
head.next = head.next->next;
delete ptr;
}
}
bool Add(int key)
{
Node* pred;
Node* curr;
pred = &head;
curr = pred->next;
while (curr != &tail)
{
pred = curr;
curr = curr->next;
}
curr->lock();
pred->lock();
if (key == curr->key)
{
curr->unlock();
pred->unlock();
return false;
}
else
{
Node* node = new Node(key);
node->next = curr;
pred->next = node;
curr->unlock();
pred->unlock();
return true;
}
}
bool Remove(int key)
{
Node* pred;
Node* curr;
pred = &head;
curr = pred->next;
while (curr != &tail)
{
if (curr->key == key)
{
break;
}
pred = curr;
curr = curr->next;
}
if (curr == &tail)
{
return false;
}
pred->lock();
curr->lock();
curr->is_removed = true;
pred->lock();
curr->lock();
}
void Print()
{
Node* curr = head.next;
while (curr != &tail)
{
if (!curr->is_removed)
{
cout << curr->key << " ";
}
curr = curr->next;
}
cout << endl;
}
void Print(int count)
{
Node* curr = head.next;
for (int i = 0; i < count; i++)
{
if (!curr->is_removed)
{
cout << curr->key << " ";
}
curr = curr->next;
}
cout << endl;
}
};
List temp_list;
void Thread(int thread_count)
{
for (int i = 0; i < MAX / thread_count; i++)
{
temp_list.Add(i);
}
}
int main()
{
for (int i = 1; i <= 8; i *= 2)
{
temp_list.Clear();
steady_clock::time_point start_time = high_resolution_clock::now();
vector<thread> v;
for (int j = 0; j < i; j++)
{
v.emplace_back(thread(Thread, i));
}
for (int j = 0; j < v.size(); j++)
{
v[j].join();
}
steady_clock::time_point end_time = high_resolution_clock::now();
auto time = end_time - start_time;
int result_time = duration_cast<milliseconds>(time).count();
temp_list.Print(20);
cout << "스레드 갯수 : " << i << endl;
cout << "걸린 시간 : " << result_time << endl;
}
}
저번이랑 다르게 확인하는 부분이 사라졌다. 대신 삭제 했을때 락을 걸어 그냥 is_remove만 ture로 만들어 놓은다.
그리고 읽을때 is_remove만 확인한 뒤 갖고온다.
결과는
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
스레드 갯수 : 1
걸린 시간 : 210
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
스레드 갯수 : 2
걸린 시간 : 104
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
스레드 갯수 : 4
걸린 시간 : 54
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
스레드 갯수 : 8
걸린 시간 : 11
무쳤다…. 진짜 빨라졌다…
하지만 문제가 있다. 이 is_remove가 된 노드들이 많으면 나중에 쌓이고 이 노드들은 언젠가 문제를 일으키게 될것이다.