📚 今日目標
- 掌握代碼性能分析和優化方法
- 學習並行計算和分佈式計算
- 掌握內存管理和優化技巧
- 學習C++集成(Rcpp)
- 實踐高性能數據處理
⚡ 第一部分:代碼性能分析
1.1 基準測試工具
# 加載必要的包
install.packages(c("microbenchmark", "bench", "profvis", "tictoc"))
library(microbenchmark)
library(bench)
library(profvis)
library(tictoc)
# 1. 使用system.time()進行簡單計時
simple_function <- function(n) {
sum <- 0
for (i in 1:n) {
sum <- sum + i
}
return(sum)
}
# 計時執行
system.time({
result <- simple_function(1e6)
})
cat("結果:", result, "\n")
# 2. 使用tictoc包進行計時
tic("循環求和")
result <- simple_function(1e6)
toc()
# 3. 使用microbenchmark進行精確基準測試
microbenchmark_result <- microbenchmark(
# 方法1: 循環
循環 = {
sum <- 0
for (i in 1:10000) {
sum <- sum + i
}
sum
},
# 方法2: 向量化
向量化 = sum(1:10000),
# 方法3: 公式
公式 = 10000 * (10000 + 1) / 2,
times = 100, # 每個方法運行100次
unit = "ms" # 時間單位:毫秒
)
print(microbenchmark_result)
# 可視化結果
boxplot(microbenchmark_result,
main = "不同求和方法性能比較",
xlab = "方法",
ylab = "時間 (毫秒)")
# 4. 使用bench包進行更詳細的基準測試
bench_result <- bench::mark(
循環 = {
sum <- 0
for (i in 1:10000) {
sum <- sum + i
}
sum
},
向量化 = sum(1:10000),
公式 = 10000 * (10000 + 1) / 2,
iterations = 100,
check = FALSE
)
print(bench_result)
# 可視化bench結果
plot(bench_result)
# 5. 內存使用分析
memory_test <- bench::mark(
list = as.list(1:10000),
vector = 1:10000,
iterations = 10
)
print(memory_test)
# 提取內存使用信息
memory_summary <- data.frame(
方法 = c("列表", "向量"),
內存 = c(
as.numeric(memory_test$mem_alloc[[1]]) / 1024, # KB
as.numeric(memory_test$mem_alloc[[2]]) / 1024
)
)
print(memory_summary)
1.2 代碼剖析工具
# 使用profvis進行代碼剖析
library(profvis)
# 創建一個性能有問題的函數
slow_function <- function(n) {
result <- list()
# 不高效的循環
for (i in 1:n) {
# 每次循環都創建新向量
temp <- sample(1:100, 1000, replace = TRUE)
result[[i]] <- mean(temp)
}
return(unlist(result))
}
# 運行性能剖析
profvis({
# 運行慢函數
result <- slow_function(1000)
# 運行優化版本
fast_result <- fast_function(1000)
})
# 優化版本
fast_function <- function(n) {
# 預分配內存
result <- numeric(n)
for (i in 1:n) {
# 使用更高效的函數
result[i] <- mean(sample(1:100, 1000, replace = TRUE))
}
return(result)
}
# 比較兩個函數的性能
comparison <- microbenchmark(
慢版本 = slow_function(100),
快版本 = fast_function(100),
times = 10
)
print(comparison)
# 2. 使用Rprof進行命令行剖析
# 創建一個分析文件
Rprof("profile.out")
# 運行要分析的代碼
for (i in 1:100) {
slow_function(10)
}
# 停止剖析
Rprof(NULL)
# 查看分析結果
summaryRprof("profile.out")
# 3. 火焰圖分析
install.packages("profr")
library(profr)
# 使用profr進行剖析
prof_result <- profr({
for (i in 1:50) {
slow_function(20)
}
})
# 查看剖析結果
plot(prof_result)
# 4. 函數調用分析
# 使用trace函數跟蹤函數調用
trace_sum <- 0
trace("sum", tracer = quote({
cat("調用sum函數,參數長度:", length(..1), "\n")
trace_sum <<- trace_sum + 1
}), print = FALSE)
# 運行一些代碼
result <- sum(1:100)
result2 <- sum(c(1, 2, 3, 4, 5))
# 取消跟蹤
untrace("sum")
cat("sum函數被調用了", trace_sum, "次\n")
# 5. 內存使用跟蹤
# 使用pryr包跟蹤內存使用
install.packages("pryr")
library(pryr)
# 查看對象內存使用
x <- 1:1000000
object_size(x)
# 比較不同數據結構的內存使用
list_vs_vector <- function(n) {
list_result <- as.list(1:n)
vector_result <- 1:n
cat("列表大小:", object_size(list_result) / 1024 / 1024, "MB\n")
cat("向量大小:", object_size(vector_result) / 1024 / 1024, "MB\n")
return(list(list = list_result, vector = vector_result))
}
# 測試
mem_test <- list_vs_vector(1000000)
# 6. 垃圾回收分析
# 強制垃圾回收
gc()
# 查看垃圾回收統計
gc_info <- gc()
print(gc_info)
# 監控內存使用變化
mem_before <- mem_used()
x <- rnorm(1000000)
mem_after <- mem_used()
cat("內存增加:", (mem_after - mem_before) / 1024 / 1024, "MB\n")
# 清理
rm(x)
gc()
🔄 第二部分:並行計算
2.1 基礎並行計算
# 1. 使用parallel包進行並行計算
library(parallel)
# 檢測CPU核心數
n_cores <- detectCores()
cat("檢測到", n_cores, "個CPU核心\n")
# 2. 使用mclapply進行並行lapply(Unix-like系統)
# 注意:mclapply在Windows上可能不支持
if (.Platform$OS.type != "windows") {
# 創建一個計算密集型函數
compute_intensive <- function(x) {
Sys.sleep(0.1) # 模擬耗時操作
return(x^2)
}
# 順序執行
tic("順序執行")
sequential_result <- lapply(1:20, compute_intensive)
toc()
# 並行執行
tic("並行執行 (4核心)")
parallel_result <- mclapply(1:20, compute_intensive, mc.cores = 4)
toc()
# 驗證結果
identical(unlist(sequential_result), unlist(parallel_result))
}
# 3. 使用parLapply進行跨平台並行(Windows兼容)
cl <- makeCluster(4) # 創建4個節點的集羣
clusterExport(cl, "compute_intensive") # 導出函數到各個節點
tic("parLapply並行")
par_result <- parLapply(cl, 1:20, compute_intensive)
toc()
# 停止集羣
stopCluster(cl)
# 4. 並行foreach循環
install.packages("foreach")
install.packages("doParallel")
library(foreach)
library(doParallel)
# 註冊並行後端
cl <- makeCluster(4)
registerDoParallel(cl)
# 使用foreach進行並行計算
tic("foreach並行")
foreach_result <- foreach(i = 1:20, .combine = c) %dopar% {
compute_intensive(i)
}
toc()
# 停止集羣
stopCluster(cl)
# 5. 比較不同並行方法的性能
compare_parallel_methods <- function(n_tasks = 20, sleep_time = 0.1) {
# 修改計算函數
compute_task <- function(x) {
Sys.sleep(sleep_time)
return(x^2)
}
results <- list()
# 順序執行
tic("順序執行")
sequential <- lapply(1:n_tasks, compute_task)
seq_time <- toc(quiet = TRUE)$toc - toc(quiet = TRUE)$tic
results$sequential <- seq_time
# parLapply並行
cl <- makeCluster(4)
clusterExport(cl, "compute_task")
tic("parLapply並行")
par <- parLapply(cl, 1:n_tasks, compute_task)
par_time <- toc(quiet = TRUE)$toc - toc(quiet = TRUE)$tic
results$parLapply <- par_time
stopCluster(cl)
# foreach並行
cl <- makeCluster(4)
registerDoParallel(cl)
tic("foreach並行")
foreach_result <- foreach(i = 1:n_tasks, .combine = c) %dopar% {
compute_task(i)
}
foreach_time <- toc(quiet = TRUE)$toc - toc(quiet = TRUE)$tic
results$foreach <- foreach_time
stopCluster(cl)
# 計算加速比
results$speedup_par <- seq_time / par_time
results$speedup_foreach <- seq_time / foreach_time
return(results)
}
# 運行比較
parallel_comparison <- compare_parallel_methods(40, 0.05)
print(parallel_comparison)
2.2 高級並行模式
# 1. 任務並行 vs 數據並行
# 任務並行:不同任務在不同核心上執行
# 數據並行:相同任務在不同數據子集上執行
# 任務並行示例
task_parallel <- function() {
library(parallel)
# 定義不同的任務
task1 <- function() {
Sys.sleep(1)
return("任務1完成")
}
task2 <- function() {
Sys.sleep(2)
return("任務2完成")
}
task3 <- function() {
Sys.sleep(0.5)
return("任務3完成")
}
# 並行執行任務
cl <- makeCluster(3)
tic("任務並行")
results <- clusterApply(cl, list(task1, task2, task3), function(f) f())
toc()
stopCluster(cl)
return(results)
}
# 運行任務並行
task_results <- task_parallel()
print(task_results)
# 數據並行示例
data_parallel <- function(n = 1000000, n_cores = 4) {
library(parallel)
# 生成大數據
data <- rnorm(n)
# 分割數據
chunk_size <- ceiling(n / n_cores)
chunks <- split(data, ceiling(seq_along(data) / chunk_size))
# 定義處理函數
process_chunk <- function(chunk) {
# 模擬計算密集型操作
mean_value <- mean(chunk)
sd_value <- sd(chunk)
list(mean = mean_value, sd = sd_value)
}
# 並行處理數據塊
cl <- makeCluster(n_cores)
tic("數據並行")
results <- parLapply(cl, chunks, process_chunk)
toc()
stopCluster(cl)
# 合併結果
combined_mean <- mean(sapply(results, function(x) x$mean))
combined_sd <- sqrt(mean(sapply(results, function(x) x$sd^2)))
return(list(mean = combined_mean, sd = combined_sd))
}
# 運行數據並行
data_results <- data_parallel(1000000, 4)
print(data_results)
# 2. 負載均衡並行
load_balanced_parallel <- function(tasks, n_cores = 4) {
library(parallel)
# 創建集羣
cl <- makeCluster(n_cores)
# 使用clusterApplyLB進行負載均衡
tic("負載均衡並行")
results <- clusterApplyLB(cl, tasks, function(task) {
Sys.sleep(task$duration) # 模擬任務持續時間
return(paste("任務", task$id, "完成"))
})
toc()
stopCluster(cl)
return(results)
}
# 創建不同時長的任務
tasks <- lapply(1:12, function(i) {
list(id = i, duration = runif(1, 0.1, 2))
})
# 運行負載均衡並行
lb_results <- load_balanced_parallel(tasks, 4)
print(unlist(lb_results))
# 3. 並行隨機數生成
# 設置隨機種子以確保可重複性
set.seed(123, kind = "L'Ecuyer-CMRG")
parallel_random <- function(n_sim = 10000, n_cores = 4) {
library(parallel)
# 創建集羣並設置隨機種子
cl <- makeCluster(n_cores)
clusterSetRNGStream(cl, iseed = 123)
# 並行模擬
results <- parLapply(cl, 1:n_cores, function(i) {
# 每個核心執行部分模擬
n_per_core <- n_sim / n_cores
simulations <- numeric(n_per_core)
for (j in 1:n_per_core) {
simulations[j] <- mean(rnorm(1000))
}
return(simulations)
})
stopCluster(cl)
# 合併結果
all_simulations <- unlist(results)
return(list(
mean = mean(all_simulations),
sd = sd(all_simulations),
n = length(all_simulations)
))
}
# 運行並行隨機模擬
random_results <- parallel_random(10000, 4)
print(random_results)
# 4. 並行文件處理
parallel_file_processing <- function(file_paths, n_cores = 4) {
library(parallel)
# 定義文件處理函數
process_file <- function(file_path) {
# 模擬文件處理
Sys.sleep(runif(1, 0.5, 2))
# 讀取文件(這裏只是示例)
file_info <- file.info(file_path)
return(list(
file = basename(file_path),
size = file_info$size,
processed = TRUE
))
}
# 創建集羣
cl <- makeCluster(n_cores)
# 並行處理文件
tic("並行文件處理")
results <- parLapply(cl, file_paths, process_file)
toc()
stopCluster(cl)
return(results)
}
# 創建模擬文件列表(實際使用時替換為真實文件路徑)
# file_list <- list.files("data/", full.names = TRUE)
# file_results <- parallel_file_processing(file_list, 4)
2.3 分佈式計算
# 1. 使用future包進行異步計算
install.packages("future")
install.packages("future.apply")
library(future)
library(future.apply)
# 設置並行計劃
plan(multisession, workers = 4) # 多會話並行
# 使用future_lapply進行並行計算
slow_square <- function(x) {
Sys.sleep(0.1)
return(x^2)
}
tic("future並行")
future_results <- future_lapply(1:20, slow_square)
toc()
# 2. 異步執行
# 使用%<-%運算符進行異步賦值
future_value %<-% {
Sys.sleep(2)
rnorm(1000)
}
# 主線程可以繼續執行其他任務
cat("異步任務已啓動,繼續執行其他任務...\n")
for (i in 1:5) {
cat("處理其他任務", i, "...\n")
Sys.sleep(0.5)
}
# 獲取異步結果
cat("異步任務結果長度:", length(future_value), "\n")
# 3. 使用sparklyr進行分佈式計算
# 注意:需要安裝Spark
# install.packages("sparklyr")
# library(sparklyr)
#
# # 連接Spark集羣
# sc <- spark_connect(master = "local")
#
# # 將數據複製到Spark
# iris_tbl <- copy_to(sc, iris, "iris_spark")
#
# # 在Spark上執行計算
# spark_result <- iris_tbl %>%
# group_by(Species) %>%
# summarize(
# count = n(),
# avg_sepal_length = mean(Sepal_Length)
# ) %>%
# collect()
#
# print(spark_result)
#
# # 斷開連接
# spark_disconnect(sc)
# 4. 使用rhadoop進行Hadoop集成
# 注意:需要Hadoop環境
# install.packages("rmr2")
# library(rmr2)
#
# # 設置Hadoop配置
# rmr.options(backend = "hadoop")
#
# # 在Hadoop上運行MapReduce
# wordcount <- function(input, output = NULL) {
# mapreduce(
# input = input,
# output = output,
# input.format = "text",
# map = function(k, v) {
# words <- unlist(strsplit(v, "\\s+"))
# keyval(words, 1)
# },
# reduce = function(k, vv) {
# keyval(k, sum(vv))
# }
# )
# }
#
# # 運行wordcount
# # result <- wordcount("hdfs://input.txt", "hdfs://output")
# 5. 分佈式數據框處理
distributed_data_processing <- function(n_rows = 1e6, n_cores = 4) {
library(parallel)
library(dplyr)
# 生成大數據
big_data <- data.frame(
id = 1:n_rows,
value1 = rnorm(n_rows),
value2 = runif(n_rows),
category = sample(letters[1:10], n_rows, replace = TRUE)
)
# 分割數據
chunk_size <- ceiling(n_rows / n_cores)
data_chunks <- split(big_data,
ceiling(seq_along(big_data$id) / chunk_size))
# 創建處理函數
process_chunk <- function(chunk) {
library(dplyr)
result <- chunk %>%
group_by(category) %>%
summarize(
count = n(),
mean_value1 = mean(value1),
mean_value2 = mean(value2),
sd_value1 = sd(value1)
)
return(as.data.frame(result))
}
# 並行處理
cl <- makeCluster(n_cores)
clusterEvalQ(cl, library(dplyr))
tic("分佈式數據處理")
chunk_results <- parLapply(cl, data_chunks, process_chunk)
toc()
stopCluster(cl)
# 合併結果
final_result <- bind_rows(chunk_results) %>%
group_by(category) %>%
summarize(
total_count = sum(count),
weighted_mean1 = sum(mean_value1 * count) / sum(count),
weighted_mean2 = sum(mean_value2 * count) / sum(count),
pooled_sd1 = sqrt(sum((sd_value1^2 + mean_value1^2) * count) /
sum(count) - weighted_mean1^2)
)
return(final_result)
}
# 運行分佈式處理
distributed_result <- distributed_data_processing(1000000, 4)
print(distributed_result)
💾 第三部分:內存管理與優化
3.1 內存優化技巧
# 1. 對象大小分析
library(pryr)
# 查看對象大小
analyze_memory <- function() {
# 創建不同大小的對象
small_vec <- 1:1000
medium_vec <- 1:10000
large_vec <- 1:100000
huge_vec <- 1:1000000
# 計算大小
sizes <- data.frame(
對象 = c("small_vec", "medium_vec", "large_vec", "huge_vec"),
長度 = c(1000, 10000, 100000, 1000000),
內存_MB = c(
object_size(small_vec) / 1024 / 1024,
object_size(medium_vec) / 1024 / 1024,
object_size(large_vec) / 1024 / 1024,
object_size(huge_vec) / 1024 / 1024
)
)
return(sizes)
}
memory_sizes <- analyze_memory()
print(memory_sizes)
# 2. 避免內存拷貝
# 使用tracemem跟蹤內存拷貝
x <- 1:10
tracemem(x)
# 修改對象 - 會創建副本
x[5] <- 100
# 停止跟蹤
untracemem(x)
# 3. 使用原地修改減少內存使用
library(data.table)
# data.table的引用語義
dt <- data.table(a = 1:10, b = letters[1:10])
tracemem(dt)
# 原地修改列
dt[, a := a * 2]
# 注意:data.table的:=操作符進行原地修改,不會創建副本
untracemem(dt)
# 4. 預分配內存
# 不推薦的寫法:動態擴展
slow_vector_growth <- function(n) {
result <- c()
for (i in 1:n) {
result <- c(result, i) # 每次循環都創建新向量
}
return(result)
}
# 推薦的寫法:預分配內存
fast_vector_growth <- function(n) {
result <- numeric(n) # 預分配內存
for (i in 1:n) {
result[i] <- i
}
return(result)
}
# 比較性能
n <- 10000
benchmark_result <- microbenchmark(
動態擴展 = slow_vector_growth(n),
預分配 = fast_vector_growth(n),
times = 10
)
print(benchmark_result)
# 5. 內存映射文件
# 使用bigmemory包處理大數據
install.packages("bigmemory")
library(bigmemory)
# 創建內存映射矩陣
big_matrix <- big.matrix(nrow = 10000, ncol = 1000,
init = 0,
backingfile = "big_matrix.bin",
descriptorfile = "big_matrix.desc")
# 填充數據
for (i in 1:1000) {
big_matrix[, i] <- rnorm(10000)
}
# 查看大小
object_size(big_matrix) # 小對象,實際數據在磁盤上
# 可以進行矩陣運算
col_means <- colmean(big_matrix)
row_sums <- rowsum(big_matrix)
# 6. 分塊處理大數據
chunk_processing <- function(data, chunk_size, process_func) {
n <- nrow(data)
chunks <- ceiling(n / chunk_size)
results <- list()
for (i in 1:chunks) {
start <- (i - 1) * chunk_size + 1
end <- min(i * chunk_size, n)
chunk <- data[start:end, ]
results[[i]] <- process_func(chunk)
# 及時清理內存
rm(chunk)
gc()
cat(sprintf("處理進度: %.1f%%\n", i/chunks * 100))
}
return(do.call(rbind, results))
}
# 示例:分塊計算統計量
process_chunk_stats <- function(chunk) {
data.frame(
mean_x = mean(chunk$x),
mean_y = mean(chunk$y),
count = nrow(chunk)
)
}
# 創建大數據
big_data <- data.frame(
x = rnorm(1000000),
y = runif(1000000)
)
# 分塊處理
chunked_results <- chunk_processing(big_data, 10000, process_chunk_stats)
# 合併結果
final_stats <- data.frame(
overall_mean_x = sum(chunked_results$mean_x * chunked_results$count) /
sum(chunked_results$count),
overall_mean_y = sum(chunked_results$mean_y * chunked_results$count) /
sum(chunked_results$count),
total_count = sum(chunked_results$count)
)
print(final_stats)
3.2 垃圾回收優化
# 1. 手動垃圾回收
monitor_memory <- function() {
# 查看當前內存使用
mem_before <- gc(reset = TRUE)
# 創建一些對象
x <- rnorm(1000000)
y <- matrix(rnorm(1000000), nrow = 1000)
z <- list(x = x, y = y)
# 查看內存使用變化
mem_after <- gc()
cat("內存使用變化:\n")
cat(" 已使用:", (mem_after[2, 2] - mem_before[2, 2]) / 1024, "MB\n")
cat(" 最大值:", (mem_after[2, 6] - mem_before[2, 6]) / 1024, "MB\n")
# 刪除對象
rm(x, y, z)
# 強制垃圾回收
gc(verbose = TRUE)
}
monitor_memory()
# 2. 內存使用監控函數
continuous_memory_monitor <- function(interval = 1, duration = 10) {
library(pryr)
memory_history <- data.frame(
time = numeric(),
mem_used = numeric(),
stringsAsFactors = FALSE
)
start_time <- Sys.time()
while (difftime(Sys.time(), start_time, units = "secs") < duration) {
current_time <- as.numeric(difftime(Sys.time(), start_time, units = "secs"))
current_mem <- mem_used() / 1024 / 1024 # MB
memory_history <- rbind(memory_history,
data.frame(time = current_time,
mem_used = current_mem))
Sys.sleep(interval)
}
return(memory_history)
}
# 運行內存監控
# memory_usage <- continuous_memory_monitor(0.5, 5)
# plot(memory_usage$time, memory_usage$mem_used, type = "l",
# xlab = "時間 (秒)", ylab = "內存使用 (MB)")
# 3. 優化循環中的內存使用
optimized_loop <- function(n_iterations) {
# 預分配結果向量
results <- numeric(n_iterations)
for (i in 1:n_iterations) {
# 在循環內創建臨時對象
temp_data <- rnorm(1000)
# 計算結果
results[i] <- mean(temp_data)
# 顯式刪除臨時對象
rm(temp_data)
# 定期垃圾回收
if (i %% 100 == 0) {
gc()
}
}
return(results)
}
# 4. 使用環境減少內存拷貝
# 環境使用引用語義
create_counter <- function() {
count <- 0
list(
increment = function() {
count <<- count + 1
},
get_count = function() {
return(count)
},
reset = function() {
count <<- 0
}
)
}
# 測試計數器
counter <- create_counter()
counter$increment()
counter$increment()
cat("計數值:", counter$get_count(), "\n")
# 5. 內存泄漏檢測
detect_memory_leak <- function() {
# 創建可能泄漏的函數
leaky_function <- function() {
# 全局賦值可能造成內存泄漏
global_var <<- rnorm(100000)
}
# 多次調用
for (i in 1:10) {
leaky_function()
# 每次調用後檢查內存
gc_info <- gc()
cat(sprintf("調用 %d 後內存: %.2f MB\n",
i, gc_info[2, 2] / 1024))
}
# 清理
rm(global_var, envir = .GlobalEnv)
gc()
}
# 運行內存泄漏檢測
detect_memory_leak()
# 6. 使用ff包處理超大向量
install.packages("ff")
library(ff)
# 創建ff向量
ff_vector <- ff(vmode = "double", length = 10000000)
# 填充數據
for (i in 1:10) {
start <- (i - 1) * 1000000 + 1
end <- i * 1000000
ff_vector[start:end] <- rnorm(1000000)
cat(sprintf("填充進度: %d%%\n", i * 10))
}
# 計算統計量
ff_mean <- mean(ff_vector[])
ff_sd <- sd(ff_vector[])
cat("ff向量均值:", ff_mean, "\n")
cat("ff向量標準差:", ff_sd, "\n")
# 釋放內存
rm(ff_vector)
gc()
🚀 第四部分:Rcpp與C++集成
4.1 Rcpp基礎
# 1. 安裝和加載Rcpp
install.packages("Rcpp")
library(Rcpp)
# 2. 簡單的C++函數
cppFunction('
double cpp_sum(NumericVector x) {
int n = x.size();
double total = 0;
for(int i = 0; i < n; ++i) {
total += x[i];
}
return total;
}
')
# 測試C++函數
x <- rnorm(1000000)
system.time(cpp_result <- cpp_sum(x))
system.time(r_result <- sum(x))
cat("C++結果:", cpp_result, "\n")
cat("R結果:", r_result, "\n")
cat("結果一致:", all.equal(cpp_result, r_result), "\n")
# 3. 矩陣運算
cppFunction('
NumericMatrix cpp_matrix_mult(NumericMatrix A, NumericMatrix B) {
int nrow = A.nrow();
int ncol = B.ncol();
int inner = A.ncol();
NumericMatrix C(nrow, ncol);
for (int i = 0; i < nrow; ++i) {
for (int j = 0; j < ncol; ++j) {
double sum = 0;
for (int k = 0; k < inner; ++k) {
sum += A(i, k) * B(k, j);
}
C(i, j) = sum;
}
}
return C;
}
')
# 測試矩陣乘法
A <- matrix(rnorm(1000), nrow = 100, ncol = 10)
B <- matrix(rnorm(500), nrow = 10, ncol = 50)
# 比較性能
microbenchmark(
R = A %*% B,
Cpp = cpp_matrix_mult(A, B),
times = 100
)
# 4. 帶有條件的循環
cppFunction('
NumericVector cpp_filter_greater(NumericVector x, double threshold) {
int n = x.size();
int count = 0;
// 第一次循環:計算滿足條件的元素數量
for(int i = 0; i < n; ++i) {
if(x[i] > threshold) {
count++;
}
}
// 創建結果向量
NumericVector result(count);
int j = 0;
// 第二次循環:填充結果
for(int i = 0; i < n; ++i) {
if(x[i] > threshold) {
result[j] = x[i];
j++;
}
}
return result;
}
')
# 測試過濾函數
x <- rnorm(1000000)
threshold <- 0.5
microbenchmark(
R = x[x > threshold],
Cpp = cpp_filter_greater(x, threshold),
times = 100
)
4.2 高級Rcpp特性
# 1. 使用RcppArmadillo進行線性代數運算
install.packages("RcppArmadillo")
library(RcppArmadillo)
# 定義Armadillo函數
cppFunction(depends = "RcppArmadillo", '
arma::mat arma_matrix_mult(const arma::mat& A, const arma::mat& B) {
return A * B;
}
')
# 測試Armadillo矩陣乘法
A <- matrix(rnorm(10000), nrow = 100)
B <- matrix(rnorm(10000), nrow = 100)
microbenchmark(
R = A %*% B,
Armadillo = arma_matrix_mult(A, B),
times = 100
)
# 2. 使用RcppSugar簡化代碼
cppFunction('
NumericVector cpp_sugar_example(NumericVector x) {
// 使用RcppSugar語法
return x * x + 2 * x + 1;
}
')
# 測試
x <- 1:10
cpp_sugar_example(x)
# 3. 處理數據框
cppFunction('
DataFrame cpp_summarize(DataFrame df) {
// 從數據框提取列
NumericVector x = df["x"];
NumericVector y = df["y"];
int n = x.size();
// 計算統計量
double x_mean = mean(x);
double y_mean = mean(y);
double correlation = cor(x, y)[0];
// 返回結果數據框
return DataFrame::create(
_["statistic"] = CharacterVector::create("x_mean", "y_mean", "correlation"),
_["value"] = NumericVector::create(x_mean, y_mean, correlation)
);
}
')
# 測試數據框處理
test_df <- data.frame(x = rnorm(1000), y = rnorm(1000))
cpp_summarize(test_df)
# 4. 使用RcppParallel進行並行計算
install.packages("RcppParallel")
library(RcppParallel)
# 定義並行計算函數
cppFunction(depends = "RcppParallel", '
#include <RcppParallel.h>
using namespace RcppParallel;
struct SquareRoot : public Worker {
const RVector<double> input;
RVector<double> output;
SquareRoot(const Rcpp::NumericVector& input, Rcpp::NumericVector& output)
: input(input), output(output) {}
void operator()(std::size_t begin, std::size_t end) {
for (std::size_t i = begin; i < end; ++i) {
output[i] = std::sqrt(input[i]);
}
}
};
// [[Rcpp::export]]
Rcpp::NumericVector parallel_sqrt(Rcpp::NumericVector x) {
Rcpp::NumericVector output(x.size());
SquareRoot squareRoot(x, output);
parallelFor(0, x.size(), squareRoot);
return output;
}
')
# 測試並行計算
x <- runif(10000000, 0, 100)
microbenchmark(
R = sqrt(x),
RcppParallel = parallel_sqrt(x),
times = 10
)
# 5. 調用外部C++庫
# 創建C++源文件
writeLines('
#include <Rcpp.h>
#include <algorithm> // 使用標準庫算法
// [[Rcpp::export]]
Rcpp