mapply package
Top-level package containing init to patch Pandas.
Example usage:
import pandas as pd
import mapply
mapply.init(
    n_workers=-1,
    chunk_size=100,
    max_chunks_per_worker=10,
    progressbar=False
)
df = pd.DataFrame({"A": list(range(100))})
df["squared"] = df.A.mapply(lambda x: x ** 2)
- mapply.init(*, n_workers=-1, chunk_size=100, max_chunks_per_worker=8, progressbar=True, apply_name='mapply')
Patch Pandas, adding multi-core methods to PandasObject.
Subsequent calls to this function will create/overwrite methods with new settings.
- Parameters:
  - n_workers (int) – Maximum number of workers (processes) to spawn. Might be lowered depending on chunk_size and max_chunks_per_worker. Will throw a warning if set higher than is sensible (see mapply.parallel.sensible_cpu_count()).
  - chunk_size (int) – Minimum number of columns/rows per chunk. A higher value means a higher threshold before going multi-core. Set to 1 to let max_chunks_per_worker decide.
  - max_chunks_per_worker (int) – Upper limit on the number of chunks per worker. Will lower the n_chunks determined by chunk_size if necessary. Set to 0 to skip this check.
  - progressbar (bool) – Whether to wrap the chunks in a tqdm.auto.tqdm().
  - apply_name (str) – Method name for the patched apply function.
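Because init can be called repeatedly, the patched method can be re-registered with new settings and under a different name. A minimal sketch (the name parallel_apply is illustrative, not a package default):

import pandas as pd
import mapply

# Re-running init overwrites the previously patched methods.
mapply.init(n_workers=2, progressbar=False, apply_name="parallel_apply")

df = pd.DataFrame({"A": list(range(100))})
df["squared"] = df.A.parallel_apply(lambda x: x ** 2)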
Submodules
mapply.mapply module
Submodule containing code to run PandasObject.apply() in parallel.
Standalone usage (without init):
import pandas as pd
from mapply.mapply import mapply
df = pd.DataFrame({"A": list(range(100))})
df["squared"] = mapply(df.A, lambda x: x ** 2, progressbar=False)
- mapply.mapply.mapply(df_or_series, func, axis=0, *, n_workers=-1, chunk_size=100, max_chunks_per_worker=8, progressbar=True, args=(), **kwargs)
Run apply on n_workers. Split into chunks if sensible, gather results, and concat.
When using mapply.init(), the signature of this method will behave the same as pandas.DataFrame.apply() / pandas.Series.apply() / pandas.core.groupby.GroupBy.apply().
- Parameters:
  - df_or_series (Any) – Argument reserved for the class instance, a.k.a. ‘self’.
  - func (Callable) – Function to apply to each column or row.
  - axis (int) – Axis along which func is applied.
  - n_workers (int) – Maximum number of workers (processes) to spawn. Might be lowered depending on chunk_size and max_chunks_per_worker. Will throw a warning if set higher than is sensible (see mapply.parallel.sensible_cpu_count()).
  - chunk_size (int) – Minimum number of columns/rows per chunk. A higher value means a higher threshold before going multi-core. Set to 1 to let max_chunks_per_worker decide.
  - max_chunks_per_worker (int) – Upper limit on the number of chunks per worker. Will lower the n_chunks determined by chunk_size if necessary. Set to 0 to skip this check.
  - progressbar (bool) – Whether to wrap the chunks in a tqdm.auto.tqdm().
  - args (tuple[Any, ...]) – Additional positional arguments to pass to func.
  - **kwargs (Any) – Additional keyword arguments to pass to apply/func.
- Returns:
Series or DataFrame resulting from applying func along given axis.
- Raises:
ValueError – If a Series is passed in combination with axis=1.
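Row-wise application follows the same pattern. A minimal sketch (the DataFrame and column names are illustrative):

import pandas as pd
from mapply.mapply import mapply

df = pd.DataFrame({"A": list(range(100)), "B": list(range(100))})

# axis=1 applies func to each row; passing a Series with axis=1 raises ValueError.
df["total"] = mapply(df, lambda row: row.A + row.B, axis=1, progressbar=False)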
mapply.parallel module
Submodule containing code to distribute computation over multiple processes using pathos.multiprocessing.ProcessPool.
Standalone usage:
from mapply.parallel import multiprocessing_imap
def some_heavy_computation(x, power):
    return pow(x, power)

multicore_list = list(
    multiprocessing_imap(
        some_heavy_computation,
        range(100),
        power=2.5,
        progressbar=False,
        n_workers=-1,
    )
)
- mapply.parallel.multiprocessing_imap(func, iterable, *, n_workers=-1, progressbar=True, args=(), **kwargs)
Execute func on each element in iterable on n_workers, ensuring order.
- Parameters:
  - func (Callable) – Function to apply to each element in iterable.
  - iterable (Iterable[Any]) – Input iterable on which to execute func.
  - n_workers (int) – Number of workers (processes) to spawn.
  - progressbar (bool) – Whether to wrap the chunks in a tqdm.auto.tqdm.
  - args (tuple[Any, ...]) – Additional positional arguments to pass to func.
  - **kwargs (Any) – Additional keyword arguments to pass to func.
- Yields:
Results in same order as input iterable.
- Raises:
Exception – Any error that occurs during computation (will terminate the pool early).
KeyboardInterrupt – Any KeyboardInterrupt sent by the user (will terminate the pool early).
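Because results are yielded lazily and in input order, the generator can also be consumed incrementally rather than materialized with list(). A minimal sketch (the function scale is illustrative):

from mapply.parallel import multiprocessing_imap

def scale(x, factor):
    return x * factor

# args forwards extra positional arguments to func; results
# arrive one at a time, in the same order as the input.
for result in multiprocessing_imap(scale, range(10), args=(3,), n_workers=2, progressbar=False):
    print(result)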