diff --git a/src/runtime/c/pgf/db.cxx b/src/runtime/c/pgf/db.cxx index 1eb6afe67..1d0b08125 100644 --- a/src/runtime/c/pgf/db.cxx +++ b/src/runtime/c/pgf/db.cxx @@ -90,7 +90,8 @@ struct PGF_INTERNAL_DECL malloc_state size_t file_size; #ifndef _WIN32 - pthread_mutex_t mutex; + pthread_mutex_t rev_mutex; + pthread_mutex_t write_mutex; pthread_rwlock_t rwlock; #else /* Stores a Reader/Writer lock for Windows */ @@ -200,7 +201,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) { base = ((unsigned char *) ms) + page_size; #endif #else - char *mutex_name; + char *rev_mutex_name; + char *write_mutex_name; char *event_name; char buf[256]; @@ -212,13 +214,19 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) { close(fd); throw pgf_systemerror(code); } - mutex_name = buf; - sprintf(mutex_name, - "gf-mutex-%lx-%lx-%lx", + rev_mutex_name = buf; + sprintf(rev_mutex_name, + "gf-rev-mutex-%lx-%lx-%lx", hInfo.dwVolumeSerialNumber, hInfo.nFileIndexHigh, hInfo.nFileIndexLow); - event_name = buf+strlen(mutex_name+1); + write_mutex_name = rev_mutex_name+strlen(rev_mutex_name+1); + sprintf(write_mutex_name, + "gf-write-mutex-%lx-%lx-%lx", + hInfo.dwVolumeSerialNumber, + hInfo.nFileIndexHigh, + hInfo.nFileIndexLow); + event_name = write_mutex_name+strlen(write_mutex_name+1); sprintf(event_name, "gf-rwevent-%lx-%lx-%lx", hInfo.dwVolumeSerialNumber, @@ -250,7 +258,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) { throw pgf_systemerror(code, filepath); } } else { - mutex_name = NULL; + rev_mutex_name = NULL; + write_mutex_name = NULL; event_name = NULL; hMap = INVALID_HANDLE_VALUE; ms = (malloc_state*) ::malloc(mmap_size); @@ -260,8 +269,24 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) { base = ((unsigned char *) ms) + page_size; } - hMutex = CreateMutex(NULL, FALSE, mutex_name); - if (hMutex == NULL) { + hRevMutex = CreateMutex(NULL, FALSE, rev_mutex_name); + if (hRevMutex == NULL) { + code = last_error_to_errno(); + if (fd < 0) { + ::free(ms); + } else { + UnmapViewOfFile(ms); + CloseHandle(hMap); + } + ::free((void *) this->filepath); + close(fd); + throw pgf_systemerror(code, filepath); + } + + hWriteMutex = CreateMutex(NULL, FALSE, write_mutex_name); + if (hWriteMutex == NULL) { + CloseHandle(hRevMutex); + code = last_error_to_errno(); if (fd < 0) { ::free(ms); } else { @@ -275,7 +300,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) { hRWEvent = CreateEvent(NULL, FALSE, FALSE, event_name); if (hRWEvent == NULL) { - CloseHandle(hMutex); + CloseHandle(hWriteMutex); + CloseHandle(hRevMutex); if (fd < 0) { ::free(ms); } else { @@ -307,7 +333,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) { CloseHandle(hMap); } CloseHandle(hRWEvent); - CloseHandle(hMutex); + CloseHandle(hWriteMutex); + CloseHandle(hRevMutex); #endif ::free((void *) this->filepath); @@ -333,7 +360,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) { CloseHandle(hMap); } CloseHandle(hRWEvent); - CloseHandle(hMutex); + CloseHandle(hWriteMutex); + CloseHandle(hRevMutex); #endif ::free((void *) this->filepath); @@ -379,7 +407,8 @@ PgfDB::~PgfDB() CloseHandle(hMap); } CloseHandle(hRWEvent); - CloseHandle(hMutex); + CloseHandle(hWriteMutex); + CloseHandle(hRevMutex); #endif if (fd >= 0) @@ -397,9 +426,9 @@ PGF_INTERNAL object PgfDB::register_revision(object o, txn_t txn_id) { #ifndef _WIN32 - pthread_mutex_lock(&ms->mutex); + pthread_mutex_lock(&ms->rev_mutex); #else - WaitForSingleObject(hMutex, INFINITE); + WaitForSingleObject(hRevMutex, INFINITE); #endif bool found = false; @@ -442,9 +471,9 @@ object PgfDB::register_revision(object o, txn_t txn_id) size_t n_max = (page_size-sizeof(malloc_state))/sizeof(revision_entry); if (ms->n_revisions >= n_max) { #ifndef _WIN32 - pthread_mutex_unlock(&ms->mutex); + pthread_mutex_unlock(&ms->rev_mutex); #else - ReleaseMutex(hMutex); + ReleaseMutex(hRevMutex); #endif throw pgf_error("Too many retained database revisions"); } @@ -462,9 +491,9 @@ object PgfDB::register_revision(object o, txn_t txn_id) #endif #ifndef _WIN32 - pthread_mutex_unlock(&ms->mutex); + pthread_mutex_unlock(&ms->rev_mutex); #else - ReleaseMutex(hMutex); + ReleaseMutex(hRevMutex); #endif return (found_entry - ms->revisions) + 1; @@ -481,9 +510,9 @@ void PgfDB::unregister_revision(object revision) throw pgf_error("Invalid revision"); #ifndef _WIN32 - pthread_mutex_lock(&ms->mutex); + pthread_mutex_lock(&ms->rev_mutex); #else - WaitForSingleObject(hMutex, INFINITE); + WaitForSingleObject(hRevMutex, INFINITE); #endif if (--entry->ref_count == 0) { @@ -520,18 +549,18 @@ void PgfDB::unregister_revision(object revision) #endif #ifndef _WIN32 - pthread_mutex_unlock(&ms->mutex); + pthread_mutex_unlock(&ms->rev_mutex); #else - ReleaseMutex(hMutex); + ReleaseMutex(hRevMutex); #endif } void PgfDB::cleanup_revisions() { #ifndef _WIN32 - pthread_mutex_lock(&ms->mutex); + pthread_mutex_lock(&ms->rev_mutex); #else - WaitForSingleObject(hMutex, INFINITE); + WaitForSingleObject(hRevMutex, INFINITE); #endif #ifdef DEBUG_MEMORY_ALLOCATOR @@ -578,9 +607,9 @@ void PgfDB::cleanup_revisions() #endif #ifndef _WIN32 - pthread_mutex_unlock(&ms->mutex); + pthread_mutex_unlock(&ms->rev_mutex); #else - ReleaseMutex(hMutex); + ReleaseMutex(hRevMutex); #endif } @@ -611,7 +640,12 @@ int PgfDB::init_state() if ((res = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) != 0) { return res; } - if ((res = pthread_mutex_init(&ms->mutex, &attr)) != 0) { + if ((res = pthread_mutex_init(&ms->rev_mutex, &attr)) != 0) { + pthread_mutexattr_destroy(&attr); + return res; + } + if ((res = pthread_mutex_init(&ms->write_mutex, &attr)) != 0) { + pthread_mutexattr_destroy(&attr); return res; } pthread_mutexattr_destroy(&attr); @@ -1293,6 +1327,12 @@ ref PgfDB::revision2concr(PgfConcrRevision revision, size_t *p_txn_id) PGF_INTERNAL void PgfDB::start_transaction() { +#ifndef _WIN32 + pthread_mutex_lock(&ms->write_mutex); +#else + WaitForSingleObject(hWriteMutex, INFINITE); +#endif + top = ms->top; free_blocks = ms->free_blocks; free_descriptors[0] = ms->free_descriptors; @@ -1363,6 +1403,8 @@ void PgfDB::commit(object o) #ifndef MREMAP_MAYMOVE } #endif + + pthread_mutex_unlock(&ms->write_mutex); #else if (current_db->fd > 0) { if (free_descriptors[2] != 0) { @@ -1389,6 +1431,8 @@ void PgfDB::commit(object o) throw pgf_systemerror(last_error_to_errno()); } } + + ReleaseMutex(hWriteMutex); #endif } @@ -1402,6 +1446,12 @@ void PgfDB::rollback() free_descriptors[2] = 0; last_free_block = 0; last_free_block_size = 0; + +#ifndef _WIN32 + pthread_mutex_unlock(&ms->write_mutex); +#else + ReleaseMutex(hWriteMutex); +#endif } #ifdef _WIN32 @@ -1448,45 +1498,40 @@ __forceinline bool AllClear(unsigned __int32 lock) void PgfDB::lock(DB_scope_mode m) { + if (m == READER_SCOPE) { #ifndef _WIN32 - int res = - (m == READER_SCOPE) ? pthread_rwlock_rdlock(&ms->rwlock) - : pthread_rwlock_wrlock(&ms->rwlock); - if (res != 0) - throw pgf_systemerror(res); + int res = pthread_rwlock_rdlock(&ms->rwlock); + if (res != 0) + throw pgf_systemerror(res); #else - for (int i = 0; ; ++i) { - unsigned __int32 temp = ms->rwlock; - if (m == READER_SCOPE && !Writer(temp)) { - if (InterlockedCompareExchange(&ms->rwlock, SetReaders(temp, ReaderCount(temp) + 1), temp) == temp) - return; - else - continue; - } else if (m == WRITER_SCOPE && AllClear(temp)) { - if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, true), temp) == temp) - return; - else - continue; - } else { - if (i < MAX_SPIN) { - YieldProcessor(); - continue; + for (int i = 0; ; ++i) { + unsigned __int32 temp = ms->rwlock; + if (!Writer(temp)) { + if (InterlockedCompareExchange(&ms->rwlock, SetReaders(temp, ReaderCount(temp) + 1), temp) == temp) + return; + else + continue; + } else { + if (i < MAX_SPIN) { + YieldProcessor(); + continue; + } + + //The pending write operation is taking too long, so we'll drop to the kernel and wait + if (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) + 1), temp) != temp) + continue; + + i = 0; //Reset the spincount for the next time + WaitForSingleObject(hRWEvent, INFINITE); + + do + { + temp = ms->rwlock; + } while (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) - 1), temp) != temp); } - - //The pending write operation is taking too long, so we'll drop to the kernel and wait - if (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) + 1), temp) != temp) - continue; - - i = 0; //Reset the spincount for the next time - WaitForSingleObject(hRWEvent, INFINITE); - - do - { - temp = ms->rwlock; - } while (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) - 1), temp) != temp); } - } #endif + } // If another process has resized the file we must resize the map if (mmap_size != ms->file_size) @@ -1514,20 +1559,6 @@ void PgfDB::unlock() if (InterlockedCompareExchange(&ms->rwlock, SetReaders(temp, ReaderCount(temp) - 1), temp) == temp) break; } else { - while(true) { - temp = ms->rwlock; - assert(Writer(temp)); - if (WaitingCount(temp) == 0) - break; - - //Note: this is thread-safe (there's guaranteed not to be another EndWrite simultaneously) - //Wake all waiting readers or writers, loop until wake confirmation is received - SetEvent(hRWEvent); - } - - //Decrement writer count - if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, false), temp) == temp) - break; } } #endif @@ -1536,7 +1567,11 @@ void PgfDB::unlock() void PgfDB::resize_map(size_t new_size) { #ifndef _WIN32 - unsigned char* new_base; + int res; + unsigned char* new_base; + + if ((res = pthread_rwlock_wrlock(&ms->rwlock)) != 0) + throw pgf_systemerror(res); // OSX does not implement mremap or MREMAP_MAYMOVE #ifndef MREMAP_MAYMOVE @@ -1570,6 +1605,34 @@ void PgfDB::resize_map(size_t new_size) base = new_base; #else + + for (int i = 0; ; ++i) { + unsigned __int32 temp = ms->rwlock; + if (AllClear(temp)) { + if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, true), temp) == temp) + return; + else + continue; + } else { + if (i < MAX_SPIN) { + YieldProcessor(); + continue; + } + + //The pending write operation is taking too long, so we'll drop to the kernel and wait + if (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) + 1), temp) != temp) + continue; + + i = 0; //Reset the spincount for the next time + WaitForSingleObject(hRWEvent, INFINITE); + + do + { + temp = ms->rwlock; + } while (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) - 1), temp) != temp); + } + } + new_size += page_size; if (fd >= 0) { UnmapViewOfFile(ms); @@ -1605,12 +1668,34 @@ void PgfDB::resize_map(size_t new_size) current_base = (unsigned char*) base; mmap_size = new_size; ms->file_size = new_size; + + +#ifndef _WIN32 + if ((res = pthread_rwlock_unlock(&ms->rwlock)) != 0) + throw pgf_systemerror(res); +#else + while(true) { + temp = ms->rwlock; + assert(Writer(temp)); + if (WaitingCount(temp) == 0) + break; + + //Note: this is thread-safe (there's guaranteed not to be another EndWrite simultaneously) + //Wake all waiting readers or writers, loop until wake confirmation is received + SetEvent(hRWEvent); + } + + //Decrement writer count + if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, false), temp) == temp) + break; +#endif } DB_scope::DB_scope(PgfDB *db, DB_scope_mode m) { db->lock(m); + mode = m; save_db = current_db; current_db = db; current_base = db->base; @@ -1623,7 +1708,8 @@ DB_scope::~DB_scope() { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wterminate" - current_db->unlock(); + if (mode == READER_SCOPE) + current_db->unlock(); current_db = save_db; current_base = current_db ? current_db->base diff --git a/src/runtime/c/pgf/db.h b/src/runtime/c/pgf/db.h index d471a5406..65e5cb1fa 100644 --- a/src/runtime/c/pgf/db.h +++ b/src/runtime/c/pgf/db.h @@ -81,7 +81,8 @@ private: #else DWORD pid; HANDLE hMap; - HANDLE hMutex; + HANDLE hRevMutex; + HANDLE hWriteMutex; HANDLE hRWEvent; #endif @@ -162,6 +163,7 @@ public: ~DB_scope(); private: + DB_scope_mode mode; PgfDB* save_db; DB_scope* next_scope; };