From 186b151a905a00d2973db9a72c436c71908c83eb Mon Sep 17 00:00:00 2001 From: krangelov Date: Mon, 25 Oct 2021 15:51:06 +0200 Subject: [PATCH] rewrite ipc.cxx to support dynamic allocation of rwlocks --- src/runtime/c/pgf/ipc.cxx | 302 ++++++++++++++++++++++++++++++-------- 1 file changed, 238 insertions(+), 64 deletions(-) diff --git a/src/runtime/c/pgf/ipc.cxx b/src/runtime/c/pgf/ipc.cxx index c56d4f489..285090bb1 100644 --- a/src/runtime/c/pgf/ipc.cxx +++ b/src/runtime/c/pgf/ipc.cxx @@ -5,111 +5,285 @@ #include #include +//#define DEBUG_IPC + +#ifdef DEBUG_IPC +#include +#include +#include +#include +#define PGF_INTERNAL static +void ipc_error() { + perror(NULL); + exit(1); +} +void ipc_toomany() { + printf("Too many open grammars"); + exit(1); +} +#else #include "pgf/data.h" +#define ipc_error() throw pgf_systemerror(errno); +#define ipc_toomany() throw pgf_error("Too many open grammars") +#endif + +#define ptr_t(x) size_t +#define ptr(o,T) (o ? (T*) (((uint8_t*) locks) + o) : NULL) +#define offs(p) (((uint8_t*) p) - ((uint8_t*) locks)) + +typedef struct { + pid_t pid; + ptr_t(process_entry) next; +} process_entry; typedef struct { dev_t dev; ino_t ino; + process_entry p; + ptr_t(lock_entry) next; pthread_rwlock_t rwlock; -} file_locks_entry; +} lock_entry; typedef struct { pthread_mutex_t mutex; - file_locks_entry entries[]; + ptr_t(lock_entry) lock_entries; + ptr_t(lock_entry) free_lock_entries; + ptr_t(process_entry) free_process_entries; + size_t top; } file_locks; static char gf_runtime_locks[] = "/gf-runtime-locks"; +static file_locks *locks = NULL; + PGF_INTERNAL -pthread_rwlock_t *pgf_acquire_file_rwlock(const char* file_path) +pthread_rwlock_t *ipc_new_file_rwlock(const char* file_path) { - int created = 0; - - int fd = - shm_open(gf_runtime_locks, O_RDWR, 0); - if (errno == ENOENT) { - created = 1; - fd = shm_open(gf_runtime_locks, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); - } - if (fd < 0) { - throw pgf_systemerror(errno); - } - int pagesize = getpagesize(); - int n_entries = (pagesize - sizeof(file_locks)) - / sizeof(file_locks_entry); - if (ftruncate(fd, pagesize) != 0) { - throw pgf_systemerror(errno); - } + if (locks == NULL) { + int created = 0; - file_locks *locks = - (file_locks *) - mmap(NULL, pagesize, - PROT_READ|PROT_WRITE, - MAP_SHARED, - fd,0); - if (locks == MAP_FAILED) { - throw pgf_systemerror(errno); - } + // Uncomment if you want a clean state + //shm_unlink(gf_runtime_locks); - if (created) { - pthread_mutexattr_t attr; - if (pthread_mutexattr_init(&attr)) { - throw pgf_systemerror(errno); + int fd = + shm_open(gf_runtime_locks, O_RDWR, 0); + if (errno == ENOENT) { + created = 1; + fd = shm_open(gf_runtime_locks, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); } - if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) { - throw pgf_systemerror(errno); - } - if (pthread_mutex_init(&locks->mutex, &attr)) { - throw pgf_systemerror(errno); + if (fd < 0) { + ipc_error(); } - for (int i = 0; i < n_entries; i++) { - locks->entries[i].dev = 0; - locks->entries[i].ino = 0; + if (ftruncate(fd, pagesize) != 0) { + close(fd); + ipc_error(); + } + + locks = + (file_locks *) + mmap(NULL, pagesize, + PROT_READ|PROT_WRITE, + MAP_SHARED, + fd,0); + close(fd); + if (locks == MAP_FAILED) { + locks = NULL; + ipc_error(); + } + + if (created) { + pthread_mutexattr_t attr; + if (pthread_mutexattr_init(&attr)) { + ipc_error(); + } + if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) { + ipc_error(); + } + if (pthread_mutex_init(&locks->mutex, &attr)) { + ipc_error(); + } + pthread_mutexattr_destroy(&attr); + + locks->lock_entries = 0; + locks->free_lock_entries = 0; + locks->free_process_entries = 0; + locks->top = sizeof(file_locks); } } struct stat s; if (stat(file_path, &s) != 0) { - throw pgf_systemerror(errno); + ipc_error(); } pthread_mutex_lock(&locks->mutex); - file_locks_entry *entry = NULL; - for (int i = 0; i < n_entries; i++) { - if ((locks->entries[i].dev == s.st_dev && - locks->entries[i].ino == s.st_ino) || - (locks->entries[i].dev == 0 && - locks->entries[i].ino == 0 && - entry == NULL)) { - entry = &locks->entries[i]; + lock_entry *entry = ptr(locks->lock_entries, lock_entry); + while (entry != NULL) { + if (entry->dev == s.st_dev && entry->ino == s.st_ino) { + break; } + entry = ptr(entry->next, lock_entry); } if (entry == NULL) { - throw pgf_error("Too many files"); - } + if (locks->free_lock_entries) { + entry = ptr(locks->free_lock_entries, lock_entry); + locks->free_lock_entries = entry->next; + } else { + if (locks->top + sizeof(lock_entry) > pagesize) { + pthread_mutex_unlock(&locks->mutex); + ipc_toomany(); + } + entry = ptr(locks->top, lock_entry); + locks->top += sizeof(lock_entry); - if (entry->dev == 0 && entry->ino == 0) { - entry->dev = s.st_dev; - entry->ino = s.st_ino; + pthread_rwlockattr_t attr; + if (pthread_rwlockattr_init(&attr) != 0) { + ipc_error(); + } + if (pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0) { + ipc_error(); + } + if (pthread_rwlock_init(&entry->rwlock, &attr) != 0) { + ipc_error(); + } + pthread_rwlockattr_destroy(&attr); + } - pthread_rwlockattr_t attr; - if (pthread_rwlockattr_init(&attr) != 0) { - throw pgf_systemerror(errno); - } - if (pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0) { - throw pgf_systemerror(errno); - } - if (pthread_rwlock_init(&entry->rwlock, &attr) != 0) { - throw pgf_systemerror(errno); + entry->dev = s.st_dev; + entry->ino = s.st_ino; + entry->p.pid = getpid(); + entry->p.next= 0; + entry->next = locks->lock_entries; + locks->lock_entries = offs(entry); + } else { + process_entry *pentry; + if (locks->free_process_entries) { + pentry = ptr(locks->free_process_entries,process_entry); + locks->free_process_entries = pentry->next; + } else { + if (locks->top+sizeof(process_entry) > pagesize) { + pthread_mutex_unlock(&locks->mutex); + ipc_toomany(); + } + pentry = ptr(locks->top,process_entry); + locks->top += sizeof(process_entry); } + pentry->next = entry->p.next; + pentry->pid = getpid(); + entry->p.next = offs(pentry); } pthread_mutex_unlock(&locks->mutex); return &entry->rwlock; } + +PGF_INTERNAL +void ipc_release_file_rwlock(const char* file_path) +{ + if (locks == NULL) + return; + + struct stat s; + if (stat(file_path, &s) != 0) { + ipc_error(); + } + + pthread_mutex_lock(&locks->mutex); + + lock_entry *entry = ptr(locks->lock_entries,lock_entry); + ptr_t(lock_entry) *last = &locks->lock_entries; + while (entry != NULL) { + if (entry->dev == s.st_dev && entry->ino == s.st_ino) { + break; + } + entry = ptr(entry->next,lock_entry); + last = &entry->next; + } + + if (entry != NULL) { + pid_t pid = getpid(); + process_entry *pentry = &entry->p; + ptr_t(process_entry) *plast = NULL; + while (pentry != NULL) { + if (pentry->pid == pid) { + pid = -1; // only the first entry should be removed + +free_it: + if (plast == NULL) { + if (entry->p.next == 0) { + *last = entry->next; + entry->next = locks->free_lock_entries; + entry->dev = 0; + entry->ino = 0; + entry->p.pid = 0; + locks->free_lock_entries = offs(entry); + break; + } else { + process_entry *tmp = + ptr(pentry->next, process_entry); + *pentry = *tmp; + tmp->next = locks->free_process_entries; + locks->free_process_entries = offs(tmp); + } + } else { + *plast = pentry->next; + pentry->pid = 0; + pentry->next = locks->free_process_entries; + locks->free_process_entries = offs(pentry); + pentry = ptr(*plast,process_entry); + } + } else { + char proc_file[32]; + sprintf(proc_file, "/proc/%d", pentry->pid); + if (access(proc_file, F_OK) != 0) { + // if there are dead processes -> remove them too + goto free_it; + } else { + plast = &pentry->next; + pentry = ptr(*plast,process_entry); + } + } + } + } + + pthread_mutex_unlock(&locks->mutex); +} + +#ifdef DEBUG_IPC +int main(int argc, char *argv[]) +{ + if (argc < 3 || + (strcmp(argv[1], "r") != 0 && strcmp(argv[1], "w") != 0)) { + printf("syntax: %s (r|w) \n", argv[0]); + return 1; + } + + printf("%ld\n", sizeof(lock_entry)); + + pthread_rwlock_t *rwlock = ipc_new_file_rwlock(argv[2]); + + if (strcmp(argv[1],"r") == 0) { + pthread_rwlock_rdlock(rwlock); + } else if (strcmp(argv[1],"w") == 0) { + pthread_rwlock_wrlock(rwlock); + } + + fputs("> ", stdout); + fflush(stdout); + + char buf[16]; + read(0, buf, sizeof(buf)); + + pthread_rwlock_unlock(rwlock); + + ipc_release_file_rwlock(argv[2]); + + return 0; +} +#endif