Process Data with Parallel Execution and Row/Chunk Processing
Source: R/process_data.R
ProcessData.Rd
This function processes a tibble (or data frame) in parallel using the future
package along with progressr for progress reporting. The data is split into
chunks via a helper function, SplitData. Depending on the by.rows flag, each
chunk is processed either row by row (using lapply) or as an entire chunk. In
either case, the processed results are bound into a single tibble.
Usage
ProcessData(
data,
func,
by.rows = TRUE,
min.multisession = 1,
n.workers = NULL,
limit = 100,
use.multisession = TRUE,
n.message = 1,
start.message = NULL,
process.message = NULL,
end.message = NULL,
restore.defaults = TRUE,
handler = NULL,
silent = FALSE,
log = list(),
...
)
Arguments
- data
Data to be processed.
- func
A function that processes either a single row (if by.rows = TRUE) or an entire chunk (if by.rows = FALSE). Additional arguments are passed to func via the ... argument.
- by.rows
Logical. If TRUE, func is applied to each row individually within each chunk; if FALSE, func is applied to the entire chunk. Default is TRUE.
- min.multisession
Integer. The minimum number of rows in data required to use multisession (parallel execution). If nrow(data) is less than this value, parallel processing is disabled. Default is 1.
- n.workers
Integer. The number of workers to use for parallel processing. Defaults to max(1, future::availableCores() - 1).
- limit
Integer. If (nrow(data) / limit) is less than or equal to n.workers, the data is split into n.workers chunks; otherwise, it is split into chunks of at most limit rows. Default is 100.
- use.multisession
Logical. If TRUE (default), parallel processing is used; otherwise, sequential processing is employed.
- n.message
Integer. Display a progress message every nth iteration. Defaults to 1. Turn off messages with 0.
- start.message
Optional character string. A custom message to log at the start of processing.
- process.message
Optional column name. A custom message to log during processing.
- end.message
Optional character string. A custom message to log at the end of processing.
- restore.defaults
Logical. If TRUE (default), the original future plan and progress handlers are restored after processing.
- handler
Character string. The progress handler to be used by the progressr package. Defaults to "txtprogressbar" unless silent is TRUE.
- silent
Logical. If TRUE, progress messages and logging are suppressed. Default is FALSE.
- log
List. An initial log (default is an empty list) to which log messages are appended.
- ...
Additional arguments passed to the processing function func.
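The interaction between limit and n.workers determines the chunk count. SplitData itself is internal and not shown on this page, so the helper below (split_into_chunks) and its exact rounding behaviour are assumptions, not the package's actual implementation:

```r
# Assumed sketch of the chunking rule: when nrow(data) / limit fits within
# the worker count, make one chunk per worker; otherwise cap chunk size
# at roughly `limit` rows.
split_into_chunks <- function(data, n.workers, limit) {
  n <- nrow(data)
  n.chunks <- if (n / limit <= n.workers) n.workers else ceiling(n / limit)
  n.chunks <- min(n.chunks, n)  # never create more chunks than rows
  split(data, cut(seq_len(n), breaks = n.chunks, labels = FALSE))
}

chunks <- split_into_chunks(data.frame(x = seq_len(250)), n.workers = 2, limit = 100)
length(chunks)  # 3: since 250 / 100 > 2 workers, chunk size is capped near limit
```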
Value
A list with two components:
- results
A tibble resulting from binding all processed rows from each chunk.
- log
A list of log messages generated during processing.
Details
The function first checks whether the data has enough rows to warrant parallel
processing. It then determines the number of workers and chunks and splits the
data using the helper function SplitData. Processing is executed in parallel
with future.apply::future_lapply. If by.rows is TRUE, each row within a chunk
is processed individually and the results are bound together using
dplyr::bind_rows; if by.rows is FALSE, the entire chunk is processed at once.
Finally, all chunk results are combined into a single tibble.
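The row-versus-chunk branching described above can be illustrated with a sequential stand-in. The sketch below substitutes base lapply for future.apply::future_lapply and do.call(rbind, ...) for dplyr::bind_rows so that it runs without extra packages; it is not the package's actual code:

```r
# Sequential sketch of the core loop: apply func per row or per chunk,
# then bind everything back into one data frame.
process_chunks <- function(chunks, func, by.rows = TRUE, ...) {
  results <- lapply(chunks, function(chunk) {
    if (by.rows) {
      # Apply func to each single-row slice, then bind the rows back together.
      rows <- lapply(seq_len(nrow(chunk)),
                     function(i) func(chunk[i, , drop = FALSE], ...))
      do.call(rbind, rows)
    } else {
      # Apply func to the whole chunk at once.
      func(chunk, ...)
    }
  })
  do.call(rbind, results)  # combine all chunk results
}

chunks <- split(data.frame(value = 1:4), rep(1:2, each = 2))
out <- process_chunks(chunks, function(d) { d$value <- d$value * 10; d })
out$value  # 10 20 30 40
```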
Examples
if (FALSE) { # \dontrun{
example.result <- ProcessData(
data = tibble::tibble(
key = LETTERS[1:3],
value = seq(3)
),
func = \(data) {
data$value <- data$value * 10
data
},
by.rows = FALSE,
min.multisession = 10,
n.workers = 4,
limit = 100,
use.multisession = TRUE,
start.message = "Starting example data processing",
end.message = "Finished example data processing",
handler = "txtprogressbar",
silent = FALSE
)
# The result is a single tibble.
processed.tibble <- example.result$results
process.log <- example.result$log
if (nrow(processed.tibble) > 0) print(processed.tibble)
} # }
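The example above processes whole chunks (by.rows = FALSE). A complementary row-wise call might look like the sketch below; the factor argument is hypothetical and is simply forwarded to func via ... (not run, since it requires the package that provides ProcessData):

```r
if (FALSE) { # not run: requires the package providing ProcessData
  row.result <- ProcessData(
    data = tibble::tibble(key = LETTERS[1:3], value = seq(3)),
    func = \(row, factor) {
      # With by.rows = TRUE, func receives a single-row tibble at a time.
      row$value <- row$value * factor
      row
    },
    by.rows = TRUE,
    factor = 10 # forwarded to func via ...
  )
}
```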