fscache: update fscache to be thread specific instead of global

The threading model for fscache has been to have a single, global cache.
This puts requirements on it to be thread safe so that callers like
preload-index can call it from multiple threads.  This was implemented
with a single mutex and completion events which introduces contention
between the calling threads.

Simplify the threading model by making fscache thread specific.  This allows
us to remove the global mutex and synchronization events entirely and instead
associate a fscache with every thread that requests one. This works well with
the current multi-threading which divides the cache entries into blocks with
a separate thread processing each block.

At the end of each worker thread, if there is a fscache on the primary
thread, merge the cached results from the worker into the primary thread
cache. This enables us to reuse the cache later especially when scanning for
untracked files.

In testing, this reduced the time spent in preload_index() by about 25% and
also reduced the CPU utilization significantly.  On a repo with ~200K files,
it reduced overall status times by ~12%.

Signed-off-by: Ben Peart <benpeart@microsoft.com>
This commit is contained in:
Ben Peart 2018-10-04 15:38:08 -04:00 коммит произвёл Johannes Schindelin
Родитель b7a239eff3
Коммит ba885053a7
4 изменённых файлов: 215 добавлений и 121 удалений

Просмотреть файл

@ -5,14 +5,24 @@
#include "../../dir.h"
#include "config.h"
static int initialized;
static volatile long enabled;
static struct hashmap map;
static volatile long initialized;
static DWORD dwTlsIndex;
static CRITICAL_SECTION mutex;
static unsigned int lstat_requests;
static unsigned int opendir_requests;
static unsigned int fscache_requests;
static unsigned int fscache_misses;
/*
* Store one fscache per thread to avoid thread contention and locking.
* This is ok because multi-threaded access is 1) uncommon and 2) always
* splitting up the cache entries across multiple threads so there isn't
* any overlap between threads anyway.
*/
struct fscache {
volatile long enabled;
struct hashmap map;
unsigned int lstat_requests;
unsigned int opendir_requests;
unsigned int fscache_requests;
unsigned int fscache_misses;
};
static struct trace_key trace_fscache = TRACE_KEY_INIT(FSCACHE);
/*
@ -32,8 +42,6 @@ struct fsentry {
union {
/* Reference count of the directory listing. */
volatile long refcnt;
/* Handle to wait on the loading thread. */
HANDLE hwait;
struct {
/* More stat members (only used for file entries). */
off64_t st_size;
@ -251,86 +259,63 @@ static struct fsentry *fsentry_create_list(const struct fsentry *dir,
/*
* Adds a directory listing to the cache.
*/
static void fscache_add(struct fsentry *fse)
static void fscache_add(struct fscache *cache, struct fsentry *fse)
{
if (fse->list)
fse = fse->list;
for (; fse; fse = fse->next)
hashmap_add(&map, &fse->ent);
hashmap_add(&cache->map, &fse->ent);
}
/*
* Clears the cache.
*/
static void fscache_clear(void)
static void fscache_clear(struct fscache *cache)
{
hashmap_clear_and_free(&map, struct fsentry, ent);
hashmap_init(&map, (hashmap_cmp_fn)fsentry_cmp, NULL, 0);
lstat_requests = opendir_requests = 0;
fscache_misses = fscache_requests = 0;
hashmap_clear_and_free(&cache->map, struct fsentry, ent);
hashmap_init(&cache->map, (hashmap_cmp_fn)fsentry_cmp, NULL, 0);
cache->lstat_requests = cache->opendir_requests = 0;
cache->fscache_misses = cache->fscache_requests = 0;
}
/*
* Checks if the cache is enabled for the given path.
*/
int fscache_enabled(const char *path)
static int do_fscache_enabled(struct fscache *cache, const char *path)
{
return enabled > 0 && !is_absolute_path(path);
return cache->enabled > 0 && !is_absolute_path(path);
}
/*
* Looks up a cache entry, waits if its being loaded by another thread.
* The mutex must be owned by the calling thread.
*/
static struct fsentry *fscache_get_wait(struct fsentry *key)
int fscache_enabled(const char *path)
{
struct fsentry *fse = hashmap_get_entry(&map, key, ent, NULL);
struct fscache *cache = fscache_getcache();
/* return if its a 'real' entry (future entries have refcnt == 0) */
if (!fse || fse->list || fse->u.refcnt)
return fse;
/* create an event and link our key to the future entry */
key->u.hwait = CreateEvent(NULL, TRUE, FALSE, NULL);
key->next = fse->next;
fse->next = key;
/* wait for the loading thread to signal us */
LeaveCriticalSection(&mutex);
WaitForSingleObject(key->u.hwait, INFINITE);
CloseHandle(key->u.hwait);
EnterCriticalSection(&mutex);
/* repeat cache lookup */
return hashmap_get_entry(&map, key, ent, NULL);
return cache ? do_fscache_enabled(cache, path) : 0;
}
/*
* Looks up or creates a cache entry for the specified key.
*/
static struct fsentry *fscache_get(struct fsentry *key)
static struct fsentry *fscache_get(struct fscache *cache, struct fsentry *key)
{
struct fsentry *fse, *future, *waiter;
struct fsentry *fse;
int dir_not_found;
EnterCriticalSection(&mutex);
fscache_requests++;
cache->fscache_requests++;
/* check if entry is in cache */
fse = fscache_get_wait(key);
fse = hashmap_get_entry(&cache->map, key, ent, NULL);
if (fse) {
if (fse->st_mode)
fsentry_addref(fse);
else
fse = NULL; /* non-existing directory */
LeaveCriticalSection(&mutex);
return fse;
}
/* if looking for a file, check if directory listing is in cache */
if (!fse && key->list) {
fse = fscache_get_wait(key->list);
fse = hashmap_get_entry(&cache->map, key->list, ent, NULL);
if (fse) {
LeaveCriticalSection(&mutex);
/*
* dir entry without file entry, or dir does not
* exist -> file doesn't exist
@ -340,25 +325,8 @@ static struct fsentry *fscache_get(struct fsentry *key)
}
}
/* add future entry to indicate that we're loading it */
future = key->list ? key->list : key;
future->next = NULL;
future->u.refcnt = 0;
hashmap_add(&map, &future->ent);
/* create the directory listing (outside mutex!) */
LeaveCriticalSection(&mutex);
fse = fsentry_create_list(future, &dir_not_found);
EnterCriticalSection(&mutex);
/* remove future entry and signal waiting threads */
hashmap_remove(&map, &future->ent, NULL);
waiter = future->next;
while (waiter) {
HANDLE h = waiter->u.hwait;
waiter = waiter->next;
SetEvent(h);
}
/* create the directory listing */
fse = fsentry_create_list(key->list ? key->list : key, &dir_not_found);
/* leave on error (errno set by fsentry_create_list) */
if (!fse) {
@ -372,19 +340,18 @@ static struct fsentry *fscache_get(struct fsentry *key)
key->list->dirent.d_name,
key->list->len);
fse->st_mode = 0;
hashmap_add(&map, &fse->ent);
hashmap_add(&cache->map, &fse->ent);
}
LeaveCriticalSection(&mutex);
return NULL;
}
/* add directory listing to the cache */
fscache_misses++;
fscache_add(fse);
cache->fscache_misses++;
fscache_add(cache, fse);
/* lookup file entry if requested (fse already points to directory) */
if (key->list)
fse = hashmap_get_entry(&map, key, ent, NULL);
fse = hashmap_get_entry(&cache->map, key, ent, NULL);
if (fse && !fse->st_mode)
fse = NULL; /* non-existing directory */
@ -395,59 +362,104 @@ static struct fsentry *fscache_get(struct fsentry *key)
else
errno = ENOENT;
LeaveCriticalSection(&mutex);
return fse;
}
/*
* Enables or disables the cache. Note that the cache is read-only, changes to
* Enables the cache. Note that the cache is read-only, changes to
* the working directory are NOT reflected in the cache while enabled.
*/
int fscache_enable(int enable, size_t initial_size)
int fscache_enable(size_t initial_size)
{
int result;
int fscache;
struct fscache *cache;
int result = 0;
/* allow the cache to be disabled entirely */
fscache = git_env_bool("GIT_TEST_FSCACHE", -1);
if (fscache != -1)
core_fscache = fscache;
if (!core_fscache)
return 0;
/*
* refcount the global fscache initialization so that the
* opendir and lstat function pointers are redirected if
* any threads are using the fscache.
*/
if (!initialized) {
int fscache = git_env_bool("GIT_TEST_FSCACHE", -1);
/* allow the cache to be disabled entirely */
if (fscache != -1)
core_fscache = fscache;
if (!core_fscache)
return 0;
InitializeCriticalSection(&mutex);
lstat_requests = opendir_requests = 0;
fscache_misses = fscache_requests = 0;
if (!dwTlsIndex) {
dwTlsIndex = TlsAlloc();
if (dwTlsIndex == TLS_OUT_OF_INDEXES) {
LeaveCriticalSection(&mutex);
return 0;
}
}
/* redirect opendir and lstat to the fscache implementations */
opendir = fscache_opendir;
lstat = fscache_lstat;
}
InterlockedIncrement(&initialized);
/* refcount the thread specific initialization */
cache = fscache_getcache();
if (cache) {
InterlockedIncrement(&cache->enabled);
} else {
cache = (struct fscache *)xcalloc(1, sizeof(*cache));
cache->enabled = 1;
/*
* avoid having to rehash by leaving room for the parent dirs.
* '4' was determined empirically by testing several repos
*/
hashmap_init(&map, (hashmap_cmp_fn) fsentry_cmp, NULL, initial_size * 4);
initialized = 1;
hashmap_init(&cache->map, (hashmap_cmp_fn)fsentry_cmp, NULL, initial_size * 4);
if (!TlsSetValue(dwTlsIndex, cache))
BUG("TlsSetValue error");
}
result = enable ? InterlockedIncrement(&enabled)
: InterlockedDecrement(&enabled);
trace_printf_key(&trace_fscache, "fscache: enable\n");
return result;
}
if (enable && result == 1) {
/* redirect opendir and lstat to the fscache implementations */
opendir = fscache_opendir;
lstat = fscache_lstat;
} else if (!enable && !result) {
/*
* Disables the cache.
*/
void fscache_disable(void)
{
struct fscache *cache;
if (!core_fscache)
return;
/* update the thread specific fscache initialization */
cache = fscache_getcache();
if (!cache)
BUG("fscache_disable() called on a thread where fscache has not been initialized");
if (!cache->enabled)
BUG("fscache_disable() called on an fscache that is already disabled");
InterlockedDecrement(&cache->enabled);
if (!cache->enabled) {
TlsSetValue(dwTlsIndex, NULL);
trace_printf_key(&trace_fscache, "fscache_disable: lstat %u, opendir %u, "
"total requests/misses %u/%u\n",
cache->lstat_requests, cache->opendir_requests,
cache->fscache_requests, cache->fscache_misses);
fscache_clear(cache);
free(cache);
}
/* update the global fscache initialization */
InterlockedDecrement(&initialized);
if (!initialized) {
/* reset opendir and lstat to the original implementations */
opendir = dirent_opendir;
lstat = mingw_lstat;
EnterCriticalSection(&mutex);
trace_printf_key(&trace_fscache, "fscache: lstat %u, opendir %u, "
"total requests/misses %u/%u\n",
lstat_requests, opendir_requests,
fscache_requests, fscache_misses);
fscache_clear();
LeaveCriticalSection(&mutex);
}
trace_printf_key(&trace_fscache, "fscache: enable(%d)\n", enable);
return result;
trace_printf_key(&trace_fscache, "fscache: disable\n");
return;
}
/*
@ -455,10 +467,10 @@ int fscache_enable(int enable, size_t initial_size)
*/
void fscache_flush(void)
{
if (enabled) {
EnterCriticalSection(&mutex);
fscache_clear();
LeaveCriticalSection(&mutex);
struct fscache *cache = fscache_getcache();
if (cache && cache->enabled) {
fscache_clear(cache);
}
}
@ -471,11 +483,12 @@ int fscache_lstat(const char *filename, struct stat *st)
int dirlen, base, len;
struct heap_fsentry key[2];
struct fsentry *fse;
struct fscache *cache = fscache_getcache();
if (!fscache_enabled(filename))
if (!cache || !do_fscache_enabled(cache, filename))
return mingw_lstat(filename, st);
lstat_requests++;
cache->lstat_requests++;
/* split filename into path + name */
len = strlen(filename);
if (len && is_dir_sep(filename[len - 1]))
@ -488,7 +501,7 @@ int fscache_lstat(const char *filename, struct stat *st)
/* lookup entry for path + name in cache */
fsentry_init(&key[0].u.ent, NULL, filename, dirlen);
fsentry_init(&key[1].u.ent, &key[0].u.ent, filename + base, len - base);
fse = fscache_get(&key[1].u.ent);
fse = fscache_get(cache, &key[1].u.ent);
if (!fse) {
errno = ENOENT;
return -1;
@ -553,11 +566,12 @@ DIR *fscache_opendir(const char *dirname)
struct fsentry *list;
fscache_DIR *dir;
int len;
struct fscache *cache = fscache_getcache();
if (!fscache_enabled(dirname))
if (!cache || !do_fscache_enabled(cache, dirname))
return dirent_opendir(dirname);
opendir_requests++;
cache->opendir_requests++;
/* prepare name (strip trailing '/', replace '.') */
len = strlen(dirname);
if ((len == 1 && dirname[0] == '.') ||
@ -566,7 +580,7 @@ DIR *fscache_opendir(const char *dirname)
/* get directory listing from cache */
fsentry_init(&key.u.ent, NULL, dirname, len);
list = fscache_get(&key.u.ent);
list = fscache_get(cache, &key.u.ent);
if (!list)
return NULL;
@ -577,3 +591,53 @@ DIR *fscache_opendir(const char *dirname)
dir->pfsentry = list;
return (DIR*) dir;
}
struct fscache *fscache_getcache(void)
{
return (struct fscache *)TlsGetValue(dwTlsIndex);
}
void fscache_merge(struct fscache *dest)
{
struct hashmap_iter iter;
struct hashmap_entry *e;
struct fscache *cache = fscache_getcache();
/*
* Only do the merge if fscache was enabled and we have a dest
* cache to merge into.
*/
if (!dest) {
fscache_enable(0);
return;
}
if (!cache)
BUG("fscache_merge() called on a thread where fscache has not been initialized");
TlsSetValue(dwTlsIndex, NULL);
trace_printf_key(&trace_fscache, "fscache_merge: lstat %u, opendir %u, "
"total requests/misses %u/%u\n",
cache->lstat_requests, cache->opendir_requests,
cache->fscache_requests, cache->fscache_misses);
/*
* This is only safe because the primary thread we're merging into
* isn't being used so the critical section only needs to prevent
* the the child threads from stomping on each other.
*/
EnterCriticalSection(&mutex);
hashmap_iter_init(&cache->map, &iter);
while ((e = hashmap_iter_next(&iter)))
hashmap_add(&dest->map, e);
dest->lstat_requests += cache->lstat_requests;
dest->opendir_requests += cache->opendir_requests;
dest->fscache_requests += cache->fscache_requests;
dest->fscache_misses += cache->fscache_misses;
LeaveCriticalSection(&mutex);
free(cache);
InterlockedDecrement(&initialized);
}

Просмотреть файл

@ -1,9 +1,16 @@
#ifndef FSCACHE_H
#define FSCACHE_H
int fscache_enable(int enable, size_t initial_size);
#define enable_fscache(initial_size) fscache_enable(1, initial_size)
#define disable_fscache() fscache_enable(0, 0)
/*
* The fscache is thread specific. enable_fscache() must be called
* for each thread where caching is desired.
*/
int fscache_enable(size_t initial_size);
#define enable_fscache(initial_size) fscache_enable(initial_size)
void fscache_disable(void);
#define disable_fscache() fscache_disable()
int fscache_enabled(const char *path);
#define is_fscache_enabled(path) fscache_enabled(path)
@ -14,4 +21,13 @@ void fscache_flush(void);
DIR *fscache_opendir(const char *dir);
int fscache_lstat(const char *file_name, struct stat *buf);
/* opaque fscache structure */
struct fscache;
struct fscache *fscache_getcache(void);
#define getcache_fscache() fscache_getcache()
void fscache_merge(struct fscache *dest);
#define merge_fscache(dest) fscache_merge(dest)
#endif

Просмотреть файл

@ -1540,6 +1540,10 @@ static inline int is_missing_file_error(int errno_)
* data or even file content without the need to synchronize with the file
* system.
*/
/* opaque fscache structure */
struct fscache;
#ifndef enable_fscache
#define enable_fscache(x) /* noop */
#endif
@ -1556,6 +1560,14 @@ static inline int is_missing_file_error(int errno_)
#define flush_fscache() /* noop */
#endif
#ifndef getcache_fscache
#define getcache_fscache() (NULL) /* noop */
#endif
#ifndef merge_fscache
#define merge_fscache(dest) /* noop */
#endif
int cmd_main(int, const char **);
/*

Просмотреть файл

@ -10,6 +10,8 @@
#include "thread-utils.h"
#include "repository.h"
static struct fscache *fscache;
/*
* Mostly randomly chosen maximum thread counts: we
* cap the parallelism to 20 threads, and we want
@ -47,6 +49,7 @@ static void *preload_thread(void *_data)
nr = index->cache_nr - p->offset;
last_nr = nr;
enable_fscache(nr);
do {
struct cache_entry *ce = *cep++;
struct stat st;
@ -90,6 +93,7 @@ static void *preload_thread(void *_data)
pthread_mutex_unlock(&pd->mutex);
}
cache_def_clear(&cache);
merge_fscache(fscache);
return NULL;
}
@ -105,6 +109,7 @@ void preload_index(struct index_state *index,
if (!HAVE_THREADS || !core_preload_index)
return;
fscache = getcache_fscache();
threads = index->cache_nr / THREAD_COST;
if ((index->cache_nr > 1) && (threads < 2) && git_env_bool("GIT_TEST_PRELOAD_INDEX", 0))
threads = 2;
@ -126,7 +131,6 @@ void preload_index(struct index_state *index,
pthread_mutex_init(&pd.mutex, NULL);
}
enable_fscache(index->cache_nr);
for (i = 0; i < threads; i++) {
struct thread_data *p = data+i;
int err;
@ -162,8 +166,6 @@ void preload_index(struct index_state *index,
trace2_data_intmax("index", NULL, "preload/sum_lstat", t2_sum_lstat);
trace2_region_leave("index", "preload", NULL);
disable_fscache();
}
int repo_read_index_preload(struct repository *repo,