
This function processes a tibble (or data frame) in parallel using the future package along with progressr for progress reporting. The data is split into chunks via a helper function SplitData. Depending on the by.rows flag, each chunk is processed either row-by-row (using lapply) or as an entire chunk. In either case, the processed results are bound into a single tibble.

Usage

ProcessData(
  data,
  func,
  by.rows = TRUE,
  min.multisession = 1,
  n.workers = NULL,
  limit = 100,
  use.multisession = TRUE,
  n.message = 1,
  start.message = NULL,
  process.message = NULL,
  end.message = NULL,
  restore.defaults = TRUE,
  handler = NULL,
  silent = FALSE,
  log = list(),
  ...
)

Arguments

data

Data to be processed.

func

A function that processes either a single row (if by.rows = TRUE) or an entire chunk (if by.rows = FALSE). Additional arguments are passed to func via ....

by.rows

Logical. If TRUE, func is applied to each row individually within each chunk; if FALSE, func is applied to the entire chunk. Default is TRUE.
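
The difference between the two by.rows modes can be illustrated with base R alone. This is a minimal sketch of the semantics described above, not the package's implementation: apply.func and times.ten are hypothetical names, and rbind stands in for the row binding.

```r
# Hypothetical helper illustrating the two by.rows modes on a single chunk.
apply.func <- function(chunk, func, by.rows = TRUE, ...) {
  if (by.rows) {
    # Call func once per row, then bind the one-row results together.
    rows <- lapply(
      seq_len(nrow(chunk)),
      function(i) func(chunk[i, , drop = FALSE], ...)
    )
    do.call(rbind, rows)
  } else {
    # Call func once with the whole chunk.
    func(chunk, ...)
  }
}

chunk <- data.frame(key = c("A", "B", "C"), value = 1:3)

# Example processing function: works on one row or on many rows.
times.ten <- function(d) {
  d$value <- d$value * 10
  d
}

by.row <- apply.func(chunk, times.ten, by.rows = TRUE)
by.chunk <- apply.func(chunk, times.ten, by.rows = FALSE)
```

For a vectorized func such as times.ten both modes yield the same values; by.rows = TRUE matters when func only makes sense on a single row.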

min.multisession

Integer. The minimum number of rows in data required to use multisession (parallel execution). If nrow(data) is less than this value, parallel processing is disabled. Default is 1.

n.workers

Integer. The number of workers to use for parallel processing. If NULL (default), max(1, future::availableCores() - 1) is used.

limit

Integer. Controls how the data is chunked. If nrow(data) / limit is less than or equal to n.workers, the data is split into n.workers chunks; otherwise, it is split using limit. Default is 100.
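
The limit rule can be written out as a small calculation. This is a sketch under a stated assumption, not the package's code: chunk.count is a hypothetical helper, and "split using limit" is taken to mean chunks of at most limit rows.

```r
# Hypothetical helper: number of chunks implied by the limit rule.
# Assumption: "split using limit" means chunks of at most `limit` rows.
chunk.count <- function(n.rows, n.workers, limit = 100) {
  if (n.rows / limit <= n.workers) {
    n.workers
  } else {
    ceiling(n.rows / limit)
  }
}

chunk.count(250, n.workers = 4)   # 2.5 <= 4, so one chunk per worker
chunk.count(1000, n.workers = 4)  # 10 > 4, so the chunk count follows limit
```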

use.multisession

Logical. If TRUE (default), parallel processing is used; otherwise, processing is sequential.

n.message

Integer. A process message is displayed every n.message iterations. Default is 1; set to 0 to turn process messages off.
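
One way to read the n.message rule is as a modulo check. The sketch below assumes that "every nth iteration" means iterations whose index is divisible by n.message; should.message is a hypothetical helper, not part of the package.

```r
# Hypothetical helper: should iteration i emit a process message?
should.message <- function(i, n.message = 1) {
  # n.message = 0 turns messages off; && short-circuits before the modulo.
  n.message > 0 && i %% n.message == 0
}

# Iterations (out of 10) that would emit a message when n.message = 3.
message.iterations <- which(vapply(1:10, should.message, logical(1), n.message = 3))
```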

start.message

Optional character string. A custom message to log at the start of processing.

process.message

Optional column name. A custom message to log during processing.

end.message

Optional character string. A custom message to log at the end of processing.

restore.defaults

Logical. If TRUE (default), the original future plan and progress handlers are restored after processing.
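
The plan half of restore.defaults follows a common pattern from the future package: future::plan() returns the previous plan when a new one is set, so it can be restored on exit. The sketch below shows that pattern only; with.restored.plan is a hypothetical helper, and how ProcessData handles the progress handlers is not shown.

```r
# Sketch only: switch the future plan, evaluate code, restore the old plan.
with.restored.plan <- function(strategy, code) {
  old.plan <- future::plan(strategy)           # returns the previous plan
  on.exit(future::plan(old.plan), add = TRUE)  # restore even if code errors
  force(code)                                  # evaluated under the new plan
}

result <- with.restored.plan(
  future::sequential,
  future::value(future::future(1 + 1))
)
```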

handler

Character string. The progress handler to be used by the progressr package. If NULL (default), "txtprogressbar" is used unless silent is TRUE.

silent

Logical. If TRUE, progress messages and logging are suppressed. Default is FALSE.

log

List. An initial log (default is an empty list) to which log messages are appended.

...

Additional arguments passed to the processing function func.

Value

A list with two components:

results

A tibble resulting from binding all processed rows from each chunk.

log

A list of log messages generated during processing.

Details

The function first checks whether the data has enough rows (at least min.multisession) to warrant parallel processing. It then determines the number of workers and chunks and splits the data using the helper function SplitData. The chunks are processed with future.apply::future_lapply, in parallel when multisession is enabled. If by.rows is TRUE, each row within a chunk is processed individually and the row results are bound together using dplyr::bind_rows; if by.rows is FALSE, the entire chunk is processed at once. Finally, all chunk results are combined into a single tibble.
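
The flow above can be sketched in a few lines. This is an illustration under stated assumptions, not the package's actual code: process.sketch is a hypothetical name, base split() stands in for the SplitData helper, and worker selection, logging, and progress reporting are left out.

```r
# Sketch of the flow described in Details; not the package's actual code.
# Assumption: base split() stands in for the SplitData helper.
process.sketch <- function(data, func, n.chunks = 2, by.rows = TRUE, ...) {
  # Assign each row to one of n.chunks roughly equal, contiguous groups.
  group <- ceiling(seq_len(nrow(data)) * n.chunks / nrow(data))
  chunks <- split(data, group)

  # Runs in parallel if a parallel future plan is active,
  # sequentially under the default plan.
  chunk.results <- future.apply::future_lapply(chunks, function(chunk) {
    if (by.rows) {
      # Process each row on its own, then bind the row results.
      rows <- lapply(seq_len(nrow(chunk)), function(i) func(chunk[i, ], ...))
      dplyr::bind_rows(rows)
    } else {
      # Process the whole chunk in one call.
      func(chunk, ...)
    }
  })

  # Combine all chunk results into a single tibble.
  dplyr::bind_rows(chunk.results)
}

out <- process.sketch(
  tibble::tibble(key = LETTERS[1:5], value = 1:5),
  function(d) {
    d$value <- d$value * 10
    d
  }
)
```

Calling future::plan(future::multisession) beforehand would make the same sketch run its chunks on background R sessions.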

Examples

if (FALSE) { # \dontrun{
example.result <- ProcessData(
  data = tibble::tibble(
    key = LETTERS[1:3],
    value = seq(3)
  ),
  func = \(data) {
    data$value <- data$value * 10
    data
  },
  by.rows = FALSE,
  min.multisession = 10,
  n.workers = 4,
  limit = 100,
  use.multisession = TRUE,
  start.message = "Starting example data processing",
  end.message   = "Finished example data processing",
  handler = "txtprogressbar",
  silent = FALSE
)
# The result is a list: `results` holds the combined tibble, `log` the log messages.
processed.tibble <- example.result$results
process.log <- example.result$log
if (nrow(processed.tibble) > 0) print(processed.tibble)
} # }