Use threads instead of processes in Dataset.summaries
Dataset.summaries uses a concurrent.futures.ProcessPoolExecutor to fetch multiple files from S3 at once. ProcessPoolExecutor uses multiprocessing underneath, which defaults to using fork() on Unix. Using fork() is dangerous and prone to deadlocks: https://codewithoutrules.com/2018/09/04/python-multiprocessing/ This is a possible source of observed deadlocks during calls to Dataset.records. Using threads should not be a performance regression since the operation we're parallelizing over is network-bound, not CPU-bound, so there should not be much contention for the GIL.
This commit is contained in:
Родитель
fb68074459
Коммит
2f030ed5bc
|
@ -137,7 +137,7 @@ class Dataset:
|
||||||
datasets
|
datasets
|
||||||
:param prefix: a prefix to the
|
:param prefix: a prefix to the
|
||||||
:param clauses: mapping of fields -> callables to refine the dataset
|
:param clauses: mapping of fields -> callables to refine the dataset
|
||||||
:param max_concurrency: number of processes to spawn when collecting S3 summaries,
|
:param max_concurrency: number of threads to spawn when collecting S3 summaries,
|
||||||
defaults to 1.5 * cpu_count
|
defaults to 1.5 * cpu_count
|
||||||
"""
|
"""
|
||||||
self.bucket = bucket
|
self.bucket = bucket
|
||||||
|
@ -283,7 +283,7 @@ class Dataset:
|
||||||
# on the prefix directory)
|
# on the prefix directory)
|
||||||
clauses['prefix'] = lambda x: True
|
clauses['prefix'] = lambda x: True
|
||||||
|
|
||||||
with futures.ProcessPoolExecutor(self.max_concurrency) as executor:
|
with futures.ThreadPoolExecutor(self.max_concurrency) as executor:
|
||||||
scanned = self._scan(schema, [self.prefix], clauses, executor)
|
scanned = self._scan(schema, [self.prefix], clauses, executor)
|
||||||
keys = sc.parallelize(scanned).flatMap(self.store.list_keys)
|
keys = sc.parallelize(scanned).flatMap(self.store.list_keys)
|
||||||
return keys.take(limit) if limit else keys.collect()
|
return keys.take(limit) if limit else keys.collect()
|
||||||
|
|
Загрузка…
Ссылка в новой задаче