diff --git a/read-cache.c b/read-cache.c
index 3ace29d58f..7acc2c86f4 100644
--- a/read-cache.c
+++ b/read-cache.c
@@ -1720,7 +1720,8 @@ int read_index(struct index_state *istate)
 	return read_index_from(istate, get_index_file(), get_git_dir());
 }
 
-static struct cache_entry *create_from_disk(struct index_state *istate,
+static struct cache_entry *create_from_disk(struct mem_pool *ce_mem_pool,
+					    unsigned int version,
 					    struct ondisk_cache_entry *ondisk,
 					    unsigned long *ent_size,
 					    const struct cache_entry *previous_ce)
@@ -1737,7 +1738,7 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
 	 * number of bytes to be stripped from the end of the previous name,
 	 * and the bytes to append to the result, to come up with its name.
 	 */
-	int expand_name_field = istate->version == 4;
+	int expand_name_field = version == 4;
 
 	/* On-disk flags are just 16 bits */
 	flags = get_be16(&ondisk->flags);
@@ -1761,16 +1762,17 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
 		const unsigned char *cp = (const unsigned char *)name;
 		size_t strip_len, previous_len;
 
-		previous_len = previous_ce ? previous_ce->ce_namelen : 0;
+		/* If we're at the beginning of a block, ignore the previous name */
 		strip_len = decode_varint(&cp);
-		if (previous_len < strip_len) {
-			if (previous_ce)
+		if (previous_ce) {
+			previous_len = previous_ce->ce_namelen;
+			if (previous_len < strip_len)
 				die(_("malformed name field in the index, near path '%s'"),
-				    previous_ce->name);
-			else
-				die(_("malformed name field in the index in the first path"));
+				    previous_ce->name);
+			copy_len = previous_len - strip_len;
+		} else {
+			copy_len = 0;
 		}
-		copy_len = previous_len - strip_len;
 		name = (const char *)cp;
 	}
 
@@ -1780,7 +1782,7 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
 		len += copy_len;
 	}
 
-	ce = mem_pool__ce_alloc(istate->ce_mem_pool, len);
+	ce = mem_pool__ce_alloc(ce_mem_pool, len);
 
 	ce->ce_stat_data.sd_ctime.sec = get_be32(&ondisk->ctime.sec);
 	ce->ce_stat_data.sd_mtime.sec = get_be32(&ondisk->mtime.sec);
@@ -1948,6 +1950,52 @@ static void *load_index_extensions(void *_data)
 	return NULL;
 }
 
+/*
+ * A helper function that will load the specified range of cache entries
+ * from the memory mapped file and add them to the given index.
+ */
+static unsigned long load_cache_entry_block(struct index_state *istate,
+			struct mem_pool *ce_mem_pool, int offset, int nr, const char *mmap,
+			unsigned long start_offset, const struct cache_entry *previous_ce)
+{
+	int i;
+	unsigned long src_offset = start_offset;
+
+	for (i = offset; i < offset + nr; i++) {
+		struct ondisk_cache_entry *disk_ce;
+		struct cache_entry *ce;
+		unsigned long consumed;
+
+		disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset);
+		ce = create_from_disk(ce_mem_pool, istate->version, disk_ce, &consumed, previous_ce);
+		set_index_entry(istate, i, ce);
+
+		src_offset += consumed;
+		previous_ce = ce;
+	}
+	return src_offset - start_offset;
+}
+
+static unsigned long load_all_cache_entries(struct index_state *istate,
+			const char *mmap, size_t mmap_size, unsigned long src_offset)
+{
+	unsigned long consumed;
+
+	if (istate->version == 4) {
+		mem_pool_init(&istate->ce_mem_pool,
+			      estimate_cache_size_from_compressed(istate->cache_nr));
+	} else {
+		mem_pool_init(&istate->ce_mem_pool,
+			      estimate_cache_size(mmap_size, istate->cache_nr));
+	}
+
+	consumed = load_cache_entry_block(istate, istate->ce_mem_pool,
+					  0, istate->cache_nr, mmap, src_offset, NULL);
+	return consumed;
+}
+
+#ifndef NO_PTHREADS
+
 /*
  * Mostly randomly chosen maximum thread counts: we
  * cap the parallelism to online_cpus() threads, and we want
@@ -1957,20 +2005,123 @@ static void *load_index_extensions(void *_data)
 
 #define THREAD_COST	(10000)
 
+struct load_cache_entries_thread_data
+{
+	pthread_t pthread;
+	struct index_state *istate;
+	struct mem_pool *ce_mem_pool;
+	int offset;
+	const char *mmap;
+	struct index_entry_offset_table *ieot;
+	int ieot_start;		/* starting index into the ieot array */
+	int ieot_blocks;	/* count of ieot entries to process */
+	unsigned long consumed;	/* return # of bytes in index file processed */
+};
+
+/*
+ * A thread proc to run the load_cache_entries() computation
+ * across multiple background threads.
+ */
+static void *load_cache_entries_thread(void *_data)
+{
+	struct load_cache_entries_thread_data *p = _data;
+	int i;
+
+	/* iterate across all ieot blocks assigned to this thread */
+	for (i = p->ieot_start; i < p->ieot_start + p->ieot_blocks; i++) {
+		p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool,
+			p->offset, p->ieot->entries[i].nr, p->mmap, p->ieot->entries[i].offset, NULL);
+		p->offset += p->ieot->entries[i].nr;
+	}
+	return NULL;
+}
+
+static unsigned long load_cache_entries_threaded(struct index_state *istate, const char *mmap, size_t mmap_size,
+			unsigned long src_offset, int nr_threads, struct index_entry_offset_table *ieot)
+{
+	int i, offset, ieot_blocks, ieot_start, err;
+	struct load_cache_entries_thread_data *data;
+	unsigned long consumed = 0;
+
+	/* a little sanity checking */
+	if (istate->name_hash_initialized)
+		BUG("the name hash isn't thread safe");
+
+	mem_pool_init(&istate->ce_mem_pool, 0);
+
+	/* ensure we have no more threads than we have blocks to process */
+	if (nr_threads > ieot->nr)
+		nr_threads = ieot->nr;
+	data = xcalloc(nr_threads, sizeof(*data));
+
+	offset = ieot_start = 0;
+	ieot_blocks = DIV_ROUND_UP(ieot->nr, nr_threads);
+	for (i = 0; i < nr_threads; i++) {
+		struct load_cache_entries_thread_data *p = &data[i];
+		int nr, j;
+
+		if (ieot_start + ieot_blocks > ieot->nr)
+			ieot_blocks = ieot->nr - ieot_start;
+
+		p->istate = istate;
+		p->offset = offset;
+		p->mmap = mmap;
+		p->ieot = ieot;
+		p->ieot_start = ieot_start;
+		p->ieot_blocks = ieot_blocks;
+
+		/* create a mem_pool for each thread */
+		nr = 0;
+		for (j = p->ieot_start; j < p->ieot_start + p->ieot_blocks; j++)
+			nr += p->ieot->entries[j].nr;
+		if (istate->version == 4) {
+			mem_pool_init(&p->ce_mem_pool,
+				estimate_cache_size_from_compressed(nr));
+		} else {
+			mem_pool_init(&p->ce_mem_pool,
+				estimate_cache_size(mmap_size, nr));
+		}
+
+		err = pthread_create(&p->pthread, NULL, load_cache_entries_thread, p);
+		if (err)
+			die(_("unable to create load_cache_entries thread: %s"), strerror(err));
+
+		/* increment by the number of cache entries in the ieot block being processed */
+		for (j = 0; j < ieot_blocks; j++)
+			offset += ieot->entries[ieot_start + j].nr;
+		ieot_start += ieot_blocks;
+	}
+
+	for (i = 0; i < nr_threads; i++) {
+		struct load_cache_entries_thread_data *p = &data[i];
+
+		err = pthread_join(p->pthread, NULL);
+		if (err)
+			die(_("unable to join load_cache_entries thread: %s"), strerror(err));
+		mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool);
+		consumed += p->consumed;
+	}
+
+	free(data);
+
+	return consumed;
+}
+#endif
+
 /* remember to discard_cache() before reading a different cache! */
 int do_read_index(struct index_state *istate, const char *path, int must_exist)
 {
-	int fd, i;
+	int fd;
 	struct stat st;
 	unsigned long src_offset;
 	const struct cache_header *hdr;
 	const char *mmap;
 	size_t mmap_size;
-	const struct cache_entry *previous_ce = NULL;
 	struct load_index_extensions p;
 	size_t extension_offset = 0;
 #ifndef NO_PTHREADS
-	int nr_threads;
+	int nr_threads, cpus;
+	struct index_entry_offset_table *ieot = NULL;
 #endif
 
 	if (istate->initialized)
@@ -2012,10 +2163,18 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
 	p.mmap = mmap;
 	p.mmap_size = mmap_size;
 
+	src_offset = sizeof(*hdr);
+
 #ifndef NO_PTHREADS
 	nr_threads = git_config_get_index_threads();
-	if (!nr_threads)
-		nr_threads = online_cpus();
+
+	/* TODO: does creating more threads than cores help? */
+	if (!nr_threads) {
+		nr_threads = istate->cache_nr / THREAD_COST;
+		cpus = online_cpus();
+		if (nr_threads > cpus)
+			nr_threads = cpus;
+	}
 
 	if (nr_threads > 1) {
 		extension_offset = read_eoie_extension(mmap, mmap_size);
@@ -2030,29 +2189,24 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
 			nr_threads--;
 		}
 	}
+
+	/*
+	 * Locate and read the index entry offset table so that we can use it
+	 * to multi-thread the reading of the cache entries.
+	 */
+	if (extension_offset && nr_threads > 1)
+		ieot = read_ieot_extension(mmap, mmap_size, extension_offset);
+
+	if (ieot) {
+		src_offset += load_cache_entries_threaded(istate, mmap, mmap_size, src_offset, nr_threads, ieot);
+		free(ieot);
+	} else {
+		src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
+	}
+#else
+	src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
 #endif
 
-	if (istate->version == 4) {
-		mem_pool_init(&istate->ce_mem_pool,
-			      estimate_cache_size_from_compressed(istate->cache_nr));
-	} else {
-		mem_pool_init(&istate->ce_mem_pool,
-			      estimate_cache_size(mmap_size, istate->cache_nr));
-	}
-
-	src_offset = sizeof(*hdr);
-	for (i = 0; i < istate->cache_nr; i++) {
-		struct ondisk_cache_entry *disk_ce;
-		struct cache_entry *ce;
-		unsigned long consumed;
-
-		disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset);
-		ce = create_from_disk(istate, disk_ce, &consumed, previous_ce);
-		set_index_entry(istate, i, ce);
-
-		src_offset += consumed;
-		previous_ce = ce;
-	}
 	istate->timestamp.sec = st.st_mtime;
 	istate->timestamp.nsec = ST_MTIME_NSEC(st);
@@ -2549,7 +2703,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 	struct strbuf previous_name_buf = STRBUF_INIT, *previous_name;
 	int drop_cache_tree = istate->drop_cache_tree;
 	off_t offset;
-	int ieot_blocks = 1;
+	int ieot_entries = 1;
 	struct index_entry_offset_table *ieot = NULL;
 	int nr, nr_threads;
 
@@ -2602,6 +2756,8 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 			ieot_blocks = cpus - 1;
 		} else {
 			ieot_blocks = nr_threads;
+			if (ieot_blocks > istate->cache_nr)
+				ieot_blocks = istate->cache_nr;
 		}
 
 		/*
@@ -2611,7 +2767,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 		if (ieot_blocks > 1) {
 			ieot = xcalloc(1, sizeof(struct index_entry_offset_table)
 				+ (ieot_blocks * sizeof(struct index_entry_offset)));
-			ieot_blocks = DIV_ROUND_UP(entries, ieot_blocks);
+			ieot_entries = DIV_ROUND_UP(entries, ieot_blocks);
 		}
 	}
 #endif
@@ -2644,7 +2800,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
 			drop_cache_tree = 1;
 		}
 
-		if (ieot && i && (i % ieot_blocks == 0)) {
+		if (ieot && i && (i % ieot_entries == 0)) {
 			ieot->entries[ieot->nr].nr = nr;
 			ieot->entries[ieot->nr].offset = offset;
 			ieot->nr++;
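
For reviewers who want to see the work-partitioning logic in isolation, here is a minimal standalone sketch (not part of the patch) of how load_cache_entries_threaded() divides the IEOT blocks among the worker threads. The ieot_entry struct, the sample block sizes, and the locally defined DIV_ROUND_UP() are invented for illustration; only the ceiling-division split, the clamp on the final thread's block count, and the running offset/ieot_start bookkeeping mirror the patch above.

#include <stdio.h>

#define DIV_ROUND_UP(n, d) (((n) + (d) - 1) / (d))

/* stand-in for one index_entry_offset record; only the entry count matters here */
struct ieot_entry {
	int nr;
};

int main(void)
{
	struct ieot_entry entries[] = { {9}, {9}, {9}, {9}, {9}, {9}, {5} };
	int ieot_nr = 7, nr_threads = 3;
	int i, j, offset = 0, ieot_start = 0;
	int ieot_blocks = DIV_ROUND_UP(ieot_nr, nr_threads);

	for (i = 0; i < nr_threads; i++) {
		int nr = 0;

		/* the final thread may be left with fewer blocks than the others */
		if (ieot_start + ieot_blocks > ieot_nr)
			ieot_blocks = ieot_nr - ieot_start;

		/* sum the cache entries in this thread's blocks, as the patch
		   does when sizing each per-thread mem_pool */
		for (j = ieot_start; j < ieot_start + ieot_blocks; j++)
			nr += entries[j].nr;

		printf("thread %d: ieot blocks [%d, %d), cache entries [%d, %d)\n",
		       i, ieot_start, ieot_start + ieot_blocks,
		       offset, offset + nr);

		offset += nr;
		ieot_start += ieot_blocks;
	}
	return 0;
}

With these sample numbers the sketch prints three disjoint cache-entry ranges ([0, 27), [27, 54), [54, 59)). Because each thread owns a disjoint [offset, offset + nr) slice, every set_index_entry() call in load_cache_entry_block() writes to its own region of istate->cache and no locking is needed, which appears to be why the patch insists the name hash not yet be initialized (the BUG() check in load_cache_entries_threaded()).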