diff --git a/src/runtime/c/pgf/db.cxx b/src/runtime/c/pgf/db.cxx index dd3cda30a..f5685d9ce 100644 --- a/src/runtime/c/pgf/db.cxx +++ b/src/runtime/c/pgf/db.cxx @@ -116,9 +116,11 @@ struct PGF_INTERNAL_DECL malloc_state pthread_mutex_t rev_mutex; pthread_mutex_t write_mutex; pthread_rwlock_t rwlock; + pid_t writer_pid; #else /* Stores a Reader/Writer lock for Windows */ LONG rwlock; + DWORD writer_pid; #endif txn_t curr_txn_id; @@ -415,7 +417,7 @@ PgfDB::PgfDB(const char* filepath, int flags, int mode, size_t init_size) { throw pgf_error("Invalid file content"); } - cleanup_revisions(); + cleanup_state(); } top = ms->top; @@ -607,7 +609,7 @@ void PgfDB::unregister_revision(object revision) #endif } -void PgfDB::cleanup_revisions() +void PgfDB::cleanup_state() { #ifndef _WIN32 pthread_mutex_lock(&ms->rev_mutex); @@ -619,7 +621,9 @@ void PgfDB::cleanup_revisions() fprintf(stderr, "revisions"); #endif - ms->min_txn_id = SIZE_MAX; + size_t n_revisions = 0; + txn_t min_txn_id = SIZE_MAX; + bool dead_writer = false; // If there are dead processes, set their reference counts to 0. for (size_t i = 0; i < ms->n_revisions; i++) { revision_entry *entry = &ms->revisions[i]; @@ -646,18 +650,36 @@ void PgfDB::cleanup_revisions() entry->o & ~MALLOC_ALIGN_MASK, entry->ref_count); #endif - if (ms->min_txn_id > entry->txn_id) - ms->min_txn_id = entry->txn_id; + if (min_txn_id > entry->txn_id) + min_txn_id = entry->txn_id; + n_revisions = (i+1); } else { entry->ref_count = 0; + if (entry->pid == ms->writer_pid) { + dead_writer = true; + } } } } + ms->n_revisions = n_revisions; + ms->min_txn_id = min_txn_id; + #ifdef DEBUG_MEMORY_ALLOCATOR fprintf(stderr, " minimal %ld\n", ms->min_txn_id); #endif + if (dead_writer) { + // The writer has died, cleanup the writer mutex + init_write_mutex(); + } + + if (ms->n_revisions == 0) { + // This is the only process that has access to the database, + // time to cleanup the rwlock + init_rwlock_mutex(); + } + #ifndef _WIN32 pthread_mutex_unlock(&ms->rev_mutex); #else @@ -671,6 +693,84 @@ object PgfDB::get_active_revision() return current_db->ms->active_revision; } +PGF_INTERNAL +int PgfDB::init_rev_mutex() +{ +#ifndef _WIN32 + int res; + + pthread_mutexattr_t attr; + if ((res = pthread_mutexattr_init(&attr)) != 0) { + return res; + } + if (fd >= 0 && + (res = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) != 0) { + pthread_mutexattr_destroy(&attr); + return res; + } + if ((res = pthread_mutex_init(&ms->rev_mutex, &attr)) != 0) { + pthread_mutexattr_destroy(&attr); + return res; + } + pthread_mutexattr_destroy(&attr); +#endif + + return 0; +} + +PGF_INTERNAL +int PgfDB::init_write_mutex() +{ +#ifndef _WIN32 + int res; + + pthread_mutexattr_t attr; + if ((res = pthread_mutexattr_init(&attr)) != 0) { + return res; + } + if (fd >= 0 && + (res = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) != 0) { + pthread_mutexattr_destroy(&attr); + return res; + } + if ((res = pthread_mutex_init(&ms->write_mutex, &attr)) != 0) { + pthread_mutexattr_destroy(&attr); + return res; + } + pthread_mutexattr_destroy(&attr); +#endif + + ms->writer_pid = 0; + + return 0; +} + +PGF_INTERNAL +int PgfDB::init_rwlock_mutex() +{ +#ifdef _WIN32 + ms->rwlock = 0; +#else + int res; + + pthread_rwlockattr_t attr; + if ((res = pthread_rwlockattr_init(&attr)) != 0) { + return res; + } + if (fd >= 0 && + (res = pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) != 0) { + pthread_rwlockattr_destroy(&attr); + return res; + } + if ((res = pthread_rwlock_init(&ms->rwlock, &attr)) != 0) { + pthread_rwlockattr_destroy(&attr); + return res; + } + pthread_rwlockattr_destroy(&attr); +#endif + return 0; +} + PGF_INTERNAL int PgfDB::init_state() { @@ -679,47 +779,16 @@ int PgfDB::init_state() ms->top = request2size(1); // we don't want to start from 0 ms->file_size = mmap_size; -#ifdef _WIN32 - ms->rwlock = 0; -#else int res; - { - pthread_mutexattr_t attr; - if ((res = pthread_mutexattr_init(&attr)) != 0) { - return res; - } - if ((res = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) != 0) { - return res; - } - if ((res = pthread_mutex_init(&ms->rev_mutex, &attr)) != 0) { - pthread_mutexattr_destroy(&attr); - return res; - } - if ((res = pthread_mutex_init(&ms->write_mutex, &attr)) != 0) { - pthread_mutexattr_destroy(&attr); - return res; - } - pthread_mutexattr_destroy(&attr); - } + if ((res = init_rev_mutex()) != 0) + return res; - { - pthread_rwlockattr_t attr; - if ((res = pthread_rwlockattr_init(&attr)) != 0) { - return res; - } - if (fd >= 0 && - (res = pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) != 0) { - pthread_rwlockattr_destroy(&attr); - return res; - } - if ((res = pthread_rwlock_init(&ms->rwlock, &attr)) != 0) { - pthread_rwlockattr_destroy(&attr); - return res; - } - pthread_rwlockattr_destroy(&attr); - } -#endif + if ((res = init_write_mutex()) != 0) + return res; + + if ((res = init_rwlock_mutex()) != 0) + return res; ms->curr_txn_id = 1; ms->min_txn_id = 1; @@ -1450,6 +1519,8 @@ void PgfDB::start_transaction() WaitForSingleObject(hWriteMutex, INFINITE); #endif + ms->writer_pid = getpid(); + top = ms->top; free_blocks = ms->free_blocks; free_descriptors[0] = ms->free_descriptors; @@ -1541,6 +1612,8 @@ void PgfDB::commit(object o) } #endif + ms->writer_pid = 0; + pthread_mutex_unlock(&ms->write_mutex); #else if (free_descriptors[2] != 0) { @@ -1575,6 +1648,8 @@ void PgfDB::commit(object o) ms->curr_txn_id++; } + ms->writer_pid = 0; + ReleaseMutex(hWriteMutex); #endif @@ -1598,6 +1673,8 @@ void PgfDB::rollback(object o) last_free_block_size = 0; last_free_block_txn_id = 0; + ms->writer_pid = 0; + #ifndef _WIN32 if ( #ifndef MREMAP_MAYMOVE diff --git a/src/runtime/c/pgf/db.h b/src/runtime/c/pgf/db.h index d48dbb9d4..4106cca6a 100644 --- a/src/runtime/c/pgf/db.h +++ b/src/runtime/c/pgf/db.h @@ -123,8 +123,6 @@ public: current_db->free_internal(o.as_object(), sizeof(A)+extra_bytes); } - PGF_INTERNAL_DECL void cleanup_revisions(); - PGF_INTERNAL_DECL object get_active_revision(); PGF_INTERNAL_DECL object register_revision(object o, txn_t txn_id); PGF_INTERNAL_DECL void unregister_revision(object o); @@ -145,7 +143,11 @@ public: } private: + PGF_INTERNAL_DECL int init_rev_mutex(); + PGF_INTERNAL_DECL int init_write_mutex(); + PGF_INTERNAL_DECL int init_rwlock_mutex(); PGF_INTERNAL_DECL int init_state(); + PGF_INTERNAL_DECL void cleanup_state(); PGF_INTERNAL_DECL size_t block_descr_size(object map); PGF_INTERNAL_DECL object new_block_descr(object o, size_t size, txn_t txn_id); diff --git a/src/runtime/c/pgf/pgf.cxx b/src/runtime/c/pgf/pgf.cxx index c59154211..bfd45e4d5 100644 --- a/src/runtime/c/pgf/pgf.cxx +++ b/src/runtime/c/pgf/pgf.cxx @@ -153,9 +153,6 @@ PgfDB *pgf_read_ngf(const char *fpath, { DB_scope scope(db, WRITER_SCOPE); - - db->cleanup_revisions(); - ref pgf = db->get_active_revision(); *revision = db->register_revision(pgf.tagged(), PgfDB::get_txn_id()); }