Merge branch 'mt/parallel-checkout-part-2'

The checkout machinery has been taught to perform the actual
write-out of the files in parallel when able.

* mt/parallel-checkout-part-2:
  parallel-checkout: add design documentation
  parallel-checkout: support progress displaying
  parallel-checkout: add configuration options
  parallel-checkout: make it truly parallel
  unpack-trees: add basic support for parallel checkout
Junio C Hamano 2021-04-30 13:50:26 +09:00
Parents: 59bb0aa93e 68e66f2987
Commit: a1cac26cc6
12 changed files with 1240 additions and 5 deletions

.gitignore

@@ -33,6 +33,7 @@
/git-check-mailmap
/git-check-ref-format
/git-checkout
/git-checkout--worker
/git-checkout-index
/git-cherry
/git-cherry-pick

Documentation/Makefile

@@ -91,6 +91,7 @@ TECH_DOCS += technical/multi-pack-index
TECH_DOCS += technical/pack-format
TECH_DOCS += technical/pack-heuristics
TECH_DOCS += technical/pack-protocol
TECH_DOCS += technical/parallel-checkout
TECH_DOCS += technical/partial-clone
TECH_DOCS += technical/protocol-capabilities
TECH_DOCS += technical/protocol-common

Documentation/config/checkout.txt

@@ -21,3 +21,24 @@ checkout.guess::
Provides the default value for the `--guess` or `--no-guess`
option in `git checkout` and `git switch`. See
linkgit:git-switch[1] and linkgit:git-checkout[1].

checkout.workers::
The number of parallel workers to use when updating the working tree.
The default is one, i.e. sequential execution. If set to a value less
than one, Git will use as many workers as the number of logical cores
available. This setting and `checkout.thresholdForParallelism` affect
all commands that perform checkout. E.g. checkout, clone, reset,
sparse-checkout, etc.
+
Note: parallel checkout usually delivers better performance for repositories
located on SSDs or over NFS. For repositories on spinning disks and/or machines
with a small number of cores, the default sequential checkout often performs
better. The size and compression level of a repository might also influence how
well the parallel version performs.

checkout.thresholdForParallelism::
When running parallel checkout with a small number of files, the cost
of subprocess spawning and inter-process communication might outweigh
the parallelization gains. This setting allows you to define the minimum
number of files for which parallel checkout should be attempted. The
default is 100.
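
For illustration, enabling parallel checkout might look like the
following (the worker count of 8 and the repository path are arbitrary
examples; a value below one, such as 0, falls into the "one worker per
logical core" case described above):

----
# Use up to 8 workers, but only when 100 or more files are to be written:
git config checkout.workers 8
git config checkout.thresholdForParallelism 100

# Or let Git pick one worker per logical core for a single clone:
git -c checkout.workers=0 clone /path/to/big/repo.git
----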

Documentation/technical/parallel-checkout.txt

@@ -0,0 +1,270 @@
Parallel Checkout Design Notes
==============================

The "Parallel Checkout" feature attempts to use multiple processes to
parallelize the work of uncompressing the blobs, applying in-core
filters, and writing the resulting contents to the working tree during a
checkout operation. It can be used by all checkout-related commands,
such as `clone`, `checkout`, `reset`, `sparse-checkout`, and others.
These commands share the following basic structure:
* Step 1: Read the current index file into memory.
* Step 2: Modify the in-memory index based upon the command, and
temporarily mark all cache entries that need to be updated.
* Step 3: Populate the working tree to match the new candidate index.
This includes iterating over all of the to-be-updated cache entries
and deleting, creating, or overwriting the associated files in the
working tree.
* Step 4: Write the new index to disk.
Step 3 is the focus of the "parallel checkout" effort described here.

Sequential Implementation
-------------------------

For the purposes of discussion here, the current sequential
implementation of Step 3 is divided into three parts, each one
implemented in its own function:
* Step 3a: `unpack-trees.c:check_updates()` contains a series of
sequential loops iterating over the `cache_entry` array. The main
loop in this function calls the Step 3b function for each of the
to-be-updated entries.
* Step 3b: `entry.c:checkout_entry()` examines the existing working tree
for file conflicts, collisions, and unsaved changes. It removes files
and creates leading directories as necessary. It calls the Step 3c
function for each entry to be written.
* Step 3c: `entry.c:write_entry()` loads the blob into memory, smudges
it if necessary, creates the file in the working tree, writes the
smudged contents, calls `fstat()` or `lstat()`, and updates the
associated `cache_entry` struct with the stat information gathered.
It wouldn't be safe to perform Step 3b in parallel, as there could be
race conditions between file creations and removals. Instead, the
parallel checkout framework lets the sequential code handle Step 3b,
and uses parallel workers to replace the sequential
`entry.c:write_entry()` calls from Step 3c.

Rejected Multi-Threaded Solution
--------------------------------

The most "straightforward" implementation would be to spread the set of
to-be-updated cache entries across multiple threads. But due to the
thread-unsafe functions in the ODB code, we would have to use locks to
coordinate the parallel operation. An early prototype of this solution
showed that the multi-threaded checkout would bring performance
improvements over the sequential code, but there was still too much lock
contention. A `perf` profile indicated that around 20% of the runtime
during a local Linux clone (on an SSD) was spent in locking functions.
For this reason, this approach was rejected in favor of using multiple
child processes, which led to better performance.

Multi-Process Solution
----------------------

Parallel checkout alters the aforementioned Step 3 to use multiple
`checkout--worker` background processes to distribute the work. The
long-running worker processes are controlled by the foreground Git
command using the existing run-command API.

Overview
~~~~~~~~

Step 3b is only slightly altered; for each entry to be checked out, the
main process performs the following steps:
* M1: Check whether there is any untracked or unclean file in the
working tree which would be overwritten by this entry, and decide
whether to proceed (removing the file(s)) or not.
* M2: Create the leading directories.
* M3: Load the conversion attributes for the entry's path.
* M4: Check, based on the entry's type and conversion attributes,
whether the entry is eligible for parallel checkout (more on this
later). If it is eligible, enqueue the entry and the loaded
attributes to later write the entry in parallel. If not, write the
entry right away, using the default sequential code.
Note: we save the conversion attributes associated with each entry
because the workers don't have access to the main process' index state,
so they can't load the attributes by themselves (and the attributes are
needed to properly smudge the entry). Additionally, this has a positive
impact on performance as (1) we don't need to load the attributes twice
and (2) the attributes machinery is optimized to handle paths in
sequential order.
After all entries have passed through the above steps, the main process
checks if the number of enqueued entries is sufficient to spread among
the workers. If not, it just writes them sequentially. Otherwise, it
spawns the workers and distributes the queued entries uniformly in
contiguous chunks. This aims to minimize the chances of two workers
writing to the same directory simultaneously, which could increase lock
contention in the kernel.
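
For illustration, here is a minimal, standalone C sketch of this split
(mirroring what `setup_workers()` does further down in this diff): each
worker receives a contiguous chunk of `nr / num_workers` entries, and
the first `nr % num_workers` workers receive one extra entry. The
counts below are arbitrary examples.

----------------------------------------------
#include <stdio.h>

int main(void)
{
	size_t nr = 107;      /* enqueued entries (arbitrary example) */
	int num_workers = 4;  /* checkout.workers (arbitrary example) */
	size_t base = nr / num_workers;  /* 26 entries per worker */
	int extra = nr % num_workers;    /* first 3 workers get one more */
	size_t beginning = 0;

	for (int i = 0; i < num_workers; i++) {
		size_t batch = base + (i < extra ? 1 : 0);
		/* Each worker gets one contiguous chunk of the queue. */
		printf("worker %d: items [%zu, %zu)\n",
		       i, beginning, beginning + batch);
		beginning += batch;
	}
	return 0;
}
----------------------------------------------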
Then, for each assigned item, each worker:
* W1: Checks if there is any non-directory file in the leading part of
the entry's path or if there already exists a file at the entry's path.
If so, mark the entry with `PC_ITEM_COLLIDED` and skip it (more on
this later).
* W2: Creates the file (with `O_CREAT` and `O_EXCL`).
* W3: Loads the blob into memory (inflating and delta reconstructing
it).
* W4: Applies any required in-process filter, like end-of-line
conversion and re-encoding.
* W5: Writes the result to the file descriptor opened at W2.
* W6: Calls `fstat()` or `lstat()` on the just-written path, and sends
the result back to the main process, together with the end status of
the operation and the item's identification number.
Note that, when possible, steps W3 to W5 are delegated to the streaming
machinery, removing the need to keep the entire blob in memory.
If the worker fails to read the blob or to write it to the working tree,
it removes the created file to avoid leaving empty files behind. This is
the *only* time a worker is allowed to remove a file.
As mentioned earlier, it is the responsibility of the main process to
remove any file that blocks the checkout operation (or abort if the
removal(s) would cause data loss and the user didn't ask to `--force`).
This is crucial to avoid race conditions and also to properly detect
path collisions at Step W1.
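
The `O_CREAT` plus `O_EXCL` combination in step W2 is what turns a racy
overwrite into a detectable collision: if another entry (or a leftover
file) already occupies the path, `open()` fails with `EEXIST` instead of
clobbering it. A minimal, standalone C sketch of that behavior follows;
the path name is an arbitrary example.

----------------------------------------------
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	const char *path = "example.txt"; /* arbitrary example path */
	int fd;

	/* The first exclusive creation succeeds, as in step W2. */
	fd = open(path, O_WRONLY | O_CREAT | O_EXCL, 0666);
	if (fd < 0)
		return 1;
	close(fd);

	/*
	 * A second exclusive creation of the same path fails with
	 * EEXIST; a worker would mark the item PC_ITEM_COLLIDED here
	 * instead of overwriting a file another entry just created.
	 */
	fd = open(path, O_WRONLY | O_CREAT | O_EXCL, 0666);
	if (fd < 0 && errno == EEXIST)
		printf("collision detected at '%s'\n", path);

	unlink(path); /* clean up the example file */
	return 0;
}
----------------------------------------------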
After the workers finish writing the items and sending back the required
information, the main process handles the results in two steps:
- First, it updates the in-memory index with the `lstat()` information
sent by the workers. (This must be done first as this information
might be required in the following step.)
- Then it writes the items which collided on disk (i.e. items marked
with `PC_ITEM_COLLIDED`). More on this below.

Path Collisions
---------------

Path collisions happen when two different paths correspond to the same
entry in the file system. E.g. the paths 'a' and 'A' would collide in a
case-insensitive file system.
The sequential checkout deals with collisions in the same way that it
deals with files that were already present in the working tree before
checkout. Basically, it checks if the path that it wants to write
already exists on disk, makes sure the existing file doesn't have
unsaved data, and then overwrites it. (To be more pedantic: it deletes
the existing file and creates the new one.) So, if there are multiple
colliding files to be checked out, the sequential code will write each
one of them but only the last will actually survive on disk.
Parallel checkout aims to reproduce the same behavior. However, we
cannot let the workers racily write to the same file on disk. Instead,
the workers detect when the entry that they want to check out would
collide with an existing file, and mark it with `PC_ITEM_COLLIDED`.
Later, the main process can sequentially feed these entries back to
`checkout_entry()` without the risk of race conditions. On clone, this
also has the effect of marking the colliding entries to later emit a
warning for the user, like the classic sequential checkout does.
The workers are able to detect both collisions among the entries being
concurrently written and collisions between a parallel-eligible entry
and an ineligible entry. The general idea for collision detection is
quite straightforward: for each parallel-eligible entry, the main
process must remove all files that prevent this entry from being written
(before enqueueing it). This includes any non-directory file in the
leading path of the entry. Later, when a worker gets assigned the entry,
it looks again for non-directory files and for an already existing
file at the entry's path. If any of these checks finds something, the
worker knows that there was a path collision.
Because parallel checkout can distinguish path collisions from the case
where the file was already present in the working tree before checkout,
we could alternatively choose to skip the checkout of colliding entries.
However, each entry that doesn't get written would have NULL `lstat()`
fields on the index. This could cause performance penalties for
subsequent commands that need to refresh the index, as they would have
to go to the file system to see if the entry is dirty. Thus, if we have
N entries in a colliding group and we decide to write and `lstat()` only
one of them, every subsequent `git-status` will have to read, convert,
and hash the written file N - 1 times. By checking out all colliding
entries (like the sequential code does), we only pay the overhead once,
during checkout.

Eligible Entries for Parallel Checkout
--------------------------------------

As previously mentioned, not all entries passed to `checkout_entry()`
will be considered eligible for parallel checkout. More specifically, we
exclude:
- Symbolic links; to avoid race conditions that, in combination with
path collisions, could cause workers to write files at the wrong
place. For example, if we were to concurrently check out a symlink
'a' -> 'b' and a regular file 'A/f' in a case-insensitive file system,
we could potentially end up writing the file 'A/f' at 'a/f', due to a
race condition.
- Regular files that require external filters (either "one shot" filters
or long-running process filters). These filters are black-boxes to Git
and may have their own internal locking or non-concurrent assumptions.
So it might not be safe to run multiple instances in parallel.
+
Besides, long-running filters may use the delayed checkout feature to
postpone the return of some filtered blobs. The delayed checkout queue
and the parallel checkout queue are not compatible and should remain
separate.
+
Note: regular files that only require internal filters, like end-of-line
conversion and re-encoding, are eligible for parallel checkout.
Ineligible entries are checked out by the classic sequential codepath
*before* spawning workers.
Note: submodules' files are also eligible for parallel checkout (as
long as they don't fall into any of the excluding categories mentioned
above). But since each submodule is checked out in its own child
process, we don't mix the superproject's and the submodules' files in
the same parallel checkout process or queue.

The API
-------

The parallel checkout API was designed with the goal of minimizing
changes to the current users of the checkout machinery. This means that
they don't have to call a different function for sequential or parallel
checkout. As already mentioned, `checkout_entry()` will automatically
insert the given entry in the parallel checkout queue when this feature
is enabled and the entry is eligible; otherwise, it will just write the
entry right away, using the sequential code. In general, callers of the
parallel checkout API should look similar to this:
----------------------------------------------
int pc_workers, pc_threshold, err = 0;
struct checkout state;

get_parallel_checkout_configs(&pc_workers, &pc_threshold);

/*
 * This check is not strictly required, but it
 * should save some time in sequential mode.
 */
if (pc_workers > 1)
	init_parallel_checkout();

for (each cache_entry ce to-be-updated)
	err |= checkout_entry(ce, &state, NULL, NULL);

err |= run_parallel_checkout(&state, pc_workers, pc_threshold, NULL, NULL);
----------------------------------------------

Makefile

@@ -948,6 +948,7 @@ LIB_OBJS += pack-revindex.o
LIB_OBJS += pack-write.o
LIB_OBJS += packfile.o
LIB_OBJS += pager.o
LIB_OBJS += parallel-checkout.o
LIB_OBJS += parse-options-cb.o
LIB_OBJS += parse-options.o
LIB_OBJS += patch-delta.o
@@ -1064,6 +1065,7 @@ BUILTIN_OBJS += builtin/check-attr.o
BUILTIN_OBJS += builtin/check-ignore.o
BUILTIN_OBJS += builtin/check-mailmap.o
BUILTIN_OBJS += builtin/check-ref-format.o
BUILTIN_OBJS += builtin/checkout--worker.o
BUILTIN_OBJS += builtin/checkout-index.o
BUILTIN_OBJS += builtin/checkout.o
BUILTIN_OBJS += builtin/clean.o

builtin.h

@@ -123,6 +123,7 @@ int cmd_bugreport(int argc, const char **argv, const char *prefix);
int cmd_bundle(int argc, const char **argv, const char *prefix);
int cmd_cat_file(int argc, const char **argv, const char *prefix);
int cmd_checkout(int argc, const char **argv, const char *prefix);
int cmd_checkout__worker(int argc, const char **argv, const char *prefix);
int cmd_checkout_index(int argc, const char **argv, const char *prefix);
int cmd_check_attr(int argc, const char **argv, const char *prefix);
int cmd_check_ignore(int argc, const char **argv, const char *prefix);

builtin/checkout--worker.c (new file)

@@ -0,0 +1,145 @@
#include "builtin.h"
#include "config.h"
#include "entry.h"
#include "parallel-checkout.h"
#include "parse-options.h"
#include "pkt-line.h"
static void packet_to_pc_item(const char *buffer, int len,
struct parallel_checkout_item *pc_item)
{
const struct pc_item_fixed_portion *fixed_portion;
const char *variant;
char *encoding;
if (len < sizeof(struct pc_item_fixed_portion))
BUG("checkout worker received too short item (got %dB, exp %dB)",
len, (int)sizeof(struct pc_item_fixed_portion));
fixed_portion = (struct pc_item_fixed_portion *)buffer;
if (len - sizeof(struct pc_item_fixed_portion) !=
fixed_portion->name_len + fixed_portion->working_tree_encoding_len)
BUG("checkout worker received corrupted item");
variant = buffer + sizeof(struct pc_item_fixed_portion);
/*
* Note: the main process uses zero length to communicate that the
* encoding is NULL. There is no use case that requires sending an
* actual empty string, since convert_attrs() never sets
* ca.working_tree_encoding to "".
*/
if (fixed_portion->working_tree_encoding_len) {
encoding = xmemdupz(variant,
fixed_portion->working_tree_encoding_len);
variant += fixed_portion->working_tree_encoding_len;
} else {
encoding = NULL;
}
memset(pc_item, 0, sizeof(*pc_item));
pc_item->ce = make_empty_transient_cache_entry(fixed_portion->name_len);
pc_item->ce->ce_namelen = fixed_portion->name_len;
pc_item->ce->ce_mode = fixed_portion->ce_mode;
memcpy(pc_item->ce->name, variant, pc_item->ce->ce_namelen);
oidcpy(&pc_item->ce->oid, &fixed_portion->oid);
pc_item->id = fixed_portion->id;
pc_item->ca.crlf_action = fixed_portion->crlf_action;
pc_item->ca.ident = fixed_portion->ident;
pc_item->ca.working_tree_encoding = encoding;
}
static void report_result(struct parallel_checkout_item *pc_item)
{
struct pc_item_result res;
size_t size;
res.id = pc_item->id;
res.status = pc_item->status;
if (pc_item->status == PC_ITEM_WRITTEN) {
res.st = pc_item->st;
size = sizeof(res);
} else {
size = PC_ITEM_RESULT_BASE_SIZE;
}
packet_write(1, (const char *)&res, size);
}
/* Free the worker-side malloced data, but not pc_item itself. */
static void release_pc_item_data(struct parallel_checkout_item *pc_item)
{
free((char *)pc_item->ca.working_tree_encoding);
discard_cache_entry(pc_item->ce);
}
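/*
 * Read the whole batch of items from stdin (the main process ends it with
 * a flush packet), then write each item out, report its result, and finish
 * with a flush packet of our own.
 */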
static void worker_loop(struct checkout *state)
{
struct parallel_checkout_item *items = NULL;
size_t i, nr = 0, alloc = 0;
while (1) {
int len = packet_read(0, NULL, NULL, packet_buffer,
sizeof(packet_buffer), 0);
if (len < 0)
BUG("packet_read() returned negative value");
else if (!len)
break;
ALLOC_GROW(items, nr + 1, alloc);
packet_to_pc_item(packet_buffer, len, &items[nr++]);
}
for (i = 0; i < nr; i++) {
struct parallel_checkout_item *pc_item = &items[i];
write_pc_item(pc_item, state);
report_result(pc_item);
release_pc_item_data(pc_item);
}
packet_flush(1);
free(items);
}
static const char * const checkout_worker_usage[] = {
N_("git checkout--worker [<options>]"),
NULL
};
int cmd_checkout__worker(int argc, const char **argv, const char *prefix)
{
struct checkout state = CHECKOUT_INIT;
struct option checkout_worker_options[] = {
OPT_STRING(0, "prefix", &state.base_dir, N_("string"),
N_("when creating files, prepend <string>")),
OPT_END()
};
if (argc == 2 && !strcmp(argv[1], "-h"))
usage_with_options(checkout_worker_usage,
checkout_worker_options);
git_config(git_default_config, NULL);
argc = parse_options(argc, argv, prefix, checkout_worker_options,
checkout_worker_usage, 0);
if (argc > 0)
usage_with_options(checkout_worker_usage, checkout_worker_options);
if (state.base_dir)
state.base_dir_len = strlen(state.base_dir);
/*
* Setting this on a worker won't actually update the index. We just
* need to tell the checkout machinery to lstat() the written entries,
* so that we can send this data back to the main process.
*/
state.refresh_cache = 1;
worker_loop(&state);
return 0;
}

entry.c

@@ -7,6 +7,7 @@
#include "progress.h"
#include "fsmonitor.h"
#include "entry.h"
#include "parallel-checkout.h"
static void create_directories(const char *path, int path_len,
const struct checkout *state)
@@ -428,8 +429,17 @@ static void mark_colliding_entries(const struct checkout *state,
for (i = 0; i < state->istate->cache_nr; i++) {
struct cache_entry *dup = state->istate->cache[i];
if (dup == ce)
break;
if (dup == ce) {
/*
* Parallel checkout doesn't create the files in index
* order. So the other side of the collision may appear
* after the given cache_entry in the array.
*/
if (parallel_checkout_status() == PC_RUNNING)
continue;
else
break;
}
if (dup->ce_flags & (CE_MATCHED | CE_VALID | CE_SKIP_WORKTREE))
continue;
@@ -538,6 +548,9 @@ int checkout_entry_ca(struct cache_entry *ce, struct conv_attrs *ca,
ca = &ca_buf;
}
if (!enqueue_checkout(ce, ca))
return 0;
return write_entry(ce, path.buf, ca, state, 0);
}

git.c

@@ -490,6 +490,8 @@ static struct cmd_struct commands[] = {
{ "check-mailmap", cmd_check_mailmap, RUN_SETUP },
{ "check-ref-format", cmd_check_ref_format, NO_PARSEOPT },
{ "checkout", cmd_checkout, RUN_SETUP | NEED_WORK_TREE },
{ "checkout--worker", cmd_checkout__worker,
RUN_SETUP | NEED_WORK_TREE | SUPPORT_SUPER_PREFIX },
{ "checkout-index", cmd_checkout_index,
RUN_SETUP | NEED_WORK_TREE},
{ "cherry", cmd_cherry, RUN_SETUP },

parallel-checkout.c (new file)

@@ -0,0 +1,655 @@
#include "cache.h"
#include "config.h"
#include "entry.h"
#include "parallel-checkout.h"
#include "pkt-line.h"
#include "progress.h"
#include "run-command.h"
#include "sigchain.h"
#include "streaming.h"
#include "thread-utils.h"
struct pc_worker {
struct child_process cp;
size_t next_item_to_complete, nr_items_to_complete;
};
struct parallel_checkout {
enum pc_status status;
struct parallel_checkout_item *items; /* The parallel checkout queue. */
size_t nr, alloc;
struct progress *progress;
unsigned int *progress_cnt;
};
static struct parallel_checkout parallel_checkout;
enum pc_status parallel_checkout_status(void)
{
return parallel_checkout.status;
}
static const int DEFAULT_THRESHOLD_FOR_PARALLELISM = 100;
static const int DEFAULT_NUM_WORKERS = 1;
void get_parallel_checkout_configs(int *num_workers, int *threshold)
{
if (git_config_get_int("checkout.workers", num_workers))
*num_workers = DEFAULT_NUM_WORKERS;
else if (*num_workers < 1)
*num_workers = online_cpus();
if (git_config_get_int("checkout.thresholdForParallelism", threshold))
*threshold = DEFAULT_THRESHOLD_FOR_PARALLELISM;
}
void init_parallel_checkout(void)
{
if (parallel_checkout.status != PC_UNINITIALIZED)
BUG("parallel checkout already initialized");
parallel_checkout.status = PC_ACCEPTING_ENTRIES;
}
static void finish_parallel_checkout(void)
{
if (parallel_checkout.status == PC_UNINITIALIZED)
BUG("cannot finish parallel checkout: not initialized yet");
free(parallel_checkout.items);
memset(&parallel_checkout, 0, sizeof(parallel_checkout));
}
static int is_eligible_for_parallel_checkout(const struct cache_entry *ce,
const struct conv_attrs *ca)
{
enum conv_attrs_classification c;
size_t packed_item_size;
/*
* Symlinks cannot be checked out in parallel as, in case of path
* collision, they could racily replace leading directories of other
* entries being checked out. Submodules are checked out in child
* processes, which have their own parallel checkout queues.
*/
if (!S_ISREG(ce->ce_mode))
return 0;
packed_item_size = sizeof(struct pc_item_fixed_portion) + ce->ce_namelen +
(ca->working_tree_encoding ? strlen(ca->working_tree_encoding) : 0);
/*
* The amount of data we send to the workers per checkout item is
* typically small (75~300B). So unless we find an insanely huge path
* of 64KB, we should never reach the 65KB limit of one pkt-line. If
* that does happen, we let the sequential code handle the item.
*/
if (packed_item_size > LARGE_PACKET_DATA_MAX)
return 0;
c = classify_conv_attrs(ca);
switch (c) {
case CA_CLASS_INCORE:
return 1;
case CA_CLASS_INCORE_FILTER:
/*
* It would be safe to allow concurrent instances of
* single-file smudge filters, like rot13, but we should not
* assume that all filters are parallel-process safe. So we
* don't allow this.
*/
return 0;
case CA_CLASS_INCORE_PROCESS:
/*
* The parallel queue and the delayed queue are not compatible,
* so they must be kept completely separated. And we can't tell
* if a long-running process will delay its response without
* actually asking it to perform the filtering. Therefore, this
* type of filter is not allowed in parallel checkout.
*
* Furthermore, there should only be one instance of the
* long-running process filter as we don't know how it is
* managing its own concurrency. So, spreading the entries that
* require such a filter among the parallel workers would
* require a lot more inter-process communication. We would
* probably have to designate a single process to interact with
* the filter and send all the necessary data to it, for each
* entry.
*/
return 0;
case CA_CLASS_STREAMABLE:
return 1;
default:
BUG("unsupported conv_attrs classification '%d'", c);
}
}
int enqueue_checkout(struct cache_entry *ce, struct conv_attrs *ca)
{
struct parallel_checkout_item *pc_item;
if (parallel_checkout.status != PC_ACCEPTING_ENTRIES ||
!is_eligible_for_parallel_checkout(ce, ca))
return -1;
ALLOC_GROW(parallel_checkout.items, parallel_checkout.nr + 1,
parallel_checkout.alloc);
pc_item = &parallel_checkout.items[parallel_checkout.nr];
pc_item->ce = ce;
memcpy(&pc_item->ca, ca, sizeof(pc_item->ca));
pc_item->status = PC_ITEM_PENDING;
pc_item->id = parallel_checkout.nr;
parallel_checkout.nr++;
return 0;
}
size_t pc_queue_size(void)
{
return parallel_checkout.nr;
}
static void advance_progress_meter(void)
{
if (parallel_checkout.progress) {
(*parallel_checkout.progress_cnt)++;
display_progress(parallel_checkout.progress,
*parallel_checkout.progress_cnt);
}
}
static int handle_results(struct checkout *state)
{
int ret = 0;
size_t i;
int have_pending = 0;
/*
* We first update the successfully written entries with the collected
* stat() data, so that they can be found by mark_colliding_entries(),
* in the next loop, when necessary.
*/
for (i = 0; i < parallel_checkout.nr; i++) {
struct parallel_checkout_item *pc_item = &parallel_checkout.items[i];
if (pc_item->status == PC_ITEM_WRITTEN)
update_ce_after_write(state, pc_item->ce, &pc_item->st);
}
for (i = 0; i < parallel_checkout.nr; i++) {
struct parallel_checkout_item *pc_item = &parallel_checkout.items[i];
switch(pc_item->status) {
case PC_ITEM_WRITTEN:
/* Already handled */
break;
case PC_ITEM_COLLIDED:
/*
* The entry could not be checked out due to a path
* collision with another entry. Since there can only
* be one entry of each colliding group on the disk, we
* could skip trying to check out this one and move on.
* However, this would leave the unwritten entries with
* null stat() fields on the index, which could
* potentially slow down subsequent operations that
* require refreshing it: git would not be able to
* trust st_size and would have to go to the filesystem
* to see if the contents match (see ie_modified()).
*
* Instead, let's pay the overhead only once, now, and
* call checkout_entry_ca() again for this file, to
* have its stat() data stored in the index. This also
* has the benefit of adding this entry and its
* colliding pair to the collision report message.
* Additionally, this overwriting behavior is consistent
* with what the sequential checkout does, so it doesn't
* add any extra overhead.
*/
ret |= checkout_entry_ca(pc_item->ce, &pc_item->ca,
state, NULL, NULL);
advance_progress_meter();
break;
case PC_ITEM_PENDING:
have_pending = 1;
/* fall through */
case PC_ITEM_FAILED:
ret = -1;
break;
default:
BUG("unknown checkout item status in parallel checkout");
}
}
if (have_pending)
error("parallel checkout finished with pending entries");
return ret;
}
static int reset_fd(int fd, const char *path)
{
if (lseek(fd, 0, SEEK_SET) != 0)
return error_errno("failed to rewind descriptor of '%s'", path);
if (ftruncate(fd, 0))
return error_errno("failed to truncate file '%s'", path);
return 0;
}
static int write_pc_item_to_fd(struct parallel_checkout_item *pc_item, int fd,
const char *path)
{
int ret;
struct stream_filter *filter;
struct strbuf buf = STRBUF_INIT;
char *blob;
unsigned long size;
ssize_t wrote;
/* Sanity check */
assert(is_eligible_for_parallel_checkout(pc_item->ce, &pc_item->ca));
filter = get_stream_filter_ca(&pc_item->ca, &pc_item->ce->oid);
if (filter) {
if (stream_blob_to_fd(fd, &pc_item->ce->oid, filter, 1)) {
/* On error, reset fd to try writing without streaming */
if (reset_fd(fd, path))
return -1;
} else {
return 0;
}
}
blob = read_blob_entry(pc_item->ce, &size);
if (!blob)
return error("cannot read object %s '%s'",
oid_to_hex(&pc_item->ce->oid), pc_item->ce->name);
/*
* checkout metadata is used to give context for external process
* filters. Files requiring such filters are not eligible for parallel
* checkout, so pass NULL. Note: if that changes, the metadata must also
* be passed from the main process to the workers.
*/
ret = convert_to_working_tree_ca(&pc_item->ca, pc_item->ce->name,
blob, size, &buf, NULL);
if (ret) {
size_t newsize;
free(blob);
blob = strbuf_detach(&buf, &newsize);
size = newsize;
}
wrote = write_in_full(fd, blob, size);
free(blob);
if (wrote < 0)
return error("unable to write file '%s'", path);
return 0;
}
static int close_and_clear(int *fd)
{
int ret = 0;
if (*fd >= 0) {
ret = close(*fd);
*fd = -1;
}
return ret;
}
void write_pc_item(struct parallel_checkout_item *pc_item,
struct checkout *state)
{
unsigned int mode = (pc_item->ce->ce_mode & 0100) ? 0777 : 0666;
int fd = -1, fstat_done = 0;
struct strbuf path = STRBUF_INIT;
const char *dir_sep;
strbuf_add(&path, state->base_dir, state->base_dir_len);
strbuf_add(&path, pc_item->ce->name, pc_item->ce->ce_namelen);
dir_sep = find_last_dir_sep(path.buf);
/*
* The leading dirs should have been already created by now. But, in
* case of path collisions, one of the dirs could have been replaced by
* a symlink (checked out after we enqueued this entry for parallel
* checkout). Thus, we must check the leading dirs again.
*/
if (dir_sep && !has_dirs_only_path(path.buf, dir_sep - path.buf,
state->base_dir_len)) {
pc_item->status = PC_ITEM_COLLIDED;
goto out;
}
fd = open(path.buf, O_WRONLY | O_CREAT | O_EXCL, mode);
if (fd < 0) {
if (errno == EEXIST || errno == EISDIR) {
/*
* Errors which probably represent a path collision.
* Suppress the error message and mark the item to be
* retried later, sequentially. ENOTDIR and ENOENT are
* also interesting, but the above has_dirs_only_path()
* call should have already caught these cases.
*/
pc_item->status = PC_ITEM_COLLIDED;
} else {
error_errno("failed to open file '%s'", path.buf);
pc_item->status = PC_ITEM_FAILED;
}
goto out;
}
if (write_pc_item_to_fd(pc_item, fd, path.buf)) {
/* Error was already reported. */
pc_item->status = PC_ITEM_FAILED;
close_and_clear(&fd);
unlink(path.buf);
goto out;
}
fstat_done = fstat_checkout_output(fd, state, &pc_item->st);
if (close_and_clear(&fd)) {
error_errno("unable to close file '%s'", path.buf);
pc_item->status = PC_ITEM_FAILED;
goto out;
}
if (state->refresh_cache && !fstat_done && lstat(path.buf, &pc_item->st) < 0) {
error_errno("unable to stat just-written file '%s'", path.buf);
pc_item->status = PC_ITEM_FAILED;
goto out;
}
pc_item->status = PC_ITEM_WRITTEN;
out:
strbuf_release(&path);
}
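/*
 * Serialize one queued item into a single pkt-line: the fixed-size struct
 * followed by the variable-length working tree encoding and path name.
 */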
static void send_one_item(int fd, struct parallel_checkout_item *pc_item)
{
size_t len_data;
char *data, *variant;
struct pc_item_fixed_portion *fixed_portion;
const char *working_tree_encoding = pc_item->ca.working_tree_encoding;
size_t name_len = pc_item->ce->ce_namelen;
size_t working_tree_encoding_len = working_tree_encoding ?
strlen(working_tree_encoding) : 0;
/*
* Any changes in the calculation of the message size must also be made
* in is_eligible_for_parallel_checkout().
*/
len_data = sizeof(struct pc_item_fixed_portion) + name_len +
working_tree_encoding_len;
data = xcalloc(1, len_data);
fixed_portion = (struct pc_item_fixed_portion *)data;
fixed_portion->id = pc_item->id;
fixed_portion->ce_mode = pc_item->ce->ce_mode;
fixed_portion->crlf_action = pc_item->ca.crlf_action;
fixed_portion->ident = pc_item->ca.ident;
fixed_portion->name_len = name_len;
fixed_portion->working_tree_encoding_len = working_tree_encoding_len;
/*
* We use hashcpy() instead of oidcpy() because the hash[] positions
* after `the_hash_algo->rawsz` might not be initialized. And Valgrind
* would complain about passing uninitialized bytes to a syscall
* (write(2)). There is no real harm in this case, but the warning could
* hinder the detection of actual errors.
*/
hashcpy(fixed_portion->oid.hash, pc_item->ce->oid.hash);
variant = data + sizeof(*fixed_portion);
if (working_tree_encoding_len) {
memcpy(variant, working_tree_encoding, working_tree_encoding_len);
variant += working_tree_encoding_len;
}
memcpy(variant, pc_item->ce->name, name_len);
packet_write(fd, data, len_data);
free(data);
}
static void send_batch(int fd, size_t start, size_t nr)
{
size_t i;
sigchain_push(SIGPIPE, SIG_IGN);
for (i = 0; i < nr; i++)
send_one_item(fd, &parallel_checkout.items[start + i]);
packet_flush(fd);
sigchain_pop(SIGPIPE);
}
static struct pc_worker *setup_workers(struct checkout *state, int num_workers)
{
struct pc_worker *workers;
int i, workers_with_one_extra_item;
size_t base_batch_size, batch_beginning = 0;
ALLOC_ARRAY(workers, num_workers);
for (i = 0; i < num_workers; i++) {
struct child_process *cp = &workers[i].cp;
child_process_init(cp);
cp->git_cmd = 1;
cp->in = -1;
cp->out = -1;
cp->clean_on_exit = 1;
strvec_push(&cp->args, "checkout--worker");
if (state->base_dir_len)
strvec_pushf(&cp->args, "--prefix=%s", state->base_dir);
if (start_command(cp))
die("failed to spawn checkout worker");
}
base_batch_size = parallel_checkout.nr / num_workers;
workers_with_one_extra_item = parallel_checkout.nr % num_workers;
for (i = 0; i < num_workers; i++) {
struct pc_worker *worker = &workers[i];
size_t batch_size = base_batch_size;
/* distribute the extra work evenly */
if (i < workers_with_one_extra_item)
batch_size++;
send_batch(worker->cp.in, batch_beginning, batch_size);
worker->next_item_to_complete = batch_beginning;
worker->nr_items_to_complete = batch_size;
batch_beginning += batch_size;
}
return workers;
}
static void finish_workers(struct pc_worker *workers, int num_workers)
{
int i;
/*
* Close pipes before calling finish_command() to let the workers
* exit asynchronously and avoid spending extra time on wait().
*/
for (i = 0; i < num_workers; i++) {
struct child_process *cp = &workers[i].cp;
if (cp->in >= 0)
close(cp->in);
if (cp->out >= 0)
close(cp->out);
}
for (i = 0; i < num_workers; i++) {
int rc = finish_command(&workers[i].cp);
if (rc > 128) {
/*
* For a normal non-zero exit, the worker should have
* already printed something useful to stderr. But a
* death by signal should be mentioned to the user.
*/
error("checkout worker %d died of signal %d", i, rc - 128);
}
}
free(workers);
}
static inline void assert_pc_item_result_size(int got, int exp)
{
if (got != exp)
BUG("wrong result size from checkout worker (got %dB, exp %dB)",
got, exp);
}
static void parse_and_save_result(const char *buffer, int len,
struct pc_worker *worker)
{
struct pc_item_result *res;
struct parallel_checkout_item *pc_item;
struct stat *st = NULL;
if (len < PC_ITEM_RESULT_BASE_SIZE)
BUG("too short result from checkout worker (got %dB, exp >=%dB)",
len, (int)PC_ITEM_RESULT_BASE_SIZE);
res = (struct pc_item_result *)buffer;
/*
* Worker should send either the full result struct on success, or
* just the base (i.e. no stat data), otherwise.
*/
if (res->status == PC_ITEM_WRITTEN) {
assert_pc_item_result_size(len, (int)sizeof(struct pc_item_result));
st = &res->st;
} else {
assert_pc_item_result_size(len, (int)PC_ITEM_RESULT_BASE_SIZE);
}
if (!worker->nr_items_to_complete)
BUG("received result from supposedly finished checkout worker");
if (res->id != worker->next_item_to_complete)
BUG("unexpected item id from checkout worker (got %"PRIuMAX", exp %"PRIuMAX")",
(uintmax_t)res->id, (uintmax_t)worker->next_item_to_complete);
worker->next_item_to_complete++;
worker->nr_items_to_complete--;
pc_item = &parallel_checkout.items[res->id];
pc_item->status = res->status;
if (st)
pc_item->st = *st;
if (res->status != PC_ITEM_COLLIDED)
advance_progress_meter();
}
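/*
 * poll() the workers' pipes, parsing result packets as they arrive, until
 * every worker has sent its final flush packet (or hung up).
 */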
static void gather_results_from_workers(struct pc_worker *workers,
int num_workers)
{
int i, active_workers = num_workers;
struct pollfd *pfds;
CALLOC_ARRAY(pfds, num_workers);
for (i = 0; i < num_workers; i++) {
pfds[i].fd = workers[i].cp.out;
pfds[i].events = POLLIN;
}
while (active_workers) {
int nr = poll(pfds, num_workers, -1);
if (nr < 0) {
if (errno == EINTR)
continue;
die_errno("failed to poll checkout workers");
}
for (i = 0; i < num_workers && nr > 0; i++) {
struct pc_worker *worker = &workers[i];
struct pollfd *pfd = &pfds[i];
if (!pfd->revents)
continue;
if (pfd->revents & POLLIN) {
int len = packet_read(pfd->fd, NULL, NULL,
packet_buffer,
sizeof(packet_buffer), 0);
if (len < 0) {
BUG("packet_read() returned negative value");
} else if (!len) {
pfd->fd = -1;
active_workers--;
} else {
parse_and_save_result(packet_buffer,
len, worker);
}
} else if (pfd->revents & POLLHUP) {
pfd->fd = -1;
active_workers--;
} else if (pfd->revents & (POLLNVAL | POLLERR)) {
die("error polling from checkout worker");
}
nr--;
}
}
free(pfds);
}
static void write_items_sequentially(struct checkout *state)
{
size_t i;
for (i = 0; i < parallel_checkout.nr; i++) {
struct parallel_checkout_item *pc_item = &parallel_checkout.items[i];
write_pc_item(pc_item, state);
if (pc_item->status != PC_ITEM_COLLIDED)
advance_progress_meter();
}
}
int run_parallel_checkout(struct checkout *state, int num_workers, int threshold,
struct progress *progress, unsigned int *progress_cnt)
{
int ret;
if (parallel_checkout.status != PC_ACCEPTING_ENTRIES)
BUG("cannot run parallel checkout: uninitialized or already running");
parallel_checkout.status = PC_RUNNING;
parallel_checkout.progress = progress;
parallel_checkout.progress_cnt = progress_cnt;
if (parallel_checkout.nr < num_workers)
num_workers = parallel_checkout.nr;
if (num_workers <= 1 || parallel_checkout.nr < threshold) {
write_items_sequentially(state);
} else {
struct pc_worker *workers = setup_workers(state, num_workers);
gather_results_from_workers(workers, num_workers);
finish_workers(workers, num_workers);
}
ret = handle_results(state);
finish_parallel_checkout();
return ret;
}

parallel-checkout.h (new file)

@@ -0,0 +1,111 @@
#ifndef PARALLEL_CHECKOUT_H
#define PARALLEL_CHECKOUT_H
#include "convert.h"
struct cache_entry;
struct checkout;
struct progress;
/****************************************************************
* Users of parallel checkout
****************************************************************/
enum pc_status {
PC_UNINITIALIZED = 0,
PC_ACCEPTING_ENTRIES,
PC_RUNNING,
};
enum pc_status parallel_checkout_status(void);
void get_parallel_checkout_configs(int *num_workers, int *threshold);
/*
* Put parallel checkout into the PC_ACCEPTING_ENTRIES state. Should be used
* only when in the PC_UNINITIALIZED state.
*/
void init_parallel_checkout(void);
/*
* Return -1 if parallel checkout is currently not accepting entries or if the
* entry is not eligible for parallel checkout. Otherwise, enqueue the entry
* for later write and return 0.
*/
int enqueue_checkout(struct cache_entry *ce, struct conv_attrs *ca);
size_t pc_queue_size(void);
/*
* Write all the queued entries, returning 0 on success. If the number of
* entries is smaller than the specified threshold, the operation is performed
* sequentially.
*/
int run_parallel_checkout(struct checkout *state, int num_workers, int threshold,
struct progress *progress, unsigned int *progress_cnt);
/****************************************************************
* Interface with checkout--worker
****************************************************************/
enum pc_item_status {
PC_ITEM_PENDING = 0,
PC_ITEM_WRITTEN,
/*
* The entry could not be written because there was another file
* already present in its path or leading directories. Since
* checkout_entry_ca() removes such files from the working tree before
* enqueueing the entry for parallel checkout, it means that there was
* a path collision among the entries being written.
*/
PC_ITEM_COLLIDED,
PC_ITEM_FAILED,
};
struct parallel_checkout_item {
/*
* In the main process, ce points to an istate->cache[] entry. Thus, it's
* not owned by us. In the workers, they own the memory, which *must be*
* released.
*/
struct cache_entry *ce;
struct conv_attrs ca;
size_t id; /* position in parallel_checkout.items[] of main process */
/* Output fields, sent from workers. */
enum pc_item_status status;
struct stat st;
};
/*
* The fixed-size portion of `struct parallel_checkout_item` that is sent to the
* workers. Following this will be 2 strings: ca.working_tree_encoding and
* ce.name; these are NOT null-terminated, since we have the size in the fixed
* portion.
*
* Note that not all fields of conv_attrs and cache_entry are passed, only the
* ones that will be required by the workers to smudge and write the entry.
*/
struct pc_item_fixed_portion {
size_t id;
struct object_id oid;
unsigned int ce_mode;
enum convert_crlf_action crlf_action;
int ident;
size_t working_tree_encoding_len;
size_t name_len;
};
/*
* The fields of `struct parallel_checkout_item` that are returned by the
* workers. Note: `st` must be the last one, as it is omitted on error.
*/
struct pc_item_result {
size_t id;
enum pc_item_status status;
struct stat st;
};
#define PC_ITEM_RESULT_BASE_SIZE offsetof(struct pc_item_result, st)
void write_pc_item(struct parallel_checkout_item *pc_item,
struct checkout *state);
#endif /* PARALLEL_CHECKOUT_H */

unpack-trees.c

@@ -17,6 +17,7 @@
#include "object-store.h"
#include "promisor-remote.h"
#include "entry.h"
#include "parallel-checkout.h"
/*
* Error messages expected by scripts out of plumbing commands such as
@@ -398,7 +399,7 @@ static int check_updates(struct unpack_trees_options *o,
int errs = 0;
struct progress *progress;
struct checkout state = CHECKOUT_INIT;
int i;
int i, pc_workers, pc_threshold;
trace_performance_enter();
state.force = 1;
@@ -441,7 +442,6 @@ static int check_updates(struct unpack_trees_options *o,
if (should_update_submodules())
load_gitmodules_file(index, &state);
enable_delayed_checkout(&state);
if (has_promisor_remote()) {
/*
* Prefetch the objects that are to be checked out in the loop
@@ -464,18 +464,31 @@
to_fetch.oid, to_fetch.nr);
oid_array_clear(&to_fetch);
}
get_parallel_checkout_configs(&pc_workers, &pc_threshold);
enable_delayed_checkout(&state);
if (pc_workers > 1)
init_parallel_checkout();
for (i = 0; i < index->cache_nr; i++) {
struct cache_entry *ce = index->cache[i];
if (ce->ce_flags & CE_UPDATE) {
size_t last_pc_queue_size = pc_queue_size();
if (ce->ce_flags & CE_WT_REMOVE)
BUG("both update and delete flags are set on %s",
ce->name);
display_progress(progress, ++cnt);
ce->ce_flags &= ~CE_UPDATE;
errs |= checkout_entry(ce, &state, NULL, NULL);
if (last_pc_queue_size == pc_queue_size())
display_progress(progress, ++cnt);
}
}
if (pc_workers > 1)
errs |= run_parallel_checkout(&state, pc_workers, pc_threshold,
progress, &cnt);
stop_progress(&progress);
errs |= finish_delayed_checkout(&state, NULL);
git_attr_set_direction(GIT_ATTR_CHECKIN);