added mutex for single writer exclusion

This commit is contained in:
Krasimir Angelov
2022-06-29 16:28:01 +02:00
parent edb9ff33c5
commit 88477a8834
2 changed files with 167 additions and 79 deletions

View File

@@ -90,7 +90,8 @@ struct PGF_INTERNAL_DECL malloc_state
size_t file_size;
#ifndef _WIN32
pthread_mutex_t mutex;
pthread_mutex_t rev_mutex;
pthread_mutex_t write_mutex;
pthread_rwlock_t rwlock;
#else
/* Stores a Reader/Writer lock for Windows */
@@ -200,7 +201,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) {
base = ((unsigned char *) ms) + page_size;
#endif
#else
char *mutex_name;
char *rev_mutex_name;
char *write_mutex_name;
char *event_name;
char buf[256];
@@ -212,13 +214,19 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) {
close(fd);
throw pgf_systemerror(code);
}
mutex_name = buf;
sprintf(mutex_name,
"gf-mutex-%lx-%lx-%lx",
rev_mutex_name = buf;
sprintf(rev_mutex_name,
"gf-rev-mutex-%lx-%lx-%lx",
hInfo.dwVolumeSerialNumber,
hInfo.nFileIndexHigh,
hInfo.nFileIndexLow);
event_name = buf+strlen(mutex_name+1);
write_mutex_name = rev_mutex_name+strlen(rev_mutex_name+1);
sprintf(write_mutex_name,
"gf-write-mutex-%lx-%lx-%lx",
hInfo.dwVolumeSerialNumber,
hInfo.nFileIndexHigh,
hInfo.nFileIndexLow);
event_name = write_mutex_name+strlen(write_mutex_name+1);
sprintf(event_name,
"gf-rwevent-%lx-%lx-%lx",
hInfo.dwVolumeSerialNumber,
@@ -250,7 +258,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) {
throw pgf_systemerror(code, filepath);
}
} else {
mutex_name = NULL;
rev_mutex_name = NULL;
write_mutex_name = NULL;
event_name = NULL;
hMap = INVALID_HANDLE_VALUE;
ms = (malloc_state*) ::malloc(mmap_size);
@@ -260,8 +269,24 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) {
base = ((unsigned char *) ms) + page_size;
}
hMutex = CreateMutex(NULL, FALSE, mutex_name);
if (hMutex == NULL) {
hRevMutex = CreateMutex(NULL, FALSE, rev_mutex_name);
if (hRevMutex == NULL) {
code = last_error_to_errno();
if (fd < 0) {
::free(ms);
} else {
UnmapViewOfFile(ms);
CloseHandle(hMap);
}
::free((void *) this->filepath);
close(fd);
throw pgf_systemerror(code, filepath);
}
hWriteMutex = CreateMutex(NULL, FALSE, write_mutex_name);
if (hWriteMutex == NULL) {
CloseHandle(hRevMutex);
code = last_error_to_errno();
if (fd < 0) {
::free(ms);
} else {
@@ -275,7 +300,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) {
hRWEvent = CreateEvent(NULL, FALSE, FALSE, event_name);
if (hRWEvent == NULL) {
CloseHandle(hMutex);
CloseHandle(hWriteMutex);
CloseHandle(hRevMutex);
if (fd < 0) {
::free(ms);
} else {
@@ -307,7 +333,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) {
CloseHandle(hMap);
}
CloseHandle(hRWEvent);
CloseHandle(hMutex);
CloseHandle(hWriteMutex);
CloseHandle(hRevMutex);
#endif
::free((void *) this->filepath);
@@ -333,7 +360,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) {
CloseHandle(hMap);
}
CloseHandle(hRWEvent);
CloseHandle(hMutex);
CloseHandle(hWriteMutex);
CloseHandle(hRevMutex);
#endif
::free((void *) this->filepath);
@@ -379,7 +407,8 @@ PgfDB::~PgfDB()
CloseHandle(hMap);
}
CloseHandle(hRWEvent);
CloseHandle(hMutex);
CloseHandle(hWriteMutex);
CloseHandle(hRevMutex);
#endif
if (fd >= 0)
@@ -397,9 +426,9 @@ PGF_INTERNAL
object PgfDB::register_revision(object o, txn_t txn_id)
{
#ifndef _WIN32
pthread_mutex_lock(&ms->mutex);
pthread_mutex_lock(&ms->rev_mutex);
#else
WaitForSingleObject(hMutex, INFINITE);
WaitForSingleObject(hRevMutex, INFINITE);
#endif
bool found = false;
@@ -442,9 +471,9 @@ object PgfDB::register_revision(object o, txn_t txn_id)
size_t n_max = (page_size-sizeof(malloc_state))/sizeof(revision_entry);
if (ms->n_revisions >= n_max) {
#ifndef _WIN32
pthread_mutex_unlock(&ms->mutex);
pthread_mutex_unlock(&ms->rev_mutex);
#else
ReleaseMutex(hMutex);
ReleaseMutex(hRevMutex);
#endif
throw pgf_error("Too many retained database revisions");
}
@@ -462,9 +491,9 @@ object PgfDB::register_revision(object o, txn_t txn_id)
#endif
#ifndef _WIN32
pthread_mutex_unlock(&ms->mutex);
pthread_mutex_unlock(&ms->rev_mutex);
#else
ReleaseMutex(hMutex);
ReleaseMutex(hRevMutex);
#endif
return (found_entry - ms->revisions) + 1;
@@ -481,9 +510,9 @@ void PgfDB::unregister_revision(object revision)
throw pgf_error("Invalid revision");
#ifndef _WIN32
pthread_mutex_lock(&ms->mutex);
pthread_mutex_lock(&ms->rev_mutex);
#else
WaitForSingleObject(hMutex, INFINITE);
WaitForSingleObject(hRevMutex, INFINITE);
#endif
if (--entry->ref_count == 0) {
@@ -520,18 +549,18 @@ void PgfDB::unregister_revision(object revision)
#endif
#ifndef _WIN32
pthread_mutex_unlock(&ms->mutex);
pthread_mutex_unlock(&ms->rev_mutex);
#else
ReleaseMutex(hMutex);
ReleaseMutex(hRevMutex);
#endif
}
void PgfDB::cleanup_revisions()
{
#ifndef _WIN32
pthread_mutex_lock(&ms->mutex);
pthread_mutex_lock(&ms->rev_mutex);
#else
WaitForSingleObject(hMutex, INFINITE);
WaitForSingleObject(hRevMutex, INFINITE);
#endif
#ifdef DEBUG_MEMORY_ALLOCATOR
@@ -578,9 +607,9 @@ void PgfDB::cleanup_revisions()
#endif
#ifndef _WIN32
pthread_mutex_unlock(&ms->mutex);
pthread_mutex_unlock(&ms->rev_mutex);
#else
ReleaseMutex(hMutex);
ReleaseMutex(hRevMutex);
#endif
}
@@ -611,7 +640,12 @@ int PgfDB::init_state()
if ((res = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) != 0) {
return res;
}
if ((res = pthread_mutex_init(&ms->mutex, &attr)) != 0) {
if ((res = pthread_mutex_init(&ms->rev_mutex, &attr)) != 0) {
pthread_mutexattr_destroy(&attr);
return res;
}
if ((res = pthread_mutex_init(&ms->write_mutex, &attr)) != 0) {
pthread_mutexattr_destroy(&attr);
return res;
}
pthread_mutexattr_destroy(&attr);
@@ -1293,6 +1327,12 @@ ref<PgfConcr> PgfDB::revision2concr(PgfConcrRevision revision, size_t *p_txn_id)
PGF_INTERNAL
void PgfDB::start_transaction()
{
#ifndef _WIN32
pthread_mutex_lock(&ms->write_mutex);
#else
WaitForSingleObject(hWriteMutex, INFINITE);
#endif
top = ms->top;
free_blocks = ms->free_blocks;
free_descriptors[0] = ms->free_descriptors;
@@ -1363,6 +1403,8 @@ void PgfDB::commit(object o)
#ifndef MREMAP_MAYMOVE
}
#endif
pthread_mutex_unlock(&ms->write_mutex);
#else
if (current_db->fd > 0) {
if (free_descriptors[2] != 0) {
@@ -1389,6 +1431,8 @@ void PgfDB::commit(object o)
throw pgf_systemerror(last_error_to_errno());
}
}
ReleaseMutex(hWriteMutex);
#endif
}
@@ -1402,6 +1446,12 @@ void PgfDB::rollback()
free_descriptors[2] = 0;
last_free_block = 0;
last_free_block_size = 0;
#ifndef _WIN32
pthread_mutex_unlock(&ms->write_mutex);
#else
ReleaseMutex(hWriteMutex);
#endif
}
#ifdef _WIN32
@@ -1448,45 +1498,40 @@ __forceinline bool AllClear(unsigned __int32 lock)
void PgfDB::lock(DB_scope_mode m)
{
if (m == READER_SCOPE) {
#ifndef _WIN32
int res =
(m == READER_SCOPE) ? pthread_rwlock_rdlock(&ms->rwlock)
: pthread_rwlock_wrlock(&ms->rwlock);
if (res != 0)
throw pgf_systemerror(res);
int res = pthread_rwlock_rdlock(&ms->rwlock);
if (res != 0)
throw pgf_systemerror(res);
#else
for (int i = 0; ; ++i) {
unsigned __int32 temp = ms->rwlock;
if (m == READER_SCOPE && !Writer(temp)) {
if (InterlockedCompareExchange(&ms->rwlock, SetReaders(temp, ReaderCount(temp) + 1), temp) == temp)
return;
else
continue;
} else if (m == WRITER_SCOPE && AllClear(temp)) {
if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, true), temp) == temp)
return;
else
continue;
} else {
if (i < MAX_SPIN) {
YieldProcessor();
continue;
for (int i = 0; ; ++i) {
unsigned __int32 temp = ms->rwlock;
if (!Writer(temp)) {
if (InterlockedCompareExchange(&ms->rwlock, SetReaders(temp, ReaderCount(temp) + 1), temp) == temp)
return;
else
continue;
} else {
if (i < MAX_SPIN) {
YieldProcessor();
continue;
}
//The pending write operation is taking too long, so we'll drop to the kernel and wait
if (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) + 1), temp) != temp)
continue;
i = 0; //Reset the spincount for the next time
WaitForSingleObject(hRWEvent, INFINITE);
do
{
temp = ms->rwlock;
} while (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) - 1), temp) != temp);
}
//The pending write operation is taking too long, so we'll drop to the kernel and wait
if (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) + 1), temp) != temp)
continue;
i = 0; //Reset the spincount for the next time
WaitForSingleObject(hRWEvent, INFINITE);
do
{
temp = ms->rwlock;
} while (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) - 1), temp) != temp);
}
}
#endif
}
// If another process has resized the file we must resize the map
if (mmap_size != ms->file_size)
@@ -1514,20 +1559,6 @@ void PgfDB::unlock()
if (InterlockedCompareExchange(&ms->rwlock, SetReaders(temp, ReaderCount(temp) - 1), temp) == temp)
break;
} else {
while(true) {
temp = ms->rwlock;
assert(Writer(temp));
if (WaitingCount(temp) == 0)
break;
//Note: this is thread-safe (there's guaranteed not to be another EndWrite simultaneously)
//Wake all waiting readers or writers, loop until wake confirmation is received
SetEvent(hRWEvent);
}
//Decrement writer count
if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, false), temp) == temp)
break;
}
}
#endif
@@ -1536,7 +1567,11 @@ void PgfDB::unlock()
void PgfDB::resize_map(size_t new_size)
{
#ifndef _WIN32
unsigned char* new_base;
int res;
unsigned char* new_base;
if ((res = pthread_rwlock_wrlock(&ms->rwlock)) != 0)
throw pgf_systemerror(res);
// OSX does not implement mremap or MREMAP_MAYMOVE
#ifndef MREMAP_MAYMOVE
@@ -1570,6 +1605,34 @@ void PgfDB::resize_map(size_t new_size)
base = new_base;
#else
for (int i = 0; ; ++i) {
unsigned __int32 temp = ms->rwlock;
if (AllClear(temp)) {
if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, true), temp) == temp)
return;
else
continue;
} else {
if (i < MAX_SPIN) {
YieldProcessor();
continue;
}
//The pending write operation is taking too long, so we'll drop to the kernel and wait
if (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) + 1), temp) != temp)
continue;
i = 0; //Reset the spincount for the next time
WaitForSingleObject(hRWEvent, INFINITE);
do
{
temp = ms->rwlock;
} while (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) - 1), temp) != temp);
}
}
new_size += page_size;
if (fd >= 0) {
UnmapViewOfFile(ms);
@@ -1605,12 +1668,34 @@ void PgfDB::resize_map(size_t new_size)
current_base = (unsigned char*) base;
mmap_size = new_size;
ms->file_size = new_size;
#ifndef _WIN32
if ((res = pthread_rwlock_unlock(&ms->rwlock)) != 0)
throw pgf_systemerror(res);
#else
while(true) {
temp = ms->rwlock;
assert(Writer(temp));
if (WaitingCount(temp) == 0)
break;
//Note: this is thread-safe (there's guaranteed not to be another EndWrite simultaneously)
//Wake all waiting readers or writers, loop until wake confirmation is received
SetEvent(hRWEvent);
}
//Decrement writer count
if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, false), temp) == temp)
break;
#endif
}
DB_scope::DB_scope(PgfDB *db, DB_scope_mode m)
{
db->lock(m);
mode = m;
save_db = current_db;
current_db = db;
current_base = db->base;
@@ -1623,7 +1708,8 @@ DB_scope::~DB_scope()
{
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wterminate"
current_db->unlock();
if (mode == READER_SCOPE)
current_db->unlock();
current_db = save_db;
current_base = current_db ? current_db->base

View File

@@ -81,7 +81,8 @@ private:
#else
DWORD pid;
HANDLE hMap;
HANDLE hMutex;
HANDLE hRevMutex;
HANDLE hWriteMutex;
HANDLE hRWEvent;
#endif
@@ -162,6 +163,7 @@ public:
~DB_scope();
private:
DB_scope_mode mode;
PgfDB* save_db;
DB_scope* next_scope;
};