mapply package

Top-level package containing init to patch Pandas.

Example usage:

import pandas as pd
import mapply

mapply.init(
    n_workers=-1,
    chunk_size=100,
    max_chunks_per_worker=10,
    progressbar=False
)

df = pd.DataFrame({"A": list(range(100))})

df["squared"] = df.A.mapply(lambda x: x ** 2)
mapply.init(*, n_workers=-1, chunk_size=100, max_chunks_per_worker=8, progressbar=True, apply_name='mapply')[source]

Patch Pandas, adding multi-core methods to PandasObject.

Subsequent calls to this function will create/overwrite methods with new settings.

Parameters:
  • n_workers (int) – Maximum number of worker processes to spawn. May be lowered depending on chunk_size and max_chunks_per_worker. Emits a warning if set higher than is sensible (see mapply.parallel.sensible_cpu_count()).

  • chunk_size (int) – Minimum number of columns/rows per chunk. A higher value raises the threshold for going multi-core. Set to 1 to let max_chunks_per_worker decide.

  • max_chunks_per_worker (int) – Upper limit on the number of chunks per worker. Lowers the n_chunks determined by chunk_size if necessary. Set to 0 to skip this check.

  • progressbar (bool) – Whether to wrap the chunks in a tqdm.auto.tqdm().

  • apply_name (str) – Method name for the patched apply function.
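To make the interplay between n_workers, chunk_size, and max_chunks_per_worker concrete, here is a hypothetical sketch (not mapply's actual internals) of how a chunk count could be derived from these three settings:

```python
import math

def n_chunks(n_rows: int, n_workers: int, chunk_size: int = 100,
             max_chunks_per_worker: int = 8) -> int:
    """Hypothetical helper (for illustration only, not mapply's source):
    derive how many chunks to split the input into.

    chunk_size sets a minimum rows-per-chunk (higher -> fewer chunks),
    while max_chunks_per_worker caps the total chunk count per worker.
    """
    # chunk_size is a *minimum* per chunk, so it bounds the chunk count from above
    chunks = math.ceil(n_rows / chunk_size)
    if max_chunks_per_worker:  # 0 skips this check
        chunks = min(chunks, max_chunks_per_worker * n_workers)
    return max(chunks, 1)

# 100 rows with chunk_size=100 fit in a single chunk, so stay single-core
print(n_chunks(100, n_workers=4))     # -> 1
# 10_000 rows would give 100 chunks, but 8 chunks/worker * 4 workers caps it at 32
print(n_chunks(10_000, n_workers=4))  # -> 32
```

This also shows why a small DataFrame with the default chunk_size=100 runs on a single core: it never produces more than one chunk.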

Submodules

mapply.mapply module

Submodule containing code to run PandasObject.apply() in parallel.

Standalone usage (without init):

import pandas as pd
from mapply.mapply import mapply

df = pd.DataFrame({"A": list(range(100))})

df["squared"] = mapply(df.A, lambda x: x ** 2, progressbar=False)
mapply.mapply.mapply(df_or_series, func, axis=0, *, n_workers=-1, chunk_size=100, max_chunks_per_worker=8, progressbar=True, args=(), **kwargs)[source]

Run apply on n_workers. Split into chunks if sensible, gather results, and concat.

When using mapply.init(), the signature of this method will behave the same as pandas.DataFrame.apply()/pandas.Series.apply()/pandas.core.groupby.GroupBy.apply().

Parameters:
  • df_or_series (Any) – Argument reserved to the class instance, a.k.a. ‘self’.

  • func (Callable) – Function to apply to each column or row.

  • axis (int | str) – Axis along which func is applied.

  • n_workers (int) – Maximum number of worker processes to spawn. May be lowered depending on chunk_size and max_chunks_per_worker. Emits a warning if set higher than is sensible (see mapply.parallel.sensible_cpu_count()).

  • chunk_size (int) – Minimum number of columns/rows per chunk. A higher value raises the threshold for going multi-core. Set to 1 to let max_chunks_per_worker decide.

  • max_chunks_per_worker (int) – Upper limit on the number of chunks per worker. Lowers the n_chunks determined by chunk_size if necessary. Set to 0 to skip this check.

  • progressbar (bool) – Whether to wrap the chunks in a tqdm.auto.tqdm().

  • args (tuple[Any, ...]) – Additional positional arguments to pass to func.

  • **kwargs (Any) – Additional keyword arguments to pass to apply/func.

Returns:

Series or DataFrame resulting from applying func along given axis.

Raises:

ValueError – If a Series is passed in combination with axis=1.

Return type:

Any
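The split/gather/concat flow described above can be sketched on a plain list with only the standard library. This is an illustration, not mapply's implementation: mapply splits pandas objects and runs the chunks on pathos process pools, whereas this sketch uses threads and lists for simplicity:

```python
from concurrent.futures import ThreadPoolExecutor

def parallel_apply(values, func, n_workers=4, chunk_size=100):
    """Illustration of split/gather/concat: split `values` into chunks,
    apply `func` to each element in parallel, and concatenate the results
    in the original order. Not mapply's actual implementation."""
    chunks = [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]
    if len(chunks) == 1:
        # Below the chunk_size threshold, parallelism isn't worth the overhead
        return [func(x) for x in values]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # pool.map preserves chunk order, so the final concat is order-stable
        mapped = pool.map(lambda chunk: [func(x) for x in chunk], chunks)
        return [x for chunk in mapped for x in chunk]

print(parallel_apply(list(range(250)), lambda x: x ** 2, chunk_size=100)[:5])
# -> [0, 1, 4, 9, 16]
```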

mapply.parallel module

Submodule containing code to distribute computation over multiple processes using pathos.multiprocessing.ProcessPool.

Standalone usage:

from mapply.parallel import multiprocessing_imap

def some_heavy_computation(x, power):
    return pow(x, power)

multicore_list = list(
    multiprocessing_imap(
        some_heavy_computation,
        range(100),
        power=2.5,
        progressbar=False,
        n_workers=-1,
    )
)
mapply.parallel.multiprocessing_imap(func, iterable, *, n_workers=-1, progressbar=True, args=(), **kwargs)[source]

Execute func on each element in iterable on n_workers, ensuring order.

Parameters:
  • func (Callable) – Function to apply to each element in iterable.

  • iterable (Iterable[Any]) – Input iterable on which to execute func.

  • n_workers (int) – Number of worker processes to spawn.

  • progressbar (bool) – Whether to wrap the chunks in a tqdm.auto.tqdm.

  • args (tuple[Any, ...]) – Additional positional arguments to pass to func.

  • **kwargs (Any) – Additional keyword arguments to pass to func.

Yields:

Results in same order as input iterable.

Raises:
  • Exception – Any error that occurs during computation (terminates the pool early).

  • KeyboardInterrupt – A KeyboardInterrupt sent by the user (terminates the pool early).

Return type:

Iterator[Any]
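The "ensuring order" guarantee can be sketched with the standard library: submit all work up front, then yield results in submission order regardless of completion order. This thread-based version is an illustration only; multiprocessing_imap itself uses a pathos.multiprocessing.ProcessPool:

```python
from concurrent.futures import ThreadPoolExecutor

def imap_ordered(func, iterable, n_workers=4, **kwargs):
    """Illustrative stand-in for multiprocessing_imap, using threads:
    yields func(item, **kwargs) for each item, in input order."""
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        futures = [pool.submit(func, item, **kwargs) for item in iterable]
        for future in futures:  # iterate in submission order, not completion order
            yield future.result()

def some_heavy_computation(x, power):
    return x ** power

results = list(imap_ordered(some_heavy_computation, range(5), power=2))
print(results)  # -> [0, 1, 4, 9, 16]
```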

mapply.parallel.sensible_cpu_count()[source]

Count the number of physical CPUs (+1 on hyperthreading systems, to prioritize the workers over e.g. system processes).

Return type:

int
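For intuition only, here is a rough stdlib-based approximation of such a count. The SMT factor of 2 and the unconditional +1 are assumptions made for this sketch; mapply's actual implementation may detect physical cores differently:

```python
import os

def approx_sensible_cpu_count(smt_factor: int = 2) -> int:
    """Rough approximation (assumption-laden sketch, not mapply's source):
    os.cpu_count() reports *logical* CPUs, so divide by an assumed SMT
    factor to estimate physical cores, then add 1 to prioritize workers
    over e.g. system processes."""
    logical = os.cpu_count() or 1
    physical = max(logical // smt_factor, 1)
    return physical + 1

print(approx_sensible_cpu_count())
```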