forked from GitHub/gf-core
now we use inter-process locking
This commit is contained in:
@@ -7,6 +7,7 @@
|
||||
#include <pthread.h>
|
||||
|
||||
#include "data.h"
|
||||
#include "ipc.h"
|
||||
|
||||
PGF_INTERNAL __thread unsigned char* current_base __attribute__((tls_model("initial-exec"))) = NULL;
|
||||
PGF_INTERNAL __thread PgfDB* current_db __attribute__((tls_model("initial-exec"))) = NULL;
|
||||
@@ -344,12 +345,12 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode) {
|
||||
throw pgf_systemerror(code, filepath);
|
||||
}
|
||||
|
||||
int res = pthread_rwlock_init(&rwlock, NULL);
|
||||
if (res != 0) {
|
||||
try {
|
||||
rwlock = ipc_new_file_rwlock(this->filepath);
|
||||
} catch (pgf_systemerror e) {
|
||||
::free((void *) this->filepath);
|
||||
int code = errno;
|
||||
close(fd);
|
||||
throw pgf_systemerror(code);
|
||||
throw e;
|
||||
}
|
||||
|
||||
if (is_new) {
|
||||
@@ -389,7 +390,7 @@ PgfDB::~PgfDB() {
|
||||
if (fd >= 0)
|
||||
close(fd);
|
||||
|
||||
pthread_rwlock_destroy(&rwlock);
|
||||
ipc_release_file_rwlock(filepath, rwlock);
|
||||
|
||||
::free((void*) filepath);
|
||||
}
|
||||
@@ -1108,8 +1109,8 @@ void PgfDB::sync()
|
||||
DB_scope::DB_scope(PgfDB *db, DB_scope_mode m)
|
||||
{
|
||||
int res =
|
||||
(m == READER_SCOPE) ? pthread_rwlock_rdlock(&db->rwlock)
|
||||
: pthread_rwlock_wrlock(&db->rwlock);
|
||||
(m == READER_SCOPE) ? pthread_rwlock_rdlock(db->rwlock)
|
||||
: pthread_rwlock_wrlock(db->rwlock);
|
||||
if (res != 0)
|
||||
throw pgf_systemerror(res);
|
||||
|
||||
@@ -1125,9 +1126,7 @@ DB_scope::~DB_scope()
|
||||
{
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wterminate"
|
||||
int res = pthread_rwlock_unlock(¤t_db->rwlock);
|
||||
if (res != 0)
|
||||
throw pgf_systemerror(res);
|
||||
pthread_rwlock_unlock(current_db->rwlock);
|
||||
|
||||
current_db = save_db;
|
||||
current_base = current_db ? (unsigned char*) current_db->ms
|
||||
|
||||
@@ -60,7 +60,7 @@ private:
|
||||
const char *filepath;
|
||||
malloc_state* ms;
|
||||
|
||||
pthread_rwlock_t rwlock;
|
||||
pthread_rwlock_t *rwlock;
|
||||
|
||||
friend class PgfReader;
|
||||
|
||||
|
||||
@@ -59,6 +59,15 @@ static file_locks *locks = NULL;
|
||||
PGF_INTERNAL
|
||||
pthread_rwlock_t *ipc_new_file_rwlock(const char* file_path)
|
||||
{
|
||||
if (file_path == NULL) {
|
||||
pthread_rwlock_t *rwlock = (pthread_rwlock_t *)
|
||||
malloc(sizeof(pthread_rwlock_t));
|
||||
if (pthread_rwlock_init(rwlock, NULL) != 0) {
|
||||
ipc_error();
|
||||
}
|
||||
return rwlock;
|
||||
}
|
||||
|
||||
int pagesize = getpagesize();
|
||||
|
||||
if (locks == NULL) {
|
||||
@@ -161,21 +170,54 @@ pthread_rwlock_t *ipc_new_file_rwlock(const char* file_path)
|
||||
entry->next = locks->lock_entries;
|
||||
locks->lock_entries = offs(entry);
|
||||
} else {
|
||||
process_entry *pentry;
|
||||
if (locks->free_process_entries) {
|
||||
pentry = ptr(locks->free_process_entries,process_entry);
|
||||
locks->free_process_entries = pentry->next;
|
||||
} else {
|
||||
if (locks->top+sizeof(process_entry) > pagesize) {
|
||||
pthread_mutex_unlock(&locks->mutex);
|
||||
ipc_toomany();
|
||||
process_entry *pentry = &entry->p;
|
||||
ptr_t(process_entry) *plast = NULL;
|
||||
while (pentry != NULL) {
|
||||
char proc_file[32];
|
||||
sprintf(proc_file, "/proc/%d", pentry->pid);
|
||||
if (access(proc_file, F_OK) != 0) {
|
||||
// if there are dead processes -> remove them
|
||||
if (plast == NULL) {
|
||||
if (entry->p.next == 0) {
|
||||
entry->p.pid = 0;
|
||||
break;
|
||||
} else {
|
||||
process_entry *tmp =
|
||||
ptr(pentry->next, process_entry);
|
||||
*pentry = *tmp;
|
||||
tmp->pid = 0;
|
||||
tmp->next = locks->free_process_entries;
|
||||
locks->free_process_entries = offs(tmp);
|
||||
}
|
||||
} else {
|
||||
*plast = pentry->next;
|
||||
pentry->pid = 0;
|
||||
pentry->next = locks->free_process_entries;
|
||||
locks->free_process_entries = offs(pentry);
|
||||
pentry = ptr(*plast,process_entry);
|
||||
}
|
||||
} else {
|
||||
plast = &pentry->next;
|
||||
pentry = ptr(*plast,process_entry);
|
||||
}
|
||||
pentry = ptr(locks->top,process_entry);
|
||||
locks->top += sizeof(process_entry);
|
||||
}
|
||||
pentry->next = entry->p.next;
|
||||
pentry->pid = getpid();
|
||||
entry->p.next = offs(pentry);
|
||||
|
||||
if (plast != NULL) {
|
||||
if (locks->free_process_entries) {
|
||||
pentry = ptr(locks->free_process_entries,process_entry);
|
||||
locks->free_process_entries = pentry->next;
|
||||
} else {
|
||||
if (locks->top+sizeof(process_entry) > pagesize) {
|
||||
pthread_mutex_unlock(&locks->mutex);
|
||||
ipc_toomany();
|
||||
}
|
||||
pentry = ptr(locks->top,process_entry);
|
||||
locks->top += sizeof(process_entry);
|
||||
}
|
||||
*plast = offs(pentry);
|
||||
}
|
||||
pentry->pid = getpid();
|
||||
pentry->next = 0;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&locks->mutex);
|
||||
@@ -184,8 +226,15 @@ pthread_rwlock_t *ipc_new_file_rwlock(const char* file_path)
|
||||
}
|
||||
|
||||
PGF_INTERNAL
|
||||
void ipc_release_file_rwlock(const char* file_path)
|
||||
void ipc_release_file_rwlock(const char* file_path,
|
||||
pthread_rwlock_t *rwlock)
|
||||
{
|
||||
if (file_path == NULL) {
|
||||
pthread_rwlock_destroy(rwlock);
|
||||
free(rwlock);
|
||||
return;
|
||||
}
|
||||
|
||||
if (locks == NULL)
|
||||
return;
|
||||
|
||||
@@ -228,6 +277,7 @@ free_it:
|
||||
process_entry *tmp =
|
||||
ptr(pentry->next, process_entry);
|
||||
*pentry = *tmp;
|
||||
tmp->pid = 0;
|
||||
tmp->next = locks->free_process_entries;
|
||||
locks->free_process_entries = offs(tmp);
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ PGF_INTERNAL_DECL
|
||||
pthread_rwlock_t *ipc_new_file_rwlock(const char* file_path);
|
||||
|
||||
PGF_INTERNAL
|
||||
void ipc_release_file_rwlock(const char* file_path);
|
||||
void ipc_release_file_rwlock(const char* file_path,
|
||||
pthread_rwlock_t *rwlock);
|
||||
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user