the r/w lock is now in the database itself

This commit is contained in:
krangelov
2021-11-15 13:13:00 +01:00
parent 0a204a47f7
commit c1690ffa77
5 changed files with 141 additions and 417 deletions

View File

@@ -23,6 +23,10 @@ size_t getpagesize()
#define ftruncate _chsize
typedef DWORD pid_t;
#define getpid GetCurrentProcessId
static
int last_error_to_errno()
{
@@ -286,6 +290,11 @@ typedef struct mchunk mbin;
static char slovo[5] = {'S','L','O','V','O'};
typedef struct {
pid_t pid;
object next;
} process_entry;
struct PGF_INTERNAL_DECL malloc_state
{
/* Each .ngf file starts with 'SLOVO' as in:
@@ -316,10 +325,13 @@ struct PGF_INTERNAL_DECL malloc_state
ref<PgfPGF> transient_revisions;
ref<PgfConcr> transient_concr_revisions;
#ifdef _WIN32
#ifndef _WIN32
pthread_rwlock_t rwlock;
#else
/* Stores a Reader/Writer lock for Windows */
LONG lock;
LONG rwlock;
#endif
process_entry p;
};
PGF_INTERNAL
@@ -385,21 +397,6 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) {
close(fd);
throw pgf_systemerror(code, filepath);
}
try {
rwlock = ipc_new_file_rwlock(this->filepath, &is_first);
} catch (pgf_systemerror e) {
#ifndef MREMAP_MAYMOVE
if (fd < 0) {
::free(ms);
} else
#endif
munmap(ms,file_size);
::free((void *) this->filepath);
close(fd);
throw e;
}
#else
char *name;
char buf[256];
@@ -490,6 +487,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) {
throw pgf_error("Invalid file content");
}
register_process(&is_first);
if (is_first) {
// We must make sure that left-over transient revisions are
// released. This may happen if a client process was killed
@@ -520,8 +519,11 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) {
}
PGF_INTERNAL
PgfDB::~PgfDB() {
PgfDB::~PgfDB()
{
if (ms != NULL) {
unregister_process();
size_t size =
ms->top + chunksize(ptr(ms,ms->top)) + sizeof(size_t);
@@ -546,13 +548,94 @@ PgfDB::~PgfDB() {
if (fd >= 0)
close(fd);
#ifndef _WIN32
ipc_release_file_rwlock(filepath, rwlock);
#endif
::free((void*) filepath);
}
PGF_INTERNAL
void PgfDB::register_process(bool *is_first)
{
process_entry *pentry = &ms->p;
object *plast = NULL;
if (ms->p.pid != 0) {
while (pentry != (process_entry *) ptr(ms,0)) {
#ifndef _WIN32
char proc_file[32];
sprintf(proc_file, "/proc/%d", pentry->pid);
bool alive = (access(proc_file, F_OK) == 0);
#else
HANDLE hProcess = OpenProcess(PROCESS_QUERY_INFORMATION,
FALSE,pentry->pid);
DWORD dwExitCode = STILL_ACTIVE;
if (hProcess != NULL)
GetExitCodeProcess(hProcess,&dwExitCode);
bool alive = (dwExitCode == STILL_ACTIVE);
CloseHandle(hProcess);
#endif
if (!alive) {
// if there are dead processes -> remove them
if (plast == NULL) {
if (ms->p.next == 0) {
ms->p.pid = 0;
break;
} else {
object next = pentry->next;
*pentry = *((process_entry *) ptr(ms,next));
free_internal(next);
}
} else {
*plast = pentry->next;
free_internal(ofs(ms,pentry));
pentry = (process_entry *) ptr(ms, *plast);
}
} else {
plast = &pentry->next;
pentry = (process_entry *) ptr(ms, *plast);
}
}
}
if (plast == NULL) {
*is_first = true;
} else {
*is_first = false;
*plast = malloc_internal(sizeof(process_entry));
pentry = (process_entry*) ptr(ms,*plast);
pentry->next = 0;
}
pentry->pid = getpid();
}
PGF_INTERNAL
void PgfDB::unregister_process()
{
pid_t pid = getpid();
process_entry *pentry = &ms->p;
object *plast = NULL;
while (pentry != (process_entry *) ptr(ms,0)) {
if (pentry->pid == pid) {
if (plast == NULL) {
if (ms->p.next == 0) {
ms->p.pid = 0;
} else {
object next = pentry->next;
*pentry = *((process_entry *) ptr(ms,next));
free_internal(next);
}
} else {
*plast = pentry->next;
free_internal(ofs(ms,pentry));
pentry = (process_entry *) ptr(ms, *plast);
}
break;
} else {
plast = &pentry->next;
pentry = (process_entry *) ptr(ms, *plast);
}
}
}
PGF_INTERNAL
ref<PgfPGF> PgfDB::get_revision(PgfText *name)
{
@@ -600,9 +683,25 @@ void PgfDB::init_state(size_t size)
ms->transient_revisions = 0;
ms->transient_concr_revisions = 0;
#if _WIN32
ms->lock = 0;
#ifndef _WIN32
pthread_rwlockattr_t attr;
if (pthread_rwlockattr_init(&attr) != 0) {
throw pgf_systemerror(errno);
}
if (pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0) {
pthread_rwlockattr_destroy(&attr);
throw pgf_systemerror(errno);
}
if (pthread_rwlock_init(&ms->rwlock, &attr) != 0) {
pthread_rwlockattr_destroy(&attr);
throw pgf_systemerror(errno);
}
pthread_rwlockattr_destroy(&attr);
#else
ms->rwlock = 0;
#endif
ms->p.pid = getpid();
ms->p.next = 0;
}
/* Take a chunk off a bin list. */
@@ -1384,20 +1483,20 @@ void PgfDB::lock(DB_scope_mode m)
{
#ifndef _WIN32
int res =
(m == READER_SCOPE) ? pthread_rwlock_rdlock(rwlock)
: pthread_rwlock_wrlock(rwlock);
(m == READER_SCOPE) ? pthread_rwlock_rdlock(&ms->rwlock)
: pthread_rwlock_wrlock(&ms->rwlock);
if (res != 0)
throw pgf_systemerror(res);
#else
for (int i = 0; ; ++i) {
unsigned __int32 temp = ms->lock;
unsigned __int32 temp = ms->rwlock;
if (m == READER_SCOPE && !Writer(temp)) {
if (InterlockedCompareExchange(&ms->lock, SetReaders(temp, ReaderCount(temp) + 1), temp) == temp)
if (InterlockedCompareExchange(&ms->rwlock, SetReaders(temp, ReaderCount(temp) + 1), temp) == temp)
return;
else
continue;
} else if (m == WRITER_SCOPE && AllClear(temp)) {
if (InterlockedCompareExchange(&ms->lock, SetWriter(temp, true), temp) == temp)
if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, true), temp) == temp)
return;
else
continue;
@@ -1408,7 +1507,7 @@ void PgfDB::lock(DB_scope_mode m)
}
//The pending write operation is taking too long, so we'll drop to the kernel and wait
if (InterlockedCompareExchange(&ms->lock, SetWaiting(temp, WaitingCount(temp) + 1), temp) != temp)
if (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) + 1), temp) != temp)
continue;
i = 0; //Reset the spincount for the next time
@@ -1416,8 +1515,8 @@ void PgfDB::lock(DB_scope_mode m)
do
{
temp = ms->lock;
} while (InterlockedCompareExchange(&ms->lock, SetWaiting(temp, WaitingCount(temp) - 1), temp) != temp);
temp = ms->rwlock;
} while (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) - 1), temp) != temp);
}
}
#endif
@@ -1426,10 +1525,10 @@ void PgfDB::lock(DB_scope_mode m)
void PgfDB::unlock()
{
#ifndef _WIN32
pthread_rwlock_unlock(rwlock);
pthread_rwlock_unlock(&ms->rwlock);
#else
while (true) {
unsigned __int32 temp = ms->lock;
unsigned __int32 temp = ms->rwlock;
if (ReaderCount(temp) > 0) {
if (ReaderCount(temp) == 1 && WaitingCount(temp) != 0) {
//Note: this isn't nor has to be thread-safe, as the worst a duplicate notification can do
@@ -1441,11 +1540,11 @@ void PgfDB::unlock()
}
//Decrement reader count
if (InterlockedCompareExchange(&ms->lock, SetReaders(temp, ReaderCount(temp) - 1), temp) == temp)
if (InterlockedCompareExchange(&ms->rwlock, SetReaders(temp, ReaderCount(temp) - 1), temp) == temp)
break;
} else {
while(true) {
temp = ms->lock;
temp = ms->rwlock;
assert(Writer(temp));
if (WaitingCount(temp) == 0)
break;
@@ -1456,7 +1555,7 @@ void PgfDB::unlock()
}
//Decrement writer count
if (InterlockedCompareExchange(&ms->lock, SetWriter(temp, false), temp) == temp)
if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, false), temp) == temp)
break;
}
}