locking cleanup: convert SharedMemCache to use ScopedMutex
This commit is contained in:
@@ -335,9 +335,8 @@ template<size_t kBlockSize>
|
||||
GoogleString SharedMemCache<kBlockSize>::DumpStats() {
|
||||
SectorStats aggregate;
|
||||
for (size_t c = 0; c < sectors_.size(); ++c) {
|
||||
sectors_[c]->mutex()->Lock();
|
||||
ScopedMutex lock(sectors_[c]->mutex());
|
||||
aggregate.Add(*sectors_[c]->sector_stats());
|
||||
sectors_[c]->mutex()->Unlock();
|
||||
}
|
||||
|
||||
return aggregate.Dump(entries_per_sector_* num_sectors_,
|
||||
@@ -352,11 +351,10 @@ bool SharedMemCache<kBlockSize>::AddSectorToSnapshot(
|
||||
|
||||
Sector<kBlockSize>* sector = sectors_[sector_num];
|
||||
SectorStats* stats = sector->sector_stats();
|
||||
sector->mutex()->Lock();
|
||||
ScopedMutex lock(sector->mutex());
|
||||
DCHECK(!(last_checkpoint_ms > stats->last_checkpoint_ms));
|
||||
if (last_checkpoint_ms < stats->last_checkpoint_ms) {
|
||||
// Another thread already snapshotted this sector; do nothing.
|
||||
sector->mutex()->Unlock();
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -389,7 +387,6 @@ bool SharedMemCache<kBlockSize>::AddSectorToSnapshot(
|
||||
}
|
||||
|
||||
stats->last_checkpoint_ms = timer_->NowMs();
|
||||
sector->mutex()->Unlock();
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -472,7 +469,8 @@ void SharedMemCache<kBlockSize>::PutRawHash(const GoogleString& raw_hash,
|
||||
|
||||
Sector<kBlockSize>* sector = sectors_[pos.sector];
|
||||
SectorStats* stats = sector->sector_stats();
|
||||
sector->mutex()->Lock();
|
||||
|
||||
ScopedMutex lock(sector->mutex());
|
||||
++stats->num_put;
|
||||
int64 last_checkpoint_ms = stats->last_checkpoint_ms;
|
||||
|
||||
@@ -493,7 +491,6 @@ void SharedMemCache<kBlockSize>::PutRawHash(const GoogleString& raw_hash,
|
||||
last_checkpoint_ms, pos.sector);
|
||||
} else {
|
||||
++stats->num_put_concurrent_create;
|
||||
sector->mutex()->Unlock();
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -519,7 +516,6 @@ void SharedMemCache<kBlockSize>::PutRawHash(const GoogleString& raw_hash,
|
||||
if (best_key == kInvalidEntry) {
|
||||
// All slots busy. Giving up.
|
||||
++stats->num_put_concurrent_full_set;
|
||||
sector->mutex()->Unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -634,6 +630,7 @@ void SharedMemCache<kBlockSize>::RestoreFromDisk() {
|
||||
// cache at all. This is fine; restoring the snapshots is best-effort.
|
||||
}
|
||||
|
||||
// Expects sector->mutex() held on entry, leaves it held on exit.
|
||||
template<size_t kBlockSize>
|
||||
void SharedMemCache<kBlockSize>::PutIntoEntry(
|
||||
Sector<kBlockSize>* sector, EntryNum entry_num,
|
||||
@@ -659,7 +656,6 @@ void SharedMemCache<kBlockSize>::PutIntoEntry(
|
||||
sector->ReturnBlocksToFreeList(blocks);
|
||||
entry->creating = false;
|
||||
MarkEntryFree(sector, entry_num);
|
||||
sector->mutex()->Unlock();
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -686,21 +682,18 @@ void SharedMemCache<kBlockSize>::PutIntoEntry(
|
||||
entry->first_block = kInvalidBlock;
|
||||
}
|
||||
|
||||
sector->mutex()->Unlock();
|
||||
|
||||
// Now we can write out the data. We can do it w/o the lock held
|
||||
// Now we can write out the data. We can release the lock while we do that,
|
||||
// since we've already removed them from the freelist, and the LRU/directory
|
||||
// entry is locked, so can't be concurrently freed.
|
||||
|
||||
sector->mutex()->Unlock();
|
||||
for (size_t b = 0; b < want_blocks; ++b) {
|
||||
size_t bytes = sector->BytesInPortion(entry->byte_size, b, want_blocks);
|
||||
std::memcpy(sector->BlockBytes(blocks[b]), data + b * kBlockSize, bytes);
|
||||
}
|
||||
sector->mutex()->Lock();
|
||||
|
||||
// We're done, clear creating bit.
|
||||
sector->mutex()->Lock();
|
||||
entry->creating = false;
|
||||
sector->mutex()->Unlock();
|
||||
}
|
||||
|
||||
template<size_t kBlockSize>
|
||||
@@ -709,37 +702,38 @@ void SharedMemCache<kBlockSize>::Get(const GoogleString& key,
|
||||
GoogleString raw_hash = ToRawHash(key);
|
||||
Position pos;
|
||||
ExtractPosition(raw_hash, &pos);
|
||||
CacheInterface::KeyState key_state = kNotFound;
|
||||
Sector<kBlockSize>* sector = sectors_[pos.sector];
|
||||
sector->mutex()->Lock();
|
||||
SectorStats* stats = sector->sector_stats();
|
||||
++stats->num_get;
|
||||
{
|
||||
ScopedMutex lock(sector->mutex());
|
||||
SectorStats* stats = sector->sector_stats();
|
||||
++stats->num_get;
|
||||
|
||||
for (int p = 0; p < kAssociativity; ++p) {
|
||||
EntryNum cand_key = pos.keys[p];
|
||||
CacheEntry* cand = sector->EntryAt(cand_key);
|
||||
if (KeyMatch(cand, raw_hash)) {
|
||||
++stats->num_get_hit;
|
||||
GetFromEntry(key, sector, cand_key, callback);
|
||||
return;
|
||||
for (int p = 0; p < kAssociativity; ++p) {
|
||||
EntryNum cand_key = pos.keys[p];
|
||||
CacheEntry* cand = sector->EntryAt(cand_key);
|
||||
if (KeyMatch(cand, raw_hash)) {
|
||||
++stats->num_get_hit;
|
||||
key_state = GetFromEntry(key, sector, cand_key, callback->value());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sector->mutex()->Unlock();
|
||||
ValidateAndReportResult(key, kNotFound, callback);
|
||||
ValidateAndReportResult(key, key_state, callback);
|
||||
}
|
||||
|
||||
// Expects sector->mutex() held on entry, leaves it held on exit.
|
||||
template<size_t kBlockSize>
|
||||
void SharedMemCache<kBlockSize>::GetFromEntry(
|
||||
CacheInterface::KeyState SharedMemCache<kBlockSize>::GetFromEntry(
|
||||
const GoogleString& key,
|
||||
Sector<kBlockSize>* sector,
|
||||
EntryNum entry_num,
|
||||
Callback* callback) {
|
||||
SharedString* out) {
|
||||
CacheEntry* entry = sector->EntryAt(entry_num);
|
||||
if (entry->creating) {
|
||||
// For now, consider concurrent creation a miss.
|
||||
sector->mutex()->Unlock();
|
||||
ValidateAndReportResult(key, kNotFound, callback);
|
||||
return;
|
||||
return kNotFound;
|
||||
}
|
||||
++entry->open_count;
|
||||
|
||||
@@ -748,11 +742,10 @@ void SharedMemCache<kBlockSize>::GetFromEntry(
|
||||
BlockVector blocks;
|
||||
sector->BlockListForEntry(entry, &blocks);
|
||||
|
||||
// Drop the lock as the entry is now opened for reading.
|
||||
// We can release the lock while we do the read, as the entry is now open for
|
||||
// reading.
|
||||
sector->mutex()->Unlock();
|
||||
|
||||
// Collect the contents.
|
||||
SharedString* out = callback->value();
|
||||
out->DetachAndClear();
|
||||
out->Extend(entry->byte_size);
|
||||
|
||||
@@ -763,13 +756,12 @@ void SharedMemCache<kBlockSize>::GetFromEntry(
|
||||
out->WriteAt(pos, sector->BlockBytes(blocks[b]), bytes);
|
||||
pos += bytes;
|
||||
}
|
||||
sector->mutex()->Lock();
|
||||
|
||||
// Now reduce the reference count.
|
||||
// TODO(morlovich): atomic ops?
|
||||
sector->mutex()->Lock();
|
||||
--entry->open_count;
|
||||
sector->mutex()->Unlock();
|
||||
ValidateAndReportResult(key, kAvailable, callback);
|
||||
|
||||
return kAvailable;
|
||||
}
|
||||
|
||||
template<size_t kBlockSize>
|
||||
@@ -779,7 +771,7 @@ void SharedMemCache<kBlockSize>::Delete(const GoogleString& key) {
|
||||
ExtractPosition(raw_hash, &pos);
|
||||
|
||||
Sector<kBlockSize>* sector = sectors_[pos.sector];
|
||||
sector->mutex()->Lock();
|
||||
ScopedMutex lock(sector->mutex());
|
||||
|
||||
for (int p = 0; p < kAssociativity; ++p) {
|
||||
EntryNum cand_key = pos.keys[p];
|
||||
@@ -788,10 +780,9 @@ void SharedMemCache<kBlockSize>::Delete(const GoogleString& key) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
sector->mutex()->Unlock();
|
||||
}
|
||||
|
||||
// Called with lock held.
|
||||
template<size_t kBlockSize>
|
||||
void SharedMemCache<kBlockSize>::DeleteEntry(Sector<kBlockSize>* sector,
|
||||
EntryNum entry_num) {
|
||||
@@ -800,7 +791,6 @@ void SharedMemCache<kBlockSize>::DeleteEntry(Sector<kBlockSize>* sector,
|
||||
// A multiple writers (Put or Delete) race. Let the other one proceed,
|
||||
// drop this one. (Call to EnsureReadyForWriting below will deal with any
|
||||
// outstanding readers).
|
||||
sector->mutex()->Unlock();
|
||||
return;
|
||||
}
|
||||
EnsureReadyForWriting(sector, entry);
|
||||
@@ -809,14 +799,13 @@ void SharedMemCache<kBlockSize>::DeleteEntry(Sector<kBlockSize>* sector,
|
||||
sector->ReturnBlocksToFreeList(blocks);
|
||||
entry->creating = false;
|
||||
MarkEntryFree(sector, entry_num);
|
||||
sector->mutex()->Unlock();
|
||||
}
|
||||
|
||||
template<size_t kBlockSize>
|
||||
void SharedMemCache<kBlockSize>::SanityCheck() {
|
||||
for (int i = 0; i < num_sectors_; ++i) {
|
||||
Sector<kBlockSize>* sector = sectors_[i];
|
||||
sector->mutex()->Lock();
|
||||
ScopedMutex lock(sector->mutex());
|
||||
|
||||
// Make sure that all blocks are accounted for exactly once.
|
||||
|
||||
@@ -844,8 +833,6 @@ void SharedMemCache<kBlockSize>::SanityCheck() {
|
||||
i != block_occur.end(); ++i) {
|
||||
CHECK_EQ(1, i->second);
|
||||
}
|
||||
|
||||
sector->mutex()->Unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -178,27 +178,26 @@ class SharedMemCache : public CacheInterface {
|
||||
void PutRawHash(const GoogleString& raw_hash, int64 last_use_timestamp_ms,
|
||||
SharedString* value, bool checkpoint_ok);
|
||||
|
||||
// Finish a get, with the entry matching and sector lock held.
|
||||
// Releases lock when done.
|
||||
void GetFromEntry(const GoogleString& key,
|
||||
SharedMemCacheData::Sector<kBlockSize>* sector,
|
||||
SharedMemCacheData::EntryNum entry_num, Callback* callback)
|
||||
UNLOCK_FUNCTION(sector->mutex());
|
||||
// Finish a get, with the entry matching and sector lock held. Releases lock
|
||||
// while performing the read, but takes it again before returning.
|
||||
CacheInterface::KeyState GetFromEntry(
|
||||
const GoogleString& key,
|
||||
SharedMemCacheData::Sector<kBlockSize>* sector,
|
||||
SharedMemCacheData::EntryNum entry_num,
|
||||
SharedString* out) EXCLUSIVE_LOCKS_REQUIRED(sector->mutex());
|
||||
|
||||
// Finish a put into the given entry. Lock is expected to be held at entry,
|
||||
// will be released when done. The hash in the entry must also be already
|
||||
// correct at time of entry.
|
||||
// will still be held when done. The hash in the entry must also be already
|
||||
// correct at time of call.
|
||||
void PutIntoEntry(SharedMemCacheData::Sector<kBlockSize>* sector,
|
||||
SharedMemCacheData::EntryNum entry_num,
|
||||
int64 last_use_timestamp_ms, SharedString* value)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(sector->mutex())
|
||||
UNLOCK_FUNCTION(sector->mutex());
|
||||
EXCLUSIVE_LOCKS_REQUIRED(sector->mutex());
|
||||
|
||||
// Finish a delete, with the entry matching and sector lock held.
|
||||
// Releases lock when done.
|
||||
void DeleteEntry(SharedMemCacheData::Sector<kBlockSize>* sector,
|
||||
SharedMemCacheData::EntryNum entry_num)
|
||||
UNLOCK_FUNCTION(sector->mutex());
|
||||
EXCLUSIVE_LOCKS_REQUIRED(sector->mutex());
|
||||
|
||||
// Attempts to allocate at least the given number of blocks, and appends any
|
||||
// blocks it manages to allocate to *blocks. Returns whether successful.
|
||||
|
||||
Reference in New Issue
Block a user