📚 Today's Goals
- Learn to connect to different types of databases (SQLite, MySQL, PostgreSQL)
- Master database operations with the DBI and odbc packages
- Learn to work with databases through dplyr
- Master integrating SQL queries with R code
- Learn strategies and optimizations for processing large data
🗄️ Part 1: Database Basics
1.1 Overview of Database Connections
# The main ways R connects to databases:
# 1. DBI + a driver package (RSQLite, RMySQL, RPostgres, etc.)
# 2. The odbc package (a unified interface)
# 3. RJDBC (Java database connectivity)
# Install the required database packages
install.packages(c("DBI", "RSQLite", "odbc", "RMySQL", "RPostgres",
"dbplyr", "dplyr", "pool", "keyring"))
# Load the core package
library(DBI)
1.2 Connecting to SQLite
# SQLite is a lightweight, file-based database, well suited to learning and testing
# Install the SQLite driver
install.packages("RSQLite")
library(RSQLite)
# Connect to an in-memory database (temporary)
con_memory <- dbConnect(RSQLite::SQLite(), ":memory:")
# Connect to a file-based database
con_file <- dbConnect(RSQLite::SQLite(), "mydatabase.db")
# View connection information
dbGetInfo(con_memory)
# Disconnect
dbDisconnect(con_memory)
dbDisconnect(con_file)
# Disconnect as soon as you are done (recommended)
con <- dbConnect(RSQLite::SQLite(), ":memory:")
# ... database operations ...
dbDisconnect(con)
# Or use on.exit() inside a worker function to guarantee cleanup.
# Note: do not register on.exit(dbDisconnect(con)) in a function that
# returns the connection; it would already be closed by the time the
# caller receives it. Do the work inside the function instead:
query_db <- function(db_path, sql) {
  con <- dbConnect(RSQLite::SQLite(), db_path)
  on.exit(dbDisconnect(con), add = TRUE)
  dbGetQuery(con, sql)
}
🔌 Part 2: Basic Database Operations
2.1 Creating and Managing Tables
# Create a database connection
con <- dbConnect(RSQLite::SQLite(), ":memory:")
# Create a table
create_table_sql <- "
CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
age INTEGER,
department TEXT,
salary REAL,
hire_date DATE,
is_active BOOLEAN DEFAULT 1
)"
dbExecute(con, create_table_sql)
# List all tables
tables <- dbListTables(con)
print(tables)
# Inspect the table structure
dbListFields(con, "employees")
# Drop the table
dbExecute(con, "DROP TABLE IF EXISTS employees")
# Recreate the table with constraints
create_table_with_constraints <- "
CREATE TABLE employees (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
age INTEGER CHECK(age >= 18 AND age <= 70),
department TEXT DEFAULT 'General',
salary REAL CHECK(salary >= 0),
hire_date DATE DEFAULT CURRENT_DATE,
email TEXT UNIQUE,
is_active BOOLEAN DEFAULT 1
)"
dbExecute(con, create_table_with_constraints)
# Create indexes to speed up queries
dbExecute(con, "CREATE INDEX idx_department ON employees(department)")
dbExecute(con, "CREATE INDEX idx_hire_date ON employees(hire_date)")
2.2 Inserting Data
# Insert a single row
insert_sql <- "
INSERT INTO employees (name, age, department, salary, hire_date, email)
VALUES ('Zhang San', 28, 'Engineering', 15000.00, '2022-03-15', 'zhangsan@company.com')
"
dbExecute(con, insert_sql)
# Insert multiple rows
employees_data <- data.frame(
name = c("Li Si", "Wang Wu", "Zhao Liu"),
age = c(32, 25, 40),
department = c("Marketing", "Engineering", "Finance"),
salary = c(12000, 18000, 20000),
hire_date = c("2021-06-10", "2023-01-20", "2019-11-05"),
email = c("lisi@company.com", "wangwu@company.com", "zhaoliu@company.com")
)
# Method 1: row-by-row insertion (not recommended; slow, and pasting values into SQL risks injection)
for (i in 1:nrow(employees_data)) {
sql <- sprintf(
"INSERT INTO employees (name, age, department, salary, hire_date, email)
VALUES ('%s', %d, '%s', %.2f, '%s', '%s')",
employees_data$name[i],
employees_data$age[i],
employees_data$department[i],
employees_data$salary[i],
employees_data$hire_date[i],
employees_data$email[i]
)
dbExecute(con, sql)
}
# Method 2: dbWriteTable (recommended)
# Clear the table first
dbExecute(con, "DELETE FROM employees")
# Insert the data with dbWriteTable
dbWriteTable(con, "employees", employees_data,
append = TRUE, # append rows
row.names = FALSE) # do not write row names
# Method 3: parameterized queries (safe; prevents SQL injection)
insert_prepared <- dbSendStatement(con, "
INSERT INTO employees (name, age, department, salary, hire_date, email)
VALUES (?, ?, ?, ?, ?, ?)
")
dbBind(insert_prepared, list(
"孫七", 35, "人事部", 16000, "2020-09-18", "sunqi@company.com"
))
dbClearResult(insert_prepared)
# View the inserted data
result <- dbGetQuery(con, "SELECT * FROM employees")
print(result)
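DBI also offers dbAppendTable(), which builds the parameterized INSERT for you from a data frame. A minimal sketch; the new row here is made up for illustration, since re-appending employees_data as-is would violate the UNIQUE email constraint:
# dbAppendTable() generates the parameterized INSERT from the frame's
# columns; columns omitted from the frame fall back to their table defaults
new_rows <- data.frame(
  name = "Zhou Ba", age = 29, department = "Marketing",
  salary = 13000, hire_date = "2023-04-01", email = "zhouba@company.com"
)
dbAppendTable(con, "employees", new_rows)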
2.3 Querying Data
# Query all rows
all_employees <- dbGetQuery(con, "SELECT * FROM employees")
print(all_employees)
# Conditional query
tech_employees <- dbGetQuery(con, "
SELECT name, age, salary, hire_date
FROM employees
WHERE department = 'Engineering' AND is_active = 1
ORDER BY salary DESC
")
print(tech_employees)
# Aggregate query
summary_stats <- dbGetQuery(con, "
SELECT
department,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
MIN(salary) as min_salary,
MAX(salary) as max_salary,
SUM(salary) as total_salary
FROM employees
WHERE is_active = 1
GROUP BY department
HAVING COUNT(*) > 0
ORDER BY avg_salary DESC
")
print(summary_stats)
# Join query (first create a departments table)
dbExecute(con, "
CREATE TABLE departments (
dept_id INTEGER PRIMARY KEY,
dept_name TEXT UNIQUE,
manager TEXT,
budget REAL
)")
dbWriteTable(con, "departments",
data.frame(
dept_id = 1:4,
dept_name = c("技術部", "市場部", "財務部", "人事部"),
manager = c("張三", "李四", "王五", "趙六"),
budget = c(500000, 300000, 400000, 200000)
),
append = TRUE,
row.names = FALSE
)
# Inner join query
join_query <- "
SELECT
e.name,
e.salary,
d.dept_name,
d.manager,
d.budget
FROM employees e
INNER JOIN departments d ON e.department = d.dept_name
ORDER BY e.salary DESC
"
join_result <- dbGetQuery(con, join_query)
print(join_result)
# Paged query
page_size <- 2
page_number <- 1
paged_query <- sprintf("
SELECT * FROM employees
ORDER BY id
LIMIT %d OFFSET %d",
page_size, (page_number - 1) * page_size)
paged_result <- dbGetQuery(con, paged_query)
print(paged_result)
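The sprintf() approach works, but DBI can also bind the paging values as parameters, which avoids string-building entirely. A small sketch using dbGetQuery()'s params argument:
# Bind LIMIT/OFFSET as parameters instead of pasting them into the SQL
paged_result2 <- dbGetQuery(con,
  "SELECT * FROM employees ORDER BY id LIMIT ? OFFSET ?",
  params = list(page_size, (page_number - 1) * page_size))
print(paged_result2)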
2.4 Updating and Deleting Data
# Update data
# Give every Engineering employee a 10% raise
update_sql <- "
UPDATE employees
SET salary = salary * 1.1
WHERE department = 'Engineering' AND is_active = 1
"
rows_updated <- dbExecute(con, update_sql)
cat("更新了", rows_updated, "行數據\n")
# Update a specific employee's record
update_employee <- "
UPDATE employees
SET salary = ?, department = ?
WHERE name = ? AND is_active = 1
"
stmt <- dbSendStatement(con, update_employee)
dbBind(stmt, list(22000, "Engineering", "Wang Wu"))
dbClearResult(stmt)
# Delete data
# Delete employees who have left (assuming is_active = 0 marks departures)
delete_sql <- "DELETE FROM employees WHERE is_active = 0"
rows_deleted <- dbExecute(con, delete_sql)
cat("刪除了", rows_deleted, "行數據\n")
# Soft delete (set is_active to 0)
soft_delete <- "
UPDATE employees
SET is_active = 0
WHERE hire_date < '2020-01-01'
"
rows_soft_deleted <- dbExecute(con, soft_delete)
cat("軟刪除了", rows_soft_deleted, "行數據\n")
# Verify the results
updated_data <- dbGetQuery(con, "
SELECT name, department, salary, is_active
FROM employees
ORDER BY department, salary DESC
")
print(updated_data)
🔄 Part 3: Working with Databases via dplyr
3.1 dbplyr Basics
# dbplyr lets you use dplyr syntax against a database
library(dplyr)
library(dbplyr)
# Re-establish the connection
con <- dbConnect(RSQLite::SQLite(), ":memory:")
# Write sample data. mtcars keeps car models in its row names, so
# promote them to a real column first (the joins below depend on it)
dbWriteTable(con, "mtcars", data.frame(model = rownames(mtcars), mtcars), row.names = FALSE)
dbWriteTable(con, "iris", iris, row.names = FALSE)
# Create lazy table references
mtcars_tbl <- tbl(con, "mtcars")
iris_tbl <- tbl(con, "iris")
# Check the object's class
class(mtcars_tbl)
# Basic dplyr verbs
result <- mtcars_tbl %>%
select(mpg, cyl, hp, wt) %>%
filter(cyl == 6, mpg > 20) %>%
arrange(desc(mpg))
# View the generated SQL
result %>% show_query()
# Execute the query and pull the results into R
collected_result <- result %>% collect()
print(collected_result)
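If a lazy query will be reused several times, it can be materialized inside the database rather than re-running the SQL each time. A sketch using dplyr::compute(); the temporary table name fast_cars is arbitrary:
# Materialize the lazy query as a temporary table in the database
cached_tbl <- compute(result, name = "fast_cars", temporary = TRUE)
# Subsequent operations hit the temp table instead of re-executing the pipeline
cached_tbl %>% count() %>% collect()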
3.2 Advanced dplyr Operations
# Grouping and summarizing
grouped_stats <- mtcars_tbl %>%
group_by(cyl) %>%
summarize(
count = n(),
avg_mpg = mean(mpg, na.rm = TRUE),
avg_hp = mean(hp, na.rm = TRUE),
max_wt = max(wt, na.rm = TRUE),
min_wt = min(wt, na.rm = TRUE)
) %>%
arrange(desc(avg_mpg))
print(grouped_stats %>% collect())
# Join operations
# Create a second table (illustrative manufacturer/country labels;
# the rep() counts must sum to nrow(mtcars) = 32)
manufacturers <- data.frame(
  model = rownames(mtcars),
  manufacturer = c(rep("Ford", 9), rep("Chevy", 3),
                   rep("Toyota", 3), rep("Honda", 2),
                   rep("Mazda", 2), rep("Datsun", 1),
                   rep("Merc", 7), rep("Fiat", 2),
                   rep("Porsche", 1), rep("Lotus", 1),
                   rep("Maserati", 1)),
  country = c(rep("USA", 16), rep("Japan", 6),
              rep("Germany", 7), rep("Italy", 2),
              "UK")
)
dbWriteTable(con, "manufacturers", manufacturers, row.names = FALSE)
manuf_tbl <- tbl(con, "manufacturers")
# Join query
joined_data <- mtcars_tbl %>%
inner_join(manuf_tbl, by = c("model" = "model")) %>%
select(model, mpg, cyl, hp, manufacturer, country) %>%
group_by(country) %>%
summarize(
car_count = n(),
avg_mpg = mean(mpg, na.rm = TRUE),
avg_hp = mean(hp, na.rm = TRUE)
) %>%
arrange(desc(avg_mpg))
print(joined_data %>% collect())
# Window functions
window_result <- mtcars_tbl %>%
select(model, mpg, cyl, hp) %>%
group_by(cyl) %>%
mutate(
mpg_rank = row_number(desc(mpg)),
avg_mpg_by_cyl = mean(mpg, na.rm = TRUE),
mpg_diff_from_avg = mpg - avg_mpg_by_cyl
) %>%
filter(mpg_rank <= 3) %>%
arrange(cyl, mpg_rank)
print(window_result %>% collect())
3.3 Database-Specific Functions
# Using SQL functions
sql_functions <- mtcars_tbl %>%
mutate(
mpg_category = case_when(
mpg < 15 ~ "Low MPG",
mpg < 25 ~ "Medium MPG",
TRUE ~ "High MPG"
),
hp_per_ton = hp / wt,
year_added = 1970 + row_number() %% 10 # simulated year
) %>%
group_by(mpg_category) %>%
summarize(
count = n(),
avg_hp = mean(hp, na.rm = TRUE),
min_mpg = min(mpg, na.rm = TRUE),
max_mpg = max(mpg, na.rm = TRUE)
) %>%
ungroup()
print(sql_functions %>% collect())
# Date functions (create a table with dates)
# SQLite has no native date type and RSQLite stores Date columns as
# numeric day counts, so store the dates as ISO-8601 text to keep
# SQLite's strftime() working
dates_data <- data.frame(
id = 1:100,
event_date = as.character(seq.Date(from = as.Date("2023-01-01"),
by = "day",
length.out = 100)),
value = rnorm(100, mean = 100, sd = 20)
)
dbWriteTable(con, "events", dates_data, row.names = FALSE)
events_tbl <- tbl(con, "events")
# Using date functions
date_analysis <- events_tbl %>%
mutate(
year = as.integer(strftime(event_date, '%Y')),
month = as.integer(strftime(event_date, '%m')),
week = as.integer(strftime(event_date, '%W')),
day_of_week = strftime(event_date, '%w')
) %>%
group_by(year, month) %>%
summarize(
event_count = n(),
total_value = sum(value, na.rm = TRUE),
avg_value = mean(value, na.rm = TRUE)
) %>%
arrange(year, month)
print(date_analysis %>% collect())
🌐 Part 4: Connecting to Other Database Systems
4.1 Connecting to MySQL
# Install the MySQL driver (RMySQL is the classic package; RMariaDB is its modern successor)
install.packages("RMySQL")
library(RMySQL)
# Connect to a MySQL database (example code; requires a real server)
# con_mysql <- dbConnect(
# RMySQL::MySQL(),
# host = "localhost", # 數據庫主機
# port = 3306, # 端口
# user = "your_username", # 用户名
# password = "your_password", # 密碼
# dbname = "your_database" # 數據庫名
# )
# A safer approach: read the settings from environment variables
# config <- list(
# host = Sys.getenv("DB_HOST"),
# port = as.integer(Sys.getenv("DB_PORT")),
# user = Sys.getenv("DB_USER"),
# password = Sys.getenv("DB_PASSWORD"),
# dbname = Sys.getenv("DB_NAME")
# )
# con_mysql <- do.call(dbConnect, c(list(RMySQL::MySQL()), config))
# Use the keyring package to store passwords securely
install.packages("keyring")
library(keyring)
# Store the password (run once to set it)
# key_set_with_value("mysql_password", password = "your_password")
# Retrieve the password
# password <- key_get("mysql_password")
# Create the connection
# con_mysql <- dbConnect(
# RMySQL::MySQL(),
# host = "localhost",
# port = 3306,
# user = "your_username",
# password = password,
# dbname = "your_database"
# )
# MySQL-specific operations
# List databases
# dbGetQuery(con_mysql, "SHOW DATABASES")
# List tables
# dbGetQuery(con_mysql, "SHOW TABLES")
# Disconnect
# dbDisconnect(con_mysql)
4.2 Connecting to PostgreSQL
# Install the PostgreSQL driver
install.packages("RPostgres")
library(RPostgres)
# Connect to a PostgreSQL database (example code)
# con_pg <- dbConnect(
# RPostgres::Postgres(),
# host = "localhost",
# port = 5432,
# user = "your_username",
# password = "your_password",
# dbname = "your_database"
# )
# PostgreSQL-specific features
# Using the PostGIS extension (spatial data)
# spatial_query <- "
# SELECT
# name,
# ST_AsText(geometry) as wkt,
# ST_Area(geometry) as area
# FROM spatial_table
# WHERE ST_Within(geometry, ST_MakeEnvelope(0, 0, 100, 100, 4326))
# "
#
# spatial_data <- dbGetQuery(con_pg, spatial_query)
# Disconnect
# dbDisconnect(con_pg)
4.3 The odbc Unified Interface
# The odbc package provides a unified interface across database systems
install.packages("odbc")
library(odbc)
# List the available ODBC drivers
drivers <- odbc::odbcListDrivers()
print(head(drivers))
# Connect via ODBC (requires a configured ODBC data source)
# SQL Server example
# con_odbc <- dbConnect(
# odbc::odbc(),
# dsn = "Your_DSN_Name", # ODBC數據源名稱
# uid = "username", # 用户名
# pwd = "password", # 密碼
# database = "your_db" # 數據庫名
# )
# Or use a connection string directly
# con_odbc <- dbConnect(
# odbc::odbc(),
# .connection_string = "Driver={ODBC Driver 17 for SQL Server};Server=localhost;Database=your_db;UID=username;PWD=password"
# )
# Run a query
# result <- dbGetQuery(con_odbc, "SELECT * FROM your_table")
# Disconnect
# dbDisconnect(con_odbc)
4.4 Database Connection Pools
# For web applications or workloads that connect frequently, use a connection pool
install.packages("pool")
library(pool)
# Create a connection pool
# pool <- dbPool(
# RMySQL::MySQL(),
# host = "localhost",
# port = 3306,
# username = "your_username",
# password = "your_password",
# dbname = "your_database",
# minSize = 1, # minimum number of connections
# maxSize = 10, # maximum number of connections
# idleTimeout = 3600 # idle timeout in seconds
# )
# Check a connection out of the pool
# con <- poolCheckout(pool)
# Run a query
# result <- dbGetQuery(con, "SELECT * FROM your_table")
# Return the connection to the pool
# poolReturn(con)
# Close the pool
# poolClose(pool)
# Using dplyr with a pool
# pool_tbl <- tbl(pool, "your_table")
# result <- pool_tbl %>% filter(...) %>% collect()
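The pool examples above are commented out because they need a running MySQL server, but the same API can be tried locally against SQLite. A runnable sketch using a throwaway database file:
# dbPool() passes its extra arguments straight through to dbConnect();
# a file-backed database is used because each pooled connection to
# ":memory:" would get its own separate database
sqlite_pool <- dbPool(RSQLite::SQLite(), dbname = tempfile(fileext = ".db"))
# Pool objects can be queried directly; checkout and return happen automatically
dbExecute(sqlite_pool, "CREATE TABLE t (x INTEGER)")
dbExecute(sqlite_pool, "INSERT INTO t VALUES (1), (2), (3)")
print(dbGetQuery(sqlite_pool, "SELECT COUNT(*) AS n FROM t"))
poolClose(sqlite_pool)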
🚀 Part 5: Strategies for Large Data
5.1 Processing Large Data in Chunks
# When a dataset is too large to load at once, process it in chunks
library(DBI)
library(dplyr) # the per-chunk aggregation below uses dplyr verbs
# Build an example large dataset
con <- dbConnect(RSQLite::SQLite(), ":memory:")
# Create the large table
set.seed(123)
big_data <- data.frame(
id = 1:1000000,
value1 = rnorm(1000000),
value2 = runif(1000000, 0, 100),
category = sample(letters[1:10], 1000000, replace = TRUE),
timestamp = seq.POSIXt(
from = as.POSIXct("2023-01-01"),
by = "sec",
length.out = 1000000
)
)
# Write it to the database
system.time({
dbWriteTable(con, "big_table", big_data, row.names = FALSE)
})
# Method 1: chunked queries with LIMIT/OFFSET
process_in_chunks <- function(con, chunk_size = 100000) {
# Get the total row count
total_rows <- dbGetQuery(con, "SELECT COUNT(*) as n FROM big_table")$n
# Compute the number of chunks
chunks <- ceiling(total_rows / chunk_size)
results <- list()
for (i in 1:chunks) {
offset <- (i - 1) * chunk_size
# Query one chunk
chunk_query <- sprintf("
SELECT * FROM big_table
LIMIT %d OFFSET %d",
chunk_size, offset
)
chunk_data <- dbGetQuery(con, chunk_query)
# Process the current chunk
chunk_result <- chunk_data %>%
group_by(category) %>%
summarize(
count = n(),
avg_value1 = mean(value1),
avg_value2 = mean(value2)
)
results[[i]] <- chunk_result
cat(sprintf("處理進度: %.1f%%\n", i/chunks * 100))
}
# Combine the per-chunk results (weighted by chunk counts)
final_result <- bind_rows(results) %>%
group_by(category) %>%
summarize(
total_count = sum(count),
avg_value1 = weighted.mean(avg_value1, count),
avg_value2 = weighted.mean(avg_value2, count)
)
return(final_result)
}
# Run the chunked processing
chunked_result <- process_in_chunks(con, chunk_size = 50000)
print(chunked_result)
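DBI also has a built-in streaming alternative: dbSendQuery() plus dbFetch(n = ...) walks one result set in pieces, with no LIMIT/OFFSET bookkeeping. A sketch over the same big_table:
# Stream the result set in 50,000-row chunks
res <- dbSendQuery(con, "SELECT category, value1, value2 FROM big_table")
while (!dbHasCompleted(res)) {
  chunk <- dbFetch(res, n = 50000)
  # ... process each chunk here (e.g. accumulate per-category sums) ...
  cat("fetched", nrow(chunk), "rows\n")
}
dbClearResult(res)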
5.2 Optimizing Queries with Indexes
# Index the large table and compare query times
cat("Query before indexing:\n")
system.time({
unindexed_query <- dbGetQuery(con, "
SELECT category, AVG(value1), AVG(value2)
FROM big_table
WHERE category IN ('a', 'b', 'c')
GROUP BY category
")
})
# Create the indexes
cat("\nCreating indexes...\n")
dbExecute(con, "CREATE INDEX idx_category ON big_table(category)")
dbExecute(con, "CREATE INDEX idx_timestamp ON big_table(timestamp)")
cat("\n創建索引後查詢:\n")
system.time({
indexed_query <- dbGetQuery(con, "
SELECT category, AVG(value1), AVG(value2)
FROM big_table
WHERE category IN ('a', 'b', 'c')
GROUP BY category
")
})
# Composite index
dbExecute(con, "CREATE INDEX idx_category_timestamp ON big_table(category, timestamp)")
# View index metadata
index_info <- dbGetQuery(con, "
SELECT name, tbl_name, sql
FROM sqlite_master
WHERE type = 'index'
")
print(index_info)
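To confirm that SQLite actually uses an index, prefix the query with EXPLAIN QUERY PLAN; the output names the chosen index or reports a full table scan:
# A "USING INDEX idx_category" entry in the detail column confirms index use
query_plan <- dbGetQuery(con, "
  EXPLAIN QUERY PLAN
  SELECT category, AVG(value1), AVG(value2)
  FROM big_table
  WHERE category IN ('a', 'b', 'c')
  GROUP BY category
")
print(query_plan)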
5.3 Optimizing Bulk Operations
# Compare bulk insert strategies
cat("\n=== Bulk insert performance comparison ===\n")
# Prepare test data
test_data <- data.frame(
id = 1:10000,
x = rnorm(10000),
y = rnorm(10000),
z = sample(letters[1:5], 10000, replace = TRUE)
)
# Create a test table
dbExecute(con, "CREATE TABLE test_insert (id INTEGER, x REAL, y REAL, z TEXT)")
# Method 1: row-by-row insertion (slowest)
cat("Method 1: row-by-row insertion\n")
dbExecute(con, "DELETE FROM test_insert")
system.time({
for (i in 1:100) { # test only 100 rows
sql <- sprintf("INSERT INTO test_insert VALUES (%d, %f, %f, '%s')",
test_data$id[i], test_data$x[i], test_data$y[i], test_data$z[i])
dbExecute(con, sql)
}
})
# Method 2: parameterized bulk insert
cat("\nMethod 2: parameterized bulk insert\n")
dbExecute(con, "DELETE FROM test_insert")
system.time({
# Prepare the SQL statement
stmt <- dbSendStatement(con, "
INSERT INTO test_insert (id, x, y, z)
VALUES (?, ?, ?, ?)
")
# Bind the parameters row by row
for (i in 1:1000) { # test 1000 rows
dbBind(stmt, list(
test_data$id[i],
test_data$x[i],
test_data$y[i],
test_data$z[i]
))
}
dbClearResult(stmt)
})
# Method 3: dbWriteTable (fastest)
cat("\nMethod 3: dbWriteTable\n")
dbExecute(con, "DELETE FROM test_insert")
system.time({
dbWriteTable(con, "test_insert", test_data[1:5000, ],
append = TRUE, row.names = FALSE)
})
# Method 4: bulk insert inside a transaction
cat("\nMethod 4: transactional bulk insert\n")
dbExecute(con, "DELETE FROM test_insert")
system.time({
# Begin the transaction
dbExecute(con, "BEGIN TRANSACTION")
stmt <- dbSendStatement(con, "
INSERT INTO test_insert (id, x, y, z)
VALUES (?, ?, ?, ?)
")
for (i in 1:1000) {
dbBind(stmt, list(
test_data$id[i],
test_data$x[i],
test_data$y[i],
test_data$z[i]
))
}
dbClearResult(stmt)
# Commit the transaction
dbExecute(con, "COMMIT")
})
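DBI can also manage the transaction for you: dbWithTransaction() commits when the block succeeds and rolls back automatically if it errors. An equivalent of method 4 as a sketch:
# Method 4 again, with DBI handling BEGIN/COMMIT/ROLLBACK
dbExecute(con, "DELETE FROM test_insert")
dbWithTransaction(con, {
  for (i in 1:1000) {
    dbExecute(con,
      "INSERT INTO test_insert (id, x, y, z) VALUES (?, ?, ?, ?)",
      params = list(test_data$id[i], test_data$x[i],
                    test_data$y[i], test_data$z[i]))
  }
})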
5.4 Exporting and Backing Up Databases
# Export data to files
# Export to CSV
query_result <- dbGetQuery(con, "SELECT * FROM big_table LIMIT 10000")
write.csv(query_result, "exported_data.csv", row.names = FALSE)
# Export to RDS (binary format, faster)
saveRDS(query_result, "exported_data.rds")
# Or copy a query result into another table with dbWriteTable
# dbWriteTable(con, "backup_table", query_result, overwrite = TRUE)
# Database backup (SQLite-specific)
# RSQLite provides sqliteCopyDatabase(), which copies an entire database,
# including an in-memory one, into another connection
db_backup <- dbConnect(RSQLite::SQLite(), "backup.db")
RSQLite::sqliteCopyDatabase(con, db_backup)
dbDisconnect(db_backup)
# Or dump the database with the SQLite command-line tool
# system("sqlite3 mydatabase.db .dump > backup.sql")
🏭 Part 6: Hands-On Case Studies
Case Study 1: An E-Commerce Analytics System
# Build an e-commerce database system
library(DBI)
library(RSQLite)
library(dplyr)
library(dbplyr)
# Create the database connection
con <- dbConnect(RSQLite::SQLite(), "ecommerce.db")
# Create the schema
create_tables <- function(con) {
# Users table
dbExecute(con, "
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
registration_date DATE DEFAULT CURRENT_DATE,
country TEXT,
is_active BOOLEAN DEFAULT 1
)
")
# Products table
dbExecute(con, "
CREATE TABLE IF NOT EXISTS products (
product_id INTEGER PRIMARY KEY AUTOINCREMENT,
product_name TEXT NOT NULL,
category TEXT NOT NULL,
price REAL CHECK(price > 0),
stock_quantity INTEGER DEFAULT 0,
created_date DATE DEFAULT CURRENT_DATE
)
")
# Orders table
dbExecute(con, "
CREATE TABLE IF NOT EXISTS orders (
order_id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
order_date DATE DEFAULT CURRENT_DATE,
total_amount REAL,
status TEXT DEFAULT 'pending',
FOREIGN KEY (user_id) REFERENCES users(user_id)
)
")
# Order items table
dbExecute(con, "
CREATE TABLE IF NOT EXISTS order_items (
order_item_id INTEGER PRIMARY KEY AUTOINCREMENT,
order_id INTEGER,
product_id INTEGER,
quantity INTEGER CHECK(quantity > 0),
unit_price REAL,
subtotal REAL GENERATED ALWAYS AS (quantity * unit_price) VIRTUAL,
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
)
")
# Create indexes (IF NOT EXISTS keeps re-runs of create_tables()
# idempotent, matching the CREATE TABLE IF NOT EXISTS statements above)
dbExecute(con, "CREATE INDEX IF NOT EXISTS idx_users_country ON users(country)")
dbExecute(con, "CREATE INDEX IF NOT EXISTS idx_products_category ON products(category)")
dbExecute(con, "CREATE INDEX IF NOT EXISTS idx_orders_user_date ON orders(user_id, order_date)")
dbExecute(con, "CREATE INDEX IF NOT EXISTS idx_order_items_order ON order_items(order_id)")
}
# Initialize the tables
create_tables(con)
# Insert sample data
insert_sample_data <- function(con) {
# Insert user data
users <- data.frame(
username = paste0("user", 1:100),
email = paste0("user", 1:100, "@example.com"),
country = sample(c("中國", "美國", "英國", "日本", "德國"), 100, replace = TRUE),
registration_date = sample(
seq.Date(as.Date("2022-01-01"), as.Date("2023-12-31"), by = "day"),
100, replace = TRUE
)
)
dbWriteTable(con, "users", users, append = TRUE, row.names = FALSE)
# Insert product data
categories <- c("Electronics", "Home Goods", "Clothing", "Food", "Books")
products <- data.frame(
product_name = paste0("Product ", 1:50),
category = sample(categories, 50, replace = TRUE),
price = round(runif(50, 10, 1000), 2),
stock_quantity = sample(0:100, 50, replace = TRUE)
)
dbWriteTable(con, "products", products, append = TRUE, row.names = FALSE)
# Insert order data
for (i in 1:200) {
user_id <- sample(1:100, 1)
order_date <- sample(
seq.Date(as.Date("2023-01-01"), as.Date("2023-12-31"), by = "day"),
1
)
# Create the order and fetch its auto-generated id
# (last_insert_rowid() is the classic SQLite idiom; INSERT ... RETURNING
# also works on SQLite >= 3.35)
dbExecute(con, sprintf("
INSERT INTO orders (user_id, order_date, total_amount)
VALUES (%d, '%s', 0)",
user_id, order_date
))
order_id <- dbGetQuery(con, "SELECT last_insert_rowid() AS order_id")$order_id
# Add the order items
n_items <- sample(1:5, 1)
total_amount <- 0
for (j in 1:n_items) {
product_id <- sample(1:50, 1)
quantity <- sample(1:3, 1)
# Look up the product price
price <- dbGetQuery(con, sprintf(
"SELECT price FROM products WHERE product_id = %d", product_id
))$price
subtotal <- quantity * price
total_amount <- total_amount + subtotal
dbExecute(con, sprintf("
INSERT INTO order_items (order_id, product_id, quantity, unit_price)
VALUES (%d, %d, %d, %.2f)",
order_id, product_id, quantity, price
))
}
# Update the order total
dbExecute(con, sprintf("
UPDATE orders SET total_amount = %.2f WHERE order_id = %d",
total_amount, order_id
))
}
}
# Insert the data
insert_sample_data(con)
# Analysis functions
analyze_ecommerce <- function(con) {
# Use dplyr for the more involved analyses
users_tbl <- tbl(con, "users")
orders_tbl <- tbl(con, "orders")
products_tbl <- tbl(con, "products")
order_items_tbl <- tbl(con, "order_items")
cat("=== 電商數據分析報告 ===\n\n")
# 1. 用户分析
cat("1. 用户分析:\n")
user_analysis <- users_tbl %>%
group_by(country) %>%
summarize(
user_count = n(),
active_users = sum(is_active, na.rm = TRUE)
) %>%
arrange(desc(user_count)) %>%
collect()
print(user_analysis)
# 2. Sales analysis
cat("\n2. Sales analysis:\n")
sales_analysis <- orders_tbl %>%
inner_join(users_tbl, by = "user_id") %>%
group_by(country, order_month = strftime('%Y-%m', order_date)) %>%
summarize(
order_count = n(),
total_revenue = sum(total_amount, na.rm = TRUE),
avg_order_value = mean(total_amount, na.rm = TRUE)
) %>%
arrange(desc(total_revenue)) %>%
collect()
print(head(sales_analysis, 10))
# 3. Product analysis
cat("\n3. Product analysis:\n")
product_analysis <- order_items_tbl %>%
inner_join(products_tbl, by = "product_id") %>%
inner_join(orders_tbl, by = "order_id") %>%
group_by(category) %>%
summarize(
units_sold = sum(quantity, na.rm = TRUE),
revenue = sum(quantity * unit_price, na.rm = TRUE),
unique_products = n_distinct(product_id)
) %>%
arrange(desc(revenue)) %>%
collect()
print(product_analysis)
# 4. Customer value analysis
cat("\n4. Customer value analysis:\n")
customer_value <- orders_tbl %>%
inner_join(users_tbl, by = "user_id") %>%
group_by(user_id, username, country) %>%
summarize(
total_orders = n(),
total_spent = sum(total_amount, na.rm = TRUE),
avg_order_value = mean(total_amount, na.rm = TRUE),
first_order = min(order_date, na.rm = TRUE),
last_order = max(order_date, na.rm = TRUE)
) %>%
arrange(desc(total_spent)) %>%
collect() %>%
# difftime() has no SQL translation, so compute it after collect()
mutate(
days_since_last_order = as.numeric(
difftime(Sys.Date(), as.Date(last_order), units = "days")
)
)
print(head(customer_value, 10))
# 5. Inventory analysis
cat("\n5. Inventory analysis:\n")
inventory_analysis <- products_tbl %>%
left_join(
order_items_tbl %>%
group_by(product_id) %>%
summarize(total_sold = sum(quantity, na.rm = TRUE)),
by = "product_id"
) %>%
mutate(
total_sold = coalesce(total_sold, 0),
stock_ratio = stock_quantity / (stock_quantity + total_sold),
status = case_when(
stock_quantity == 0 ~ "缺貨",
stock_ratio < 0.2 ~ "低庫存",
TRUE ~ "庫存充足"
)
) %>%
group_by(category, status) %>%
summarize(
product_count = n(),
avg_price = mean(price, na.rm = TRUE)
) %>%
arrange(category, status) %>%
collect()
print(inventory_analysis)
return(list(
user_analysis = user_analysis,
sales_analysis = sales_analysis,
product_analysis = product_analysis,
customer_value = customer_value,
inventory_analysis = inventory_analysis
))
}
# Run the analysis
analysis_results <- analyze_ecommerce(con)
# Save the analysis results
saveRDS(analysis_results, "ecommerce_analysis.rds")
# Close the connection
dbDisconnect(con)
# Check the size of the database file on disk
file_info <- file.info("ecommerce.db")
cat(sprintf("\n數據庫文件大小: %.2f MB\n", file_info$size / 1024 / 1024))
Case Study 2: A Real-Time Monitoring System
# A real-time metrics monitoring and alerting system
library(DBI)
library(RSQLite)
library(dplyr)
# Create the monitoring database
con <- dbConnect(RSQLite::SQLite(), "monitoring.db")
# Create the metrics table
dbExecute(con, "
CREATE TABLE IF NOT EXISTS system_metrics (
metric_id INTEGER PRIMARY KEY AUTOINCREMENT,
server_name TEXT NOT NULL,
metric_name TEXT NOT NULL,
metric_value REAL NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
severity TEXT CHECK(severity IN ('OK', 'warning', 'critical')),
is_acknowledged BOOLEAN DEFAULT 0
)
")
# Create indexes
dbExecute(con, "CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON system_metrics(timestamp)")
dbExecute(con, "CREATE INDEX IF NOT EXISTS idx_metrics_severity ON system_metrics(severity, is_acknowledged)")
# Simulate real-time metric generation
generate_metrics <- function(con, n = 100) {
servers <- c("web_server_1", "web_server_2", "db_server", "cache_server")
metrics <- c("cpu_usage", "memory_usage", "disk_usage", "network_traffic", "response_time")
for (i in 1:n) {
server <- sample(servers, 1)
metric <- sample(metrics, 1)
# Generate a value according to the metric type
if (metric == "cpu_usage") {
value <- runif(1, 10, 90) + rnorm(1, 0, 5)
} else if (metric == "memory_usage") {
value <- runif(1, 40, 95) + rnorm(1, 0, 3)
} else if (metric == "disk_usage") {
value <- runif(1, 30, 85)
} else if (metric == "network_traffic") {
value <- runif(1, 100, 1000)
} else {
value <- runif(1, 50, 500)
}
# Determine the severity
if (metric %in% c("cpu_usage", "memory_usage") && value > 85) {
severity <- "critical"
} else if (metric %in% c("cpu_usage", "memory_usage") && value > 75) {
severity <- "warning"
} else if (metric == "response_time" && value > 300) {
severity <- "critical"
} else if (metric == "response_time" && value > 200) {
severity <- "warning"
} else {
severity <- "OK"
}
# Insert the reading
dbExecute(con, sprintf("
INSERT INTO system_metrics (server_name, metric_name, metric_value, severity)
VALUES ('%s', '%s', %.2f, '%s')",
server, metric, value, severity
))
# Simulate a real-time delay
Sys.sleep(0.1)
}
}
# Start the data generator (in production this might run in a separate process)
# generate_metrics(con, 50)
# Monitoring query functions
monitor_system <- function(con) {
cat("\n=== 系統監控面板 ===\n")
cat("時間:", format(Sys.time(), "%Y-%m-%d %H:%M:%S"), "\n\n")
# 1. 當前告警
current_alerts <- dbGetQuery(con, "
SELECT
server_name,
metric_name,
metric_value,
severity,
timestamp
FROM system_metrics
WHERE severity IN ('warning', 'critical')
AND is_acknowledged = 0
AND timestamp > datetime('now', '-5 minutes')
ORDER BY severity DESC, timestamp DESC
")
if (nrow(current_alerts) > 0) {
cat("當前告警:\n")
print(current_alerts)
} else {
cat("當前告警: 無\n")
}
# 2. Server status overview
server_status <- dbGetQuery(con, "
SELECT
server_name,
COUNT(*) as metric_count,
SUM(CASE WHEN severity = 'critical' THEN 1 ELSE 0 END) as critical_count,
SUM(CASE WHEN severity = 'warning' THEN 1 ELSE 0 END) as warning_count,
MAX(timestamp) as last_update
FROM system_metrics
WHERE timestamp > datetime('now', '-10 minutes')
GROUP BY server_name
ORDER BY critical_count DESC, warning_count DESC
")
cat("\n服務器狀態概覽 (最近10分鐘):\n")
print(server_status)
# 3. Metric trends
metric_trend <- dbGetQuery(con, "
SELECT
metric_name,
AVG(metric_value) as avg_value,
MIN(metric_value) as min_value,
MAX(metric_value) as max_value,
COUNT(*) as readings
FROM system_metrics
WHERE timestamp > datetime('now', '-1 hour')
GROUP BY metric_name
ORDER BY metric_name
")
cat("\n指標趨勢 (最近1小時):\n")
print(metric_trend)
# 4. Performance analysis
performance_analysis <- dbGetQuery(con, "
WITH hourly_stats AS (
SELECT
strftime('%Y-%m-%d %H:00:00', timestamp) as hour,
metric_name,
AVG(metric_value) as avg_value
FROM system_metrics
WHERE timestamp > datetime('now', '-24 hours')
GROUP BY strftime('%Y-%m-%d %H:00:00', timestamp), metric_name
)
SELECT
metric_name,
MIN(avg_value) as min_hourly_avg,
MAX(avg_value) as max_hourly_avg,
AVG(avg_value) as overall_avg
FROM hourly_stats
GROUP BY metric_name
ORDER BY metric_name
")
cat("\n性能分析 (最近24小時):\n")
print(performance_analysis)
return(list(
current_alerts = current_alerts,
server_status = server_status,
metric_trend = metric_trend,
performance_analysis = performance_analysis
))
}
# Simulated monitoring loop
simulate_monitoring <- function(con, iterations = 5) {
for (i in 1:iterations) {
cat("\n\n=== 監控循環", i, "===\n")
# 生成新數據
generate_metrics(con, 20)
# Run the monitor
monitor_system(con)
# Acknowledge some alerts (simulated)
if (i %% 2 == 0) {
dbExecute(con, "
UPDATE system_metrics
SET is_acknowledged = 1
WHERE severity = 'warning'
AND is_acknowledged = 0
AND timestamp < datetime('now', '-1 minute')
")
}
# Wait before the next cycle
Sys.sleep(2)
}
}
# Run the simulated monitoring
# simulate_monitoring(con, 3)
# Cleanup function
cleanup_old_data <- function(con, days_to_keep = 7) {
deleted_rows <- dbExecute(con, sprintf("
DELETE FROM system_metrics
WHERE timestamp < datetime('now', '-%d days')",
days_to_keep
))
cat(sprintf("清理了 %d 天前的數據,刪除了 %d 行\n",
days_to_keep, deleted_rows))
# 執行VACUUM釋放空間
dbExecute(con, "VACUUM")
}
# Purge old data
cleanup_old_data(con, 1)
# Close the connection
dbDisconnect(con)
💻 Today's Exercises
Exercise 1: SQLite Operations
# 1. Create an in-memory SQLite database
# 2. Create a students table with fields: student ID, name, age, gender, score, class
# 3. Insert at least 20 student records
# 4. Write queries for:
# - the average score of each class
# - the top 10 students by score
# - student counts and average scores by gender
# 5. Export the query results to a CSV file (a starter scaffold follows below)
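A starter scaffold for this exercise; the column names are only a suggestion:
# Scaffold only: fill in the inserts and the three queries yourself
con <- dbConnect(RSQLite::SQLite(), ":memory:")
dbExecute(con, "
  CREATE TABLE students (
    student_id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    age INTEGER,
    gender TEXT,
    score REAL,
    class TEXT
  )")
# ... insert 20+ rows (dbWriteTable() or a parameterized INSERT) ...
# avg_by_class <- dbGetQuery(con, "SELECT class, AVG(score) AS avg_score
#                                  FROM students GROUP BY class")
# write.csv(avg_by_class, "class_averages.csv", row.names = FALSE)
dbDisconnect(con)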
Exercise 2: MySQL/PostgreSQL Connections
# 1. Install and configure a local MySQL or PostgreSQL database
# 2. Connect to it from R
# 3. Create a product inventory table
# 4. Implement the following:
# - bulk-import product data
# - query low-stock products (fewer than 10 in stock)
# - update product prices
# - generate an inventory report
# 5. Use a connection pool to optimize the connections
Exercise 3: Large-Data Optimization
# 1. Create a simulated sales table with 1,000,000 rows
# 2. Write a chunked-processing function that computes:
# - total daily sales
# - units sold per product
# - a sales trend analysis
# 3. Compare the performance of different query strategies:
# - a single one-shot query
# - chunked queries
# - index-optimized queries
# 4. Implement data archiving: move old rows into a history table
📌 Today's Summary
Key takeaways:
- ✓ Connecting to SQLite and performing basic operations
- ✓ Running SQL queries and transactions with the DBI package
- ✓ Working with databases through dplyr syntax (dbplyr)
- ✓ Connecting to other database systems (MySQL, PostgreSQL)
- ✓ Strategies and performance optimizations for large data
Database workflow:
- Connect: establish a connection with dbConnect()
- Execute SQL: use dbExecute() or dbGetQuery()
- Manipulate data: insert, query, update, delete
- Manage transactions: BEGIN, COMMIT, ROLLBACK (see the sketch after this list)
- Disconnect: call dbDisconnect()
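For reference, the transaction step expressed with DBI's own functions, in a minimal error-safe pattern (assuming an open connection con):
# Commit only if every statement succeeds; roll back otherwise
dbBegin(con)
tryCatch({
  dbExecute(con, "UPDATE employees SET salary = salary * 1.1")
  dbCommit(con)
}, error = function(e) {
  dbRollback(con)
  stop(e)
})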
Performance tips:
- Use indexes: create them on frequently queried columns
- Batch operations: wrap bulk inserts in a transaction
- Chunked processing: read and process large datasets in pieces
- Connection pools: let web applications manage connections through a pool
- Regular maintenance: purge old data and run VACUUM
🎯 Coming Up Tomorrow
Day 12: Web Data Collection and APIs
- Web scraping basics (the rvest package)
- Fetching data from APIs (the httr package)
- Parsing web pages
- Scraping dynamic pages (RSelenium)
Study tips:
- Start with SQLite; it requires no extra database software
- Learn basic SQL syntax; it is essential for database work
- Understand the security side of database connections (password management)
- Apply databases in real projects to build experience
- Pay attention to error handling and resource cleanup in database code
Remember to save today's script as day11.R. Tomorrow we move on to web data collection! 🌐