Process Data with Parallel Execution and Row/Chunk Processing
Source: R/process_data.R
ProcessData.Rd

This function processes a tibble (or data frame) in parallel using the future package
along with progressr for progress reporting. The data is split into chunks via a helper function
SplitData. Depending on the by.rows flag, each chunk is processed either row by row
(using lapply) or as an entire chunk. In either case, the processed results are bound into a single tibble.
Usage
ProcessData(
data,
func,
by.rows = TRUE,
min.multisession = 1,
n.workers = NULL,
limit = 100,
use.multisession = TRUE,
n.message = 1,
start.message = NULL,
process.message = NULL,
end.message = NULL,
restore.defaults = TRUE,
handler = NULL,
silent = FALSE,
log = list(),
...
)

Arguments
- data
Data to be processed.
- func
A function that processes either a single row (if by.rows = TRUE) or an entire chunk (if by.rows = FALSE). Additional arguments are passed to func via ....
- by.rows
Logical. If TRUE, func is applied to each row individually within each chunk; if FALSE, func is applied to the entire chunk. Default is TRUE.
- min.multisession
Integer. The minimum number of rows in data required to use multisession (parallel execution). If nrow(data) is less than this value, parallel processing is disabled. Default is 1.
- n.workers
Integer. The number of workers to use for parallel processing. Defaults to max(1, future::availableCores() - 1).
- limit
Integer. If (nrow(data) / limit) is less than or equal to n.workers, the data is split into n.workers chunks; otherwise, the number of chunks is derived from limit (see the sketch following this list). Default is 100.
- use.multisession
Logical. If TRUE (default), parallel processing is used; otherwise, sequential processing is employed.
- n.message
Integer. Display a process message every nth iteration. Defaults to 1. Set to 0 to turn messages off.
- start.message
Optional character string. A custom message to log at the start of processing.
- process.message
Optional column name. A custom message to log during processing.
- end.message
Optional character string. A custom message to log at the end of processing.
- restore.defaults
Logical. If TRUE (default), the original future plan and progress handlers are restored after processing.
- handler
Character string. The progress handler to be used by the progressr package. Defaults to "txtprogressbar" unless silent is TRUE.
- silent
Logical. If TRUE, progress messages and logging are suppressed. Default is FALSE.
- log
List. An initial log (default is an empty list) to which log messages are appended.
- ...
Additional arguments passed to the processing function func.
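The interplay between limit and n.workers can be made concrete with a small sketch. ChunkCountSketch below is a hypothetical helper written only to mirror the rule stated for limit; it is not part of the package, and the real SplitData may differ in detail.

# Hypothetical illustration of the chunk-count rule described under `limit`.
ChunkCountSketch <- function(data, n.workers, limit = 100) {
  if (nrow(data) / limit <= n.workers) {
    # Few rows relative to `limit`: use one chunk per worker.
    n.workers
  } else {
    # Many rows: aim for chunks of roughly `limit` rows each.
    ceiling(nrow(data) / limit)
  }
}

ChunkCountSketch(data.frame(x = seq_len(250)), n.workers = 4)   # 4 chunks
ChunkCountSketch(data.frame(x = seq_len(5000)), n.workers = 4)  # 50 chunks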
Value
A list with two components:
- results
A tibble obtained by binding the processed results from all chunks.
- log
A list of log messages generated during processing.
Details
The function first checks whether the data has enough rows to warrant parallel processing.
It then determines the number of workers and chunks and splits the data with the helper function
SplitData. Processing is executed in parallel with future.apply::future_lapply. If by.rows is TRUE,
each row within a chunk is processed individually and the resulting rows are bound together with dplyr::bind_rows;
if by.rows is FALSE, the entire chunk is processed at once. Finally, all chunk results are combined
into a single tibble.
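The flow above can be approximated with a short, self-contained sketch. SplitDataSketch and ProcessSketch are hypothetical stand-ins written for illustration; they are not the package internals and omit the logging, messaging, and min.multisession threshold that ProcessData documents.

library(future)
library(future.apply)
library(progressr)
library(dplyr)

# Stand-in for the internal SplitData helper: split rows into n.chunks groups.
SplitDataSketch <- function(data, n.chunks) {
  split(data, cut(seq_len(nrow(data)), breaks = n.chunks, labels = FALSE))
}

# Minimal parallel row/chunk processor following the pattern described above.
ProcessSketch <- function(data, func, by.rows = TRUE, n.workers = 2, ...) {
  old.plan <- future::plan(future::multisession, workers = n.workers)
  on.exit(future::plan(old.plan), add = TRUE)  # restore the previous plan

  chunks <- SplitDataSketch(data, n.chunks = n.workers)

  progressr::with_progress({
    p <- progressr::progressor(along = chunks)
    results <- future.apply::future_lapply(chunks, function(chunk, ...) {
      p()  # signal progress once per chunk
      if (by.rows) {
        # Apply func to each row of the chunk, then reassemble the chunk.
        rows <- lapply(
          seq_len(nrow(chunk)),
          function(i) func(chunk[i, , drop = FALSE], ...)
        )
        dplyr::bind_rows(rows)
      } else {
        # Apply func to the chunk as a whole.
        func(chunk, ...)
      }
    }, ...)
  })

  dplyr::bind_rows(results)
}

# Example: double the value column, one row at a time.
ProcessSketch(
  data.frame(key = LETTERS[1:4], value = 1:4),
  func = function(row) { row$value <- row$value * 2; row }
)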
Examples
if (FALSE) { # \dontrun{
example.result <- ProcessData(
  data = tibble::tibble(
    key = LETTERS[1:3],
    value = seq(3)
  ),
  func = \(data) {
    data$value <- data$value * 10
    data
  },
  by.rows = FALSE,
  min.multisession = 10,
  n.workers = 4,
  limit = 100,
  use.multisession = TRUE,
  start.message = "Starting example data processing",
  end.message = "Finished example data processing",
  handler = "txtprogressbar",
  silent = FALSE
)
# The returned value is a list: results is a tibble, log is a list of messages.
processed.tibble <- example.result$results
process.log <- example.result$log
if (nrow(processed.tibble) > 0) print(processed.tibble)
} # }
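For comparison, a row-wise call might look like the sketch below (also not run). The multiplier argument is hypothetical and only illustrates extra arguments reaching func through ...; substitute whatever your processing function expects.

if (FALSE) { # \dontrun{
row.result <- ProcessData(
  data = tibble::tibble(
    key = LETTERS[1:3],
    value = seq(3)
  ),
  func = \(row, multiplier) {
    row$value <- row$value * multiplier
    row
  },
  by.rows = TRUE,
  use.multisession = FALSE,
  silent = TRUE,
  multiplier = 10  # forwarded to func via ...
)
row.result$results
} # }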