robustness by reinitializing the locks if dead processes are found

This commit is contained in:
Krasimir Angelov
2023-02-22 17:41:50 +01:00
parent 476075246d
commit 35e47b9fac
3 changed files with 124 additions and 48 deletions

View File

@@ -116,9 +116,11 @@ struct PGF_INTERNAL_DECL malloc_state
pthread_mutex_t rev_mutex;
pthread_mutex_t write_mutex;
pthread_rwlock_t rwlock;
pid_t writer_pid;
#else
/* Stores a Reader/Writer lock for Windows */
LONG rwlock;
DWORD writer_pid;
#endif
txn_t curr_txn_id;
@@ -415,7 +417,7 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode, size_t init_size) {
throw pgf_error("Invalid file content");
}
cleanup_revisions();
cleanup_state();
}
top = ms->top;
@@ -607,7 +609,7 @@ void PgfDB::unregister_revision(object revision)
#endif
}
void PgfDB::cleanup_revisions()
void PgfDB::cleanup_state()
{
#ifndef _WIN32
pthread_mutex_lock(&ms->rev_mutex);
@@ -619,7 +621,9 @@ void PgfDB::cleanup_revisions()
fprintf(stderr, "revisions");
#endif
ms->min_txn_id = SIZE_MAX;
size_t n_revisions = 0;
txn_t min_txn_id = SIZE_MAX;
bool dead_writer = false;
// If there are dead processes, set their reference counts to 0.
for (size_t i = 0; i < ms->n_revisions; i++) {
revision_entry *entry = &ms->revisions[i];
@@ -646,18 +650,36 @@ void PgfDB::cleanup_revisions()
entry->o & ~MALLOC_ALIGN_MASK,
entry->ref_count);
#endif
if (ms->min_txn_id > entry->txn_id)
ms->min_txn_id = entry->txn_id;
if (min_txn_id > entry->txn_id)
min_txn_id = entry->txn_id;
n_revisions = (i+1);
} else {
entry->ref_count = 0;
if (entry->pid == ms->writer_pid) {
dead_writer = true;
}
}
}
}
ms->n_revisions = n_revisions;
ms->min_txn_id = min_txn_id;
#ifdef DEBUG_MEMORY_ALLOCATOR
fprintf(stderr, " minimal %ld\n", ms->min_txn_id);
#endif
if (dead_writer) {
// The writer has died, so clean up the writer mutex
init_write_mutex();
}
if (ms->n_revisions == 0) {
// This is the only process that has access to the database,
// so it is time to clean up the rwlock
init_rwlock_mutex();
}
#ifndef _WIN32
pthread_mutex_unlock(&ms->rev_mutex);
#else
@@ -671,6 +693,84 @@ object PgfDB::get_active_revision()
return current_db->ms->active_revision;
}
// (Re)initializes ms->rev_mutex.
//
// On POSIX builds the mutex is created with a freshly built attribute
// object; the attribute is marked PTHREAD_PROCESS_SHARED only when
// fd >= 0 (presumably the file-backed case -- confirm against the
// constructor). The attribute object is destroyed on every path once it
// has been initialized.
//
// Returns 0 on success, otherwise the error code reported by the first
// failing pthread call. On Windows builds nothing is done and 0 is
// returned.
PGF_INTERNAL
int PgfDB::init_rev_mutex()
{
#ifndef _WIN32
    pthread_mutexattr_t mutex_attr;

    int status = pthread_mutexattr_init(&mutex_attr);
    if (status != 0)
        return status;   // nothing to destroy yet

    if (fd >= 0)
        status = pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);

    // Only attempt the actual initialization if the attribute setup succeeded.
    if (status == 0)
        status = pthread_mutex_init(&ms->rev_mutex, &mutex_attr);

    // Single cleanup point for the attribute object.
    pthread_mutexattr_destroy(&mutex_attr);
    return status;
#else
    return 0;
#endif
}
// (Re)initializes ms->write_mutex and clears ms->writer_pid.
//
// On POSIX builds the mutex is created with a freshly built attribute
// object; the attribute is marked PTHREAD_PROCESS_SHARED only when
// fd >= 0 (presumably the file-backed case -- confirm against the
// constructor). The attribute object is destroyed on every path once it
// has been initialized.
//
// Returns 0 on success, otherwise the error code reported by the first
// failing pthread call; in the failure case ms->writer_pid is left
// untouched. On Windows builds only ms->writer_pid is reset.
PGF_INTERNAL
int PgfDB::init_write_mutex()
{
#ifndef _WIN32
    pthread_mutexattr_t mutex_attr;

    int status = pthread_mutexattr_init(&mutex_attr);
    if (status != 0)
        return status;   // nothing to destroy yet

    if (fd >= 0)
        status = pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);

    // Only attempt the actual initialization if the attribute setup succeeded.
    if (status == 0)
        status = pthread_mutex_init(&ms->write_mutex, &mutex_attr);

    // Single cleanup point for the attribute object.
    pthread_mutexattr_destroy(&mutex_attr);

    if (status != 0)
        return status;
#endif

    // A freshly initialized write mutex has no owner recorded.
    ms->writer_pid = 0;
    return 0;
}
// (Re)initializes ms->rwlock.
//
// On POSIX builds the lock is a pthread_rwlock_t created with a freshly
// built attribute object; the attribute is marked PTHREAD_PROCESS_SHARED
// only when fd >= 0 (presumably the file-backed case -- confirm against
// the constructor). The attribute object is destroyed on every path once
// it has been initialized. On Windows builds the lock is stored as an
// integer in ms->rwlock, which is simply reset to 0.
//
// Returns 0 on success, otherwise the error code reported by the first
// failing pthread call.
PGF_INTERNAL
int PgfDB::init_rwlock_mutex()
{
#ifndef _WIN32
    pthread_rwlockattr_t lock_attr;

    int status = pthread_rwlockattr_init(&lock_attr);
    if (status != 0)
        return status;   // nothing to destroy yet

    if (fd >= 0)
        status = pthread_rwlockattr_setpshared(&lock_attr, PTHREAD_PROCESS_SHARED);

    // Only attempt the actual initialization if the attribute setup succeeded.
    if (status == 0)
        status = pthread_rwlock_init(&ms->rwlock, &lock_attr);

    // Single cleanup point for the attribute object.
    pthread_rwlockattr_destroy(&lock_attr);
    return status;
#else
    ms->rwlock = 0;
    return 0;
#endif
}
PGF_INTERNAL
int PgfDB::init_state()
{
@@ -679,47 +779,16 @@ int PgfDB::init_state()
ms->top = request2size(1); // we don't want to start from 0
ms->file_size = mmap_size;
#ifdef _WIN32
ms->rwlock = 0;
#else
int res;
{
pthread_mutexattr_t attr;
if ((res = pthread_mutexattr_init(&attr)) != 0) {
return res;
}
if ((res = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) != 0) {
return res;
}
if ((res = pthread_mutex_init(&ms->rev_mutex, &attr)) != 0) {
pthread_mutexattr_destroy(&attr);
return res;
}
if ((res = pthread_mutex_init(&ms->write_mutex, &attr)) != 0) {
pthread_mutexattr_destroy(&attr);
return res;
}
pthread_mutexattr_destroy(&attr);
}
if ((res = init_rev_mutex()) != 0)
return res;
{
pthread_rwlockattr_t attr;
if ((res = pthread_rwlockattr_init(&attr)) != 0) {
return res;
}
if (fd >= 0 &&
(res = pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) != 0) {
pthread_rwlockattr_destroy(&attr);
return res;
}
if ((res = pthread_rwlock_init(&ms->rwlock, &attr)) != 0) {
pthread_rwlockattr_destroy(&attr);
return res;
}
pthread_rwlockattr_destroy(&attr);
}
#endif
if ((res = init_write_mutex()) != 0)
return res;
if ((res = init_rwlock_mutex()) != 0)
return res;
ms->curr_txn_id = 1;
ms->min_txn_id = 1;
@@ -1450,6 +1519,8 @@ void PgfDB::start_transaction()
WaitForSingleObject(hWriteMutex, INFINITE);
#endif
ms->writer_pid = getpid();
top = ms->top;
free_blocks = ms->free_blocks;
free_descriptors[0] = ms->free_descriptors;
@@ -1541,6 +1612,8 @@ void PgfDB::commit(object o)
}
#endif
ms->writer_pid = 0;
pthread_mutex_unlock(&ms->write_mutex);
#else
if (free_descriptors[2] != 0) {
@@ -1575,6 +1648,8 @@ void PgfDB::commit(object o)
ms->curr_txn_id++;
}
ms->writer_pid = 0;
ReleaseMutex(hWriteMutex);
#endif
@@ -1598,6 +1673,8 @@ void PgfDB::rollback(object o)
last_free_block_size = 0;
last_free_block_txn_id = 0;
ms->writer_pid = 0;
#ifndef _WIN32
if (
#ifndef MREMAP_MAYMOVE

View File

@@ -123,8 +123,6 @@ public:
current_db->free_internal(o.as_object(), sizeof(A)+extra_bytes);
}
PGF_INTERNAL_DECL void cleanup_revisions();
PGF_INTERNAL_DECL object get_active_revision();
PGF_INTERNAL_DECL object register_revision(object o, txn_t txn_id);
PGF_INTERNAL_DECL void unregister_revision(object o);
@@ -145,7 +143,11 @@ public:
}
private:
PGF_INTERNAL_DECL int init_rev_mutex();
PGF_INTERNAL_DECL int init_write_mutex();
PGF_INTERNAL_DECL int init_rwlock_mutex();
PGF_INTERNAL_DECL int init_state();
PGF_INTERNAL_DECL void cleanup_state();
PGF_INTERNAL_DECL size_t block_descr_size(object map);
PGF_INTERNAL_DECL object new_block_descr(object o, size_t size, txn_t txn_id);

View File

@@ -153,9 +153,6 @@ PgfDB *pgf_read_ngf(const char *fpath,
{
DB_scope scope(db, WRITER_SCOPE);
db->cleanup_revisions();
ref<PgfPGF> pgf = db->get_active_revision();
*revision = db->register_revision(pgf.tagged(), PgfDB::get_txn_id());
}