뭐 지금 이 동기화 방법은 너무 어려워서 대학원가야 직접 증명할수 있지만 그래도 공부하는 차원에서 한번 만들어 보는게 좋겠….지….?
Non-Blocking
- Non-Blocking 구현이란?
- 게으른 동기화를 통해 만족할만한 멀티 스레드 향상을 얻었다.
- 하지만, 블로킹 구현이어서 성능 향상의 여지가 남아있고 Prioirty Inversion이나 Convoying에서 자유롭지 못하다.
- 논 블로킹 구현은 게으론 동기화에서 출발한다.
- 락과 오버헤드 최소화, 그리고 marking을 사용할 것이다. O(1)의 validation
- lock을 사용하지 않는다
- 서로 경쟁하는 thread는 cas로 승부를 낸다.
- 한 번의 cas로 승부 결정
- 이겼으면 무조건 method가 성공적으로 종료해야 한다.
- 적어도 이전보다는 더 진전된 상태로 바꿔야 한다.
- 만약 패배했으면?
- cas가 실패하면 다른 스레드가 변경을 먼저 실행한 것이므로 변경을 포기해야 한다.
- cas가 실패했으므로 이 위치에서는 더 이상 작업할 수 없고 다른 위치를 찾아야 한다.
- 졌다는 이야기는 다른 스레드에서 먼저 자료구조를 수정했다는 이야기이므로, 지금까지 수집한 자료구조 정보를 더 이상 사용할 수 없고 다시 수집해야 한다.
지금 게으른 동기화에서 사용하고 있는 marking은 계속해서 사용해야 하고 lock을 제거해야 한다. 그리고 cas를 사용한다.
문제가 하나 있다. 실패한 즉시 루프를 돌면 안 된다. 의도가 잘못됐다. 예를 들어 자료구조가 A인 상태일 때 B 상태로 바꾸는 수정을 한 것이다. 이 상태에서 다른 스레드가 와서 A 상태였던 자료구조를 B가 아니라 C로 바꾸면 C로 수정된 결과가 B로 덮어진다.
따라서 수정사항을 모아 새로운 CAS를 해주어야 한다.
- CAS의 한계
- 한번에 하나의 변수밖에 바꾸지 못한다.
- 검색 횟수를 줄이기 위해선 marking이 필요하다
- 게으른 동기화와 비슷한 개념
- 여기서 마킹은 노드의 삭제이다.
- marking과 next의 atomic한 동시 변환이 가능해야 한다.
- 변형
- 하나의 변수에 주소와 marking을 동시에 저장
- marking변경용 cas 제공(attemptmark)
위에 말한 문제를 해결하려면 next가 7인가와 marking이 false인가를 동시에 같은시간에 확인해야 한다. 이건 못한다.
이런 문제를 해결하기 위해 주소와 marking이 있는 이 것을 두 군데 합쳐서 저장한뒤, cas한다. 이걸 attemp mark라는 함수라고 한다.
- Window에서 멀티 cas 구현
- CAS(oldmark, mark, oldnext, next)
- 32비트 주소 중 LSB를 마크로 사용(1비트 mark)
- next필드를 포인터로 직접 사용할 수 없게 되었으므로, 모든 next필드를 통한 node이동시 type 변환이 힐요하다
- 하지만 디버깅이 어려워진다.
일단 이미 구현되어 있는 함수들을 보자
bool LFNODE::CAS(int old_v, int new_v)
{
return atomic_compare_exchange_strong( reinterpret_cast<atomic_int *>(&next), &old_v, new_v);
}
bool LFNODE::CAS(LFNODE *old_node, LFNODE *new_node, bool oldMark, bool newMark)
{
int oldvalue = reinterpret_cast<int>(old_node);
if (oldMark)
oldvalue = oldvalue | 0x01;
else
oldvalue = oldvalue & 0xFFFFFFFE;
int newvalue = reinterpret_cast<int>(new_node);
if (newMark)
newvalue = newvalue | 0x01;
else
newvalue = newvalue & 0xFFFFFFFE; return CAS(oldvalue, newvalue);
}
복잡해 보이는데 cas를 하면 next를 old에서 new로 바꿔야 한다. old는 노드의 주소로 주소에 marking을 합성하면 old value가 나온다. 이 old value를 캐스팅해서 넣고 마크를 보고 0니아 1로 만들면 된다.
bool LFNODE::AttemptMark(LFNODE *old_node, bool newMark)
{
int oldvalue = reinterpret_cast<int>(old_node);
int newvalue = oldvalue;
if (newMark)
newvalue = newvalue | 0x01;
else
newvalue = newvalue & 0xFFFFFFFE;
return CAS(oldvalue, newvalue);
}
이건 next는 안건들고 마킹만 하는 함수이다.
와… 하루동안 키보드에 머리 박았는데 결국 구현 못했다………….. ㅠㅠ
어렵네….
이 곳에서 많이 배웠다.
실패!! 구현하는건 나중에 해보자…. 물어볼 사람도 없다 ㅠㅠ
허헣 망함 내일 그냥 이론만 갖고 오겠읍니다. ㅠㅠ
그냥 검증된 자료구조를 쓰자…ㅠㅜ
#include <chrono>
#include <stdio.h>
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <atomic>
#define MAX 2000000
#define KEY_RANGE 200
using namespace std;
using namespace std::chrono;
volatile int sum;
class LFNODE;
class MPTR
{
private:
int value;
public:
void Set(LFNODE* node, bool removed)
{
value = reinterpret_cast<int>(node);
if (value)
{
value = value | 0x01;
}
else
{
value = value & 0xFFFFFFFE;
}
}
LFNODE* getptr()
{
return reinterpret_cast<LFNODE*> (value & 0xFFFFFFFE);
}
LFNODE* getptr(bool* removed)
{
int temp = value;
if ((temp & 0x1))
{
*removed = false;
}
else
{
*removed = true;
}
return reinterpret_cast<LFNODE*>(temp & 0xFFFFFFFE);
}
bool CAS(LFNODE* old_node, LFNODE* new_node, bool old_removed, bool new_removed)
{
int old_value, new_value;
old_value = reinterpret_cast<int>(old_node);
if (old_removed)
{
old_value = old_value | 0x01;
}
else
{
old_value = old_value & 0xFFFFFFFE;
}
new_value = reinterpret_cast<int>(new_node);
if (old_removed)
{
new_value = new_value | 0x01;
}
else
{
new_value = new_value & 0xFFFFFFFE;
}
return atomic_compare_exchange_strong(reinterpret_cast<atomic_int*>(&value), &old_value, new_value);
}
bool TryMarking(LFNODE* old_node, bool new_removed)
{
int old_value, new_value;
old_value = reinterpret_cast<int>(old_node);
old_value = old_value & 0xFFFFFFFE;
new_value = old_value;
if (new_removed)
{
new_value = new_value | 0x01;
}
return atomic_compare_exchange_strong(reinterpret_cast<atomic_int*>(&value), &old_value, new_value);
}
};
class LFNODE
{
public:
int key;
MPTR next;
LFNODE() { key = 0; next.Set(nullptr, false); }
LFNODE(int key_value)
{
next.Set(nullptr, false);
key = key_value;
}
~LFNODE() {}
};
class LFLIST
{
LFNODE head = LFNODE(), tail = LFNODE();
LFNODE* freelist;
LFNODE freetail;
mutex fl;
public:
LFLIST()
{
head.key = 0x80000000;
tail.key = 0x7FFFFFFF;
head.next.Set(&tail, false);
freetail.key = 0x7EEEEEEE;
freelist = &freetail;
}
~LFLIST() {}
void Init()
{
LFNODE* ptr;
while (head.next.getptr() != &tail)
{
ptr = head.next.getptr();
if (head.next.getptr()->next.getptr())
{
head.next = head.next.getptr()->next;
}
else break;
delete ptr;
}
}
void recycle_freelist()
{
LFNODE* p = freelist;
while (p != freelist)
{
LFNODE* n = p->next.getptr();
delete p;
p = n;
}
freelist = &freetail;
}
void find(int key, LFNODE* (&pred), LFNODE* (&curr))
{
retry:
pred = &head;
curr = pred->next.getptr();
while (true)
{
bool removed;
LFNODE* succ = curr->next.getptr(&removed);
while (true == removed)
{
if (false == pred->next.CAS(curr, succ, false, false))
goto retry;
//두단계 빠져나가게 하는 명령어가 c++ 에 없으니까. 다르게 할수도 있지만 복잡해진다.
curr = succ; succ = curr->next.getptr(&removed);
}
if (curr->key >= key)
{
return;
}
pred = curr;
curr = curr->next.getptr();
}
}
bool Add(int key)
{
LFNODE* pred = NULL, * curr = NULL;
while (true)
{
find(key, pred, curr);
if (key == curr->key)
{
return false;
}
else
{
LFNODE* node = new LFNODE(key);
node->next.Set(curr, false);
if (pred->next.CAS(curr, node, false, false))
{
return true;
}
}
}
}
bool Remove(int key)
{
LFNODE* pred, * curr;
//pred = &head;
//curr = pred->next.getptr();
bool snip;
while (true)
{
find(key, pred, curr);
if (key != curr->key)
{
return false;
}
else
{
LFNODE* succ = curr->next.getptr();
snip = curr->next.TryMarking(succ, true);
if (!snip)
{
continue;
}
pred->next.CAS(curr, succ, false, false);
return true;
}
}
}
bool Contains(int key)
{
LFNODE* pred;
LFNODE* curr;
bool marked = false;
curr = &head;
while (curr->key < key)
{
curr = curr->next.getptr();
LFNODE* succ = curr->next.getptr(&marked);
}
return curr->key == key && !marked;
}
void Display(int count)
{
LFNODE* p = head.next.getptr();
while (p != &tail)
{
cout << p->key << " ";
p = p->next.getptr();
count--;
if (count == 0)
{
break;
}
}
cout << endl;
}
};
LFLIST LFlist;
void Thread(int thread_count)
{
int key;
for (int i = 0; i < MAX / thread_count; i++)
{
switch (rand() % 3)
{
case 0:
key = rand() % KEY_RANGE;
LFlist.Add(key);
break;
case 1:
key = rand() % KEY_RANGE;
LFlist.Remove(key);
break;
case 2:
key = rand() % KEY_RANGE;
LFlist.Contains(key);
break;
default: cout << "Error\n";
exit(-1);
}
}
}
int main()
{
for (int i = 0; i < 8; i * 2)
{
LFlist.Init();
vector<thread> threads(i);
steady_clock::time_point start_time = high_resolution_clock::now();
for (int j = 0; j < i; j++)
{
threads[j] = thread(Thread, i);
}
steady_clock::time_point end_time = high_resolution_clock::now();
auto time = end_time - start_time;
int result_time = duration_cast<milliseconds>(time).count();
LFlist.Display(20);
cout << "스레드 갯수 : " << i << endl;
cout << "걸린 시간 : " << result_time << endl;
}
return 0;
}
실패한 코드다… 허헣
몇일동안 진짜 엄청 했는데 고치는 방법을 모르겠다 ㅠㅠ