From c1690ffa77441cb8ec536557bd91d69e2d1f9678 Mon Sep 17 00:00:00 2001 From: krangelov Date: Mon, 15 Nov 2021 13:13:00 +0100 Subject: [PATCH] the r/w lock is now in the database itself --- src/runtime/c/Makefile.am | 2 - src/runtime/c/pgf/db.cxx | 173 ++++++++++++++---- src/runtime/c/pgf/db.h | 12 +- src/runtime/c/pgf/ipc.cxx | 358 -------------------------------------- src/runtime/c/pgf/ipc.h | 13 -- 5 files changed, 141 insertions(+), 417 deletions(-) delete mode 100644 src/runtime/c/pgf/ipc.cxx delete mode 100644 src/runtime/c/pgf/ipc.h diff --git a/src/runtime/c/Makefile.am b/src/runtime/c/Makefile.am index c86f849b2..3788ccd0f 100644 --- a/src/runtime/c/Makefile.am +++ b/src/runtime/c/Makefile.am @@ -10,8 +10,6 @@ pgfinclude_HEADERS = \ libpgf_la_SOURCES = \ pgf/db.cxx \ pgf/db.h \ - pgf/ipc.cxx \ - pgf/ipc.h \ pgf/text.cxx \ pgf/text.h \ pgf/pgf.cxx \ diff --git a/src/runtime/c/pgf/db.cxx b/src/runtime/c/pgf/db.cxx index ad577f918..f1235a55f 100644 --- a/src/runtime/c/pgf/db.cxx +++ b/src/runtime/c/pgf/db.cxx @@ -23,6 +23,10 @@ size_t getpagesize() #define ftruncate _chsize +typedef DWORD pid_t; + +#define getpid GetCurrentProcessId + static int last_error_to_errno() { @@ -286,6 +290,11 @@ typedef struct mchunk mbin; static char slovo[5] = {'S','L','O','V','O'}; +typedef struct { + pid_t pid; + object next; +} process_entry; + struct PGF_INTERNAL_DECL malloc_state { /* Each .ngf file starts with 'SLOVO' as in: @@ -316,10 +325,13 @@ struct PGF_INTERNAL_DECL malloc_state ref transient_revisions; ref transient_concr_revisions; -#ifdef _WIN32 +#ifndef _WIN32 + pthread_rwlock_t rwlock; +#else /* Stores a Reader/Writer lock for Windows */ - LONG lock; + LONG rwlock; #endif + process_entry p; }; PGF_INTERNAL @@ -385,21 +397,6 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) { close(fd); throw pgf_systemerror(code, filepath); } - - try { - rwlock = ipc_new_file_rwlock(this->filepath, &is_first); - } catch (pgf_systemerror e) { -#ifndef MREMAP_MAYMOVE - if (fd < 0) { - ::free(ms); - } else -#endif - munmap(ms,file_size); - - ::free((void *) this->filepath); - close(fd); - throw e; - } #else char *name; char buf[256]; @@ -490,6 +487,8 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) { throw pgf_error("Invalid file content"); } + register_process(&is_first); + if (is_first) { // We must make sure that left-over transient revisions are // released. This may happen if a client process was killed @@ -520,8 +519,11 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) { } PGF_INTERNAL -PgfDB::~PgfDB() { +PgfDB::~PgfDB() +{ if (ms != NULL) { + unregister_process(); + size_t size = ms->top + chunksize(ptr(ms,ms->top)) + sizeof(size_t); @@ -546,13 +548,94 @@ PgfDB::~PgfDB() { if (fd >= 0) close(fd); -#ifndef _WIN32 - ipc_release_file_rwlock(filepath, rwlock); -#endif - ::free((void*) filepath); } +PGF_INTERNAL +void PgfDB::register_process(bool *is_first) +{ + process_entry *pentry = &ms->p; + object *plast = NULL; + + if (ms->p.pid != 0) { + while (pentry != (process_entry *) ptr(ms,0)) { +#ifndef _WIN32 + char proc_file[32]; + sprintf(proc_file, "/proc/%d", pentry->pid); + bool alive = (access(proc_file, F_OK) == 0); +#else + HANDLE hProcess = OpenProcess(PROCESS_QUERY_INFORMATION, + FALSE,pentry->pid); + DWORD dwExitCode = STILL_ACTIVE; + if (hProcess != NULL) + GetExitCodeProcess(hProcess,&dwExitCode); + bool alive = (dwExitCode == STILL_ACTIVE); + CloseHandle(hProcess); +#endif + if (!alive) { + // if there are dead processes -> remove them + if (plast == NULL) { + if (ms->p.next == 0) { + ms->p.pid = 0; + break; + } else { + object next = pentry->next; + *pentry = *((process_entry *) ptr(ms,next)); + free_internal(next); + } + } else { + *plast = pentry->next; + free_internal(ofs(ms,pentry)); + pentry = (process_entry *) ptr(ms, *plast); + } + } else { + plast = &pentry->next; + pentry = (process_entry *) ptr(ms, *plast); + } + } + } + + if (plast == NULL) { + *is_first = true; + } else { + *is_first = false; + *plast = malloc_internal(sizeof(process_entry)); + pentry = (process_entry*) ptr(ms,*plast); + pentry->next = 0; + } + pentry->pid = getpid(); +} + +PGF_INTERNAL +void PgfDB::unregister_process() +{ + pid_t pid = getpid(); + process_entry *pentry = &ms->p; + object *plast = NULL; + + while (pentry != (process_entry *) ptr(ms,0)) { + if (pentry->pid == pid) { + if (plast == NULL) { + if (ms->p.next == 0) { + ms->p.pid = 0; + } else { + object next = pentry->next; + *pentry = *((process_entry *) ptr(ms,next)); + free_internal(next); + } + } else { + *plast = pentry->next; + free_internal(ofs(ms,pentry)); + pentry = (process_entry *) ptr(ms, *plast); + } + break; + } else { + plast = &pentry->next; + pentry = (process_entry *) ptr(ms, *plast); + } + } +} + PGF_INTERNAL ref PgfDB::get_revision(PgfText *name) { @@ -600,9 +683,25 @@ void PgfDB::init_state(size_t size) ms->transient_revisions = 0; ms->transient_concr_revisions = 0; -#if _WIN32 - ms->lock = 0; +#ifndef _WIN32 + pthread_rwlockattr_t attr; + if (pthread_rwlockattr_init(&attr) != 0) { + throw pgf_systemerror(errno); + } + if (pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0) { + pthread_rwlockattr_destroy(&attr); + throw pgf_systemerror(errno); + } + if (pthread_rwlock_init(&ms->rwlock, &attr) != 0) { + pthread_rwlockattr_destroy(&attr); + throw pgf_systemerror(errno); + } + pthread_rwlockattr_destroy(&attr); +#else + ms->rwlock = 0; #endif + ms->p.pid = getpid(); + ms->p.next = 0; } /* Take a chunk off a bin list. */ @@ -1384,20 +1483,20 @@ void PgfDB::lock(DB_scope_mode m) { #ifndef _WIN32 int res = - (m == READER_SCOPE) ? pthread_rwlock_rdlock(rwlock) - : pthread_rwlock_wrlock(rwlock); + (m == READER_SCOPE) ? pthread_rwlock_rdlock(&ms->rwlock) + : pthread_rwlock_wrlock(&ms->rwlock); if (res != 0) throw pgf_systemerror(res); #else for (int i = 0; ; ++i) { - unsigned __int32 temp = ms->lock; + unsigned __int32 temp = ms->rwlock; if (m == READER_SCOPE && !Writer(temp)) { - if (InterlockedCompareExchange(&ms->lock, SetReaders(temp, ReaderCount(temp) + 1), temp) == temp) + if (InterlockedCompareExchange(&ms->rwlock, SetReaders(temp, ReaderCount(temp) + 1), temp) == temp) return; else continue; } else if (m == WRITER_SCOPE && AllClear(temp)) { - if (InterlockedCompareExchange(&ms->lock, SetWriter(temp, true), temp) == temp) + if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, true), temp) == temp) return; else continue; @@ -1408,7 +1507,7 @@ void PgfDB::lock(DB_scope_mode m) } //The pending write operation is taking too long, so we'll drop to the kernel and wait - if (InterlockedCompareExchange(&ms->lock, SetWaiting(temp, WaitingCount(temp) + 1), temp) != temp) + if (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) + 1), temp) != temp) continue; i = 0; //Reset the spincount for the next time @@ -1416,8 +1515,8 @@ void PgfDB::lock(DB_scope_mode m) do { - temp = ms->lock; - } while (InterlockedCompareExchange(&ms->lock, SetWaiting(temp, WaitingCount(temp) - 1), temp) != temp); + temp = ms->rwlock; + } while (InterlockedCompareExchange(&ms->rwlock, SetWaiting(temp, WaitingCount(temp) - 1), temp) != temp); } } #endif @@ -1426,10 +1525,10 @@ void PgfDB::lock(DB_scope_mode m) void PgfDB::unlock() { #ifndef _WIN32 - pthread_rwlock_unlock(rwlock); + pthread_rwlock_unlock(&ms->rwlock); #else while (true) { - unsigned __int32 temp = ms->lock; + unsigned __int32 temp = ms->rwlock; if (ReaderCount(temp) > 0) { if (ReaderCount(temp) == 1 && WaitingCount(temp) != 0) { //Note: this isn't nor has to be thread-safe, as the worst a duplicate notification can do @@ -1441,11 +1540,11 @@ void PgfDB::unlock() } //Decrement reader count - if (InterlockedCompareExchange(&ms->lock, SetReaders(temp, ReaderCount(temp) - 1), temp) == temp) + if (InterlockedCompareExchange(&ms->rwlock, SetReaders(temp, ReaderCount(temp) - 1), temp) == temp) break; } else { while(true) { - temp = ms->lock; + temp = ms->rwlock; assert(Writer(temp)); if (WaitingCount(temp) == 0) break; @@ -1456,7 +1555,7 @@ void PgfDB::unlock() } //Decrement writer count - if (InterlockedCompareExchange(&ms->lock, SetWriter(temp, false), temp) == temp) + if (InterlockedCompareExchange(&ms->rwlock, SetWriter(temp, false), temp) == temp) break; } } diff --git a/src/runtime/c/pgf/db.h b/src/runtime/c/pgf/db.h index 96184e668..9c92c6919 100644 --- a/src/runtime/c/pgf/db.h +++ b/src/runtime/c/pgf/db.h @@ -54,10 +54,6 @@ public: } }; -#ifndef _WIN32 -#include "ipc.h" -#endif - enum DB_scope_mode {READER_SCOPE, WRITER_SCOPE}; class PgfDB { @@ -66,9 +62,7 @@ private: const char *filepath; malloc_state* ms; -#ifndef _WIN32 - pthread_rwlock_t *rwlock; -#else +#ifdef _WIN32 HANDLE hMap; HANDLE hRWEvent; #endif @@ -114,6 +108,10 @@ private: PGF_INTERNAL_DECL object malloc_internal(size_t bytes); PGF_INTERNAL_DECL void free_internal(object o); + PGF_INTERNAL_DECL void register_process(bool *is_first); + PGF_INTERNAL_DECL void unregister_process(); + + void lock(DB_scope_mode m); void unlock(); diff --git a/src/runtime/c/pgf/ipc.cxx b/src/runtime/c/pgf/ipc.cxx deleted file mode 100644 index f9bbb2520..000000000 --- a/src/runtime/c/pgf/ipc.cxx +++ /dev/null @@ -1,358 +0,0 @@ -//#define DEBUG_IPC - -#ifndef _WIN32 -#ifdef DEBUG_IPC -#include -#include -#include -#include -#define PGF_INTERNAL static - -void ipc_error() { - perror(NULL); - exit(1); -} -void ipc_toomany() { - printf("Too many open grammars"); - exit(1); -} -#else -#include "pgf/data.h" -#define ipc_error() throw pgf_systemerror(errno); -#define ipc_toomany() throw pgf_error("Too many open grammars") -#endif - -#include -#include -#include -#include -#include -#include - -#define ptr_t(x) size_t -#define ptr(o,T) (o ? (T*) (((uint8_t*) locks) + o) : NULL) -#define offs(p) (((uint8_t*) p) - ((uint8_t*) locks)) - -typedef struct { - pid_t pid; - ptr_t(process_entry) next; -} process_entry; - -typedef struct { - dev_t dev; - ino_t ino; - process_entry p; - ptr_t(lock_entry) next; - pthread_rwlock_t rwlock; -} lock_entry; - -typedef struct { - pthread_mutex_t mutex; - ptr_t(lock_entry) lock_entries; - ptr_t(lock_entry) free_lock_entries; - ptr_t(process_entry) free_process_entries; - size_t top; -} file_locks; - -static char gf_runtime_locks[] = "/gf-runtime-locks"; - -static file_locks *locks = NULL; - -static -void ipc_cleanup_dead_processes() -{ - ptr_t(lock_entry) *last = &locks->lock_entries; - lock_entry *entry = ptr(*last, lock_entry); - while (entry != NULL) { - process_entry *pentry = &entry->p; - ptr_t(process_entry) *plast = NULL; - while (pentry != NULL) { - char proc_file[32]; - sprintf(proc_file, "/proc/%d", pentry->pid); - if (access(proc_file, F_OK) != 0) { - // if there are dead processes -> remove them - if (plast == NULL) { - if (entry->p.next == 0) { - *last = entry->next; - entry->next = locks->free_lock_entries; - entry->dev = 0; - entry->ino = 0; - entry->p.pid = 0; - locks->free_lock_entries = offs(entry); - goto next; - } else { - process_entry *tmp = - ptr(pentry->next, process_entry); - *pentry = *tmp; - tmp->pid = 0; - tmp->next = locks->free_process_entries; - locks->free_process_entries = offs(tmp); - } - } else { - *plast = pentry->next; - pentry->pid = 0; - pentry->next = locks->free_process_entries; - locks->free_process_entries = offs(pentry); - pentry = ptr(*plast,process_entry); - } - } else { - plast = &pentry->next; - pentry = ptr(*plast,process_entry); - } - } - - last = &entry->next; -next: - entry = ptr(*last, lock_entry); - } -} - -PGF_INTERNAL -pthread_rwlock_t *ipc_new_file_rwlock(const char* file_path, - bool *is_first) -{ - if (file_path == NULL) { - *is_first = true; - pthread_rwlock_t *rwlock = (pthread_rwlock_t *) - malloc(sizeof(pthread_rwlock_t)); - if (pthread_rwlock_init(rwlock, NULL) != 0) { - ipc_error(); - } - return rwlock; - } - - int pagesize = getpagesize(); - - if (locks == NULL) { - int created = 0; - - // Uncomment if you want a clean state - //shm_unlink(gf_runtime_locks); - - int fd = - shm_open(gf_runtime_locks, O_RDWR, 0); - if (errno == ENOENT) { - created = 1; - fd = shm_open(gf_runtime_locks, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); - } - if (fd < 0) { - ipc_error(); - } - - if (ftruncate(fd, pagesize) != 0) { - close(fd); - ipc_error(); - } - - locks = - (file_locks *) - mmap(NULL, pagesize, - PROT_READ|PROT_WRITE, - MAP_SHARED, - fd,0); - close(fd); - if (locks == MAP_FAILED) { - locks = NULL; - ipc_error(); - } - - if (created) { - pthread_mutexattr_t attr; - if (pthread_mutexattr_init(&attr)) { - ipc_error(); - } - if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) { - ipc_error(); - } - if (pthread_mutex_init(&locks->mutex, &attr)) { - ipc_error(); - } - pthread_mutexattr_destroy(&attr); - - locks->lock_entries = 0; - locks->free_lock_entries = 0; - locks->free_process_entries = 0; - locks->top = sizeof(file_locks); - } - } - - struct stat s; - if (stat(file_path, &s) != 0) { - ipc_error(); - } - - pthread_mutex_lock(&locks->mutex); - - ipc_cleanup_dead_processes(); - - lock_entry *entry = ptr(locks->lock_entries, lock_entry); - while (entry != NULL) { - if (entry->dev == s.st_dev && entry->ino == s.st_ino) { - break; - } - entry = ptr(entry->next, lock_entry); - } - - *is_first = false; - - if (entry == NULL) { - *is_first = true; - - if (locks->free_lock_entries) { - entry = ptr(locks->free_lock_entries, lock_entry); - locks->free_lock_entries = entry->next; - } else { - if (locks->top + sizeof(lock_entry) > pagesize) { - pthread_mutex_unlock(&locks->mutex); - ipc_toomany(); - } - entry = ptr(locks->top, lock_entry); - locks->top += sizeof(lock_entry); - - pthread_rwlockattr_t attr; - if (pthread_rwlockattr_init(&attr) != 0) { - pthread_mutex_unlock(&locks->mutex); - ipc_error(); - } - if (pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0) { - pthread_rwlockattr_destroy(&attr); - pthread_mutex_unlock(&locks->mutex); - ipc_error(); - } - if (pthread_rwlock_init(&entry->rwlock, &attr) != 0) { - pthread_rwlockattr_destroy(&attr); - pthread_mutex_unlock(&locks->mutex); - ipc_error(); - } - pthread_rwlockattr_destroy(&attr); - } - - entry->dev = s.st_dev; - entry->ino = s.st_ino; - entry->p.pid = getpid(); - entry->p.next= 0; - entry->next = locks->lock_entries; - locks->lock_entries = offs(entry); - } else { - process_entry *pentry; - if (locks->free_process_entries) { - pentry = ptr(locks->free_process_entries,process_entry); - locks->free_process_entries = pentry->next; - } else { - if (locks->top+sizeof(process_entry) > pagesize) { - pthread_mutex_unlock(&locks->mutex); - ipc_toomany(); - } - pentry = ptr(locks->top,process_entry); - locks->top += sizeof(process_entry); - } - - pentry->pid = getpid(); - pentry->next = entry->p.next; - entry->p.next = offs(pentry); - } - - pthread_mutex_unlock(&locks->mutex); - - return &entry->rwlock; -} - -PGF_INTERNAL -void ipc_release_file_rwlock(const char* file_path, - pthread_rwlock_t *rwlock) -{ - if (file_path == NULL) { - pthread_rwlock_destroy(rwlock); - free(rwlock); - return; - } - - if (locks == NULL) - return; - - pthread_mutex_lock(&locks->mutex); - - ipc_cleanup_dead_processes(); - - lock_entry *entry = ptr(locks->lock_entries,lock_entry); - ptr_t(lock_entry) *last = &locks->lock_entries; - while (entry != NULL) { - if (&entry->rwlock == rwlock) { - break; - } - last = &entry->next; - entry = ptr(*last,lock_entry); - } - - if (entry != NULL) { - pid_t pid = getpid(); - process_entry *pentry = &entry->p; - ptr_t(process_entry) *plast = NULL; - while (pentry != NULL) { - if (pentry->pid == pid) { - if (plast == NULL) { - if (entry->p.next == 0) { - *last = entry->next; - entry->next = locks->free_lock_entries; - entry->dev = 0; - entry->ino = 0; - entry->p.pid = 0; - locks->free_lock_entries = offs(entry); - } else { - process_entry *tmp = - ptr(pentry->next, process_entry); - *pentry = *tmp; - tmp->pid = 0; - tmp->next = locks->free_process_entries; - locks->free_process_entries = offs(tmp); - } - } else { - *plast = pentry->next; - pentry->pid = 0; - pentry->next = locks->free_process_entries; - locks->free_process_entries = offs(pentry); - } - - break; - } - - plast = &pentry->next; - pentry = ptr(*plast,process_entry); - } - } - - pthread_mutex_unlock(&locks->mutex); -} - -#ifdef DEBUG_IPC -int main(int argc, char *argv[]) -{ - if (argc < 3 || - (strcmp(argv[1], "r") != 0 && strcmp(argv[1], "w") != 0)) { - printf("syntax: %s (r|w) \n", argv[0]); - return 1; - } - - bool is_first; - ipc_rwlock_t *rwlock = ipc_new_file_rwlock(argv[2], &is_first); - - if (strcmp(argv[1],"r") == 0) { - ipc_rwlock_rdlock(rwlock); - } else if (strcmp(argv[1],"w") == 0) { - ipc_rwlock_wrlock(rwlock); - } - - fputs("> ", stdout); - fflush(stdout); - - char buf[16]; - read(0, buf, sizeof(buf)); - - ipc_rwlock_unlock(rwlock); - - ipc_release_file_rwlock(argv[2], rwlock); - - return 0; -} -#endif -#endif diff --git a/src/runtime/c/pgf/ipc.h b/src/runtime/c/pgf/ipc.h deleted file mode 100644 index 9d78a5b84..000000000 --- a/src/runtime/c/pgf/ipc.h +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef IPC_H -#define IPC_H - -#ifndef _WIN32 -PGF_INTERNAL_DECL -pthread_rwlock_t *ipc_new_file_rwlock(const char* file_path, - bool *is_first); - -PGF_INTERNAL_DECL -void ipc_release_file_rwlock(const char* file_path, - pthread_rwlock_t *rwlock); -#endif -#endif