📚 Today's Goals

  1. Learn to connect to different databases (SQLite, MySQL, PostgreSQL)
  2. Master database operations with the DBI and odbc packages
  3. Learn to work with databases through dplyr
  4. Master integrating SQL queries with R code
  5. Learn strategies and optimizations for large data

🗄️ Part 1: Database Basics

1.1 Overview of Database Connections

# The main ways R connects to databases:
# 1. DBI + a driver package (RSQLite, RMySQL, RPostgreSQL, ...)
# 2. The odbc package (a unified interface)
# 3. RJDBC (Java-based connectivity)

# Install the required database packages
install.packages(c("DBI", "RSQLite", "odbc", "RMySQL", "RPostgres", 
                   "dbplyr", "dplyr", "pool", "keyring"))

# Load the core package
library(DBI)

1.2 Connecting to SQLite

# SQLite is a lightweight, file-based database -- ideal for learning and testing

# Install the SQLite driver
install.packages("RSQLite")
library(RSQLite)

# Connect to an in-memory database (temporary)
con_memory <- dbConnect(RSQLite::SQLite(), ":memory:")

# Connect to a file-based database
con_file <- dbConnect(RSQLite::SQLite(), "mydatabase.db")

# Inspect the connection
dbGetInfo(con_memory)

# Disconnect
dbDisconnect(con_memory)
dbDisconnect(con_file)

# Disconnect as soon as you are done (recommended)
con <- dbConnect(RSQLite::SQLite(), ":memory:")
# ... database operations ...
dbDisconnect(con)

# Or use on.exit() so the connection is closed even if an error occurs.
# Note: on.exit() must wrap the work itself; a function that merely *returns*
# the connection would hand the caller an already-closed handle.
with_db <- function(db_path, fun) {
  con <- dbConnect(RSQLite::SQLite(), db_path)
  on.exit(dbDisconnect(con), add = TRUE)
  fun(con)
}
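
# Usage sketch for the helper above (with_db() is defined in this tutorial,
# not a package function): the connection is opened, used, and closed
# automatically, even if the query fails.
tables <- with_db("mydatabase.db", function(con) {
  dbListTables(con)
})
print(tables)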

🔌 Part 2: Basic Database Operations

2.1 Creating and Managing Tables

# Create a database connection
con <- dbConnect(RSQLite::SQLite(), ":memory:")

# Create a table
create_table_sql <- "
CREATE TABLE IF NOT EXISTS employees (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  name TEXT NOT NULL,
  age INTEGER,
  department TEXT,
  salary REAL,
  hire_date DATE,
  is_active BOOLEAN DEFAULT 1
)"

dbExecute(con, create_table_sql)

# List all tables
tables <- dbListTables(con)
print(tables)

# Inspect the table's columns
dbListFields(con, "employees")

# Drop the table
dbExecute(con, "DROP TABLE IF EXISTS employees")

# Recreate the table, this time with constraints
create_table_with_constraints <- "
CREATE TABLE employees (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  name TEXT NOT NULL,
  age INTEGER CHECK(age >= 18 AND age <= 70),
  department TEXT DEFAULT 'General',
  salary REAL CHECK(salary >= 0),
  hire_date DATE DEFAULT CURRENT_DATE,
  email TEXT UNIQUE,
  is_active BOOLEAN DEFAULT 1
)"

dbExecute(con, create_table_with_constraints)

# Create indexes to speed up queries
dbExecute(con, "CREATE INDEX idx_department ON employees(department)")
dbExecute(con, "CREATE INDEX idx_hire_date ON employees(hire_date)")

2.2 Inserting Data

# Insert a single row
insert_sql <- "
INSERT INTO employees (name, age, department, salary, hire_date, email) 
VALUES ('張三', 28, 'Engineering', 15000.00, '2022-03-15', 'zhangsan@company.com')
"

dbExecute(con, insert_sql)

# Insert multiple rows
employees_data <- data.frame(
  name = c("李四", "王五", "趙六"),
  age = c(32, 25, 40),
  department = c("Marketing", "Engineering", "Finance"),
  salary = c(12000, 18000, 20000),
  hire_date = c("2021-06-10", "2023-01-20", "2019-11-05"),
  email = c("lisi@company.com", "wangwu@company.com", "zhaoliu@company.com")
)

# Method 1: insert row by row (not recommended -- slow, and building SQL with
# sprintf() is vulnerable to SQL injection)
for (i in 1:nrow(employees_data)) {
  sql <- sprintf(
    "INSERT INTO employees (name, age, department, salary, hire_date, email) 
     VALUES ('%s', %d, '%s', %.2f, '%s', '%s')",
    employees_data$name[i],
    employees_data$age[i],
    employees_data$department[i],
    employees_data$salary[i],
    employees_data$hire_date[i],
    employees_data$email[i]
  )
  dbExecute(con, sql)
}

# Method 2: dbWriteTable (recommended)
# Clear the table first
dbExecute(con, "DELETE FROM employees")

# Insert the data frame with dbWriteTable
dbWriteTable(con, "employees", employees_data, 
             append = TRUE,      # append to the existing table
             row.names = FALSE)  # do not write row names

# Method 3: parameterized query (safe -- prevents SQL injection)
insert_prepared <- dbSendStatement(con, "
  INSERT INTO employees (name, age, department, salary, hire_date, email) 
  VALUES (?, ?, ?, ?, ?, ?)
")

dbBind(insert_prepared, list(
  "孫七", 35, "HR", 16000, "2020-09-18", "sunqi@company.com"
))

dbClearResult(insert_prepared)

# View the inserted data
result <- dbGetQuery(con, "SELECT * FROM employees")
print(result)
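
# dbBind() also accepts vectors: each placeholder is bound to a column and
# the statement executes once per element, which batches many rows in one
# round of binding. A sketch with two new, hypothetical employees (fresh
# email addresses, so the UNIQUE constraint is not violated):
stmt <- dbSendStatement(con, "
  INSERT INTO employees (name, age, department, salary, hire_date, email) 
  VALUES (?, ?, ?, ?, ?, ?)
")
dbBind(stmt, list(
  c("周八", "吳九"),
  c(29, 31),
  c("Marketing", "Finance"),
  c(13000, 17500),
  c("2022-07-01", "2021-02-14"),
  c("zhouba@company.com", "wujiu@company.com")
))
dbClearResult(stmt)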

2.3 Querying Data

# Query all rows
all_employees <- dbGetQuery(con, "SELECT * FROM employees")
print(all_employees)

# Filtered query
tech_employees <- dbGetQuery(con, "
  SELECT name, age, salary, hire_date 
  FROM employees 
  WHERE department = 'Engineering' AND is_active = 1
  ORDER BY salary DESC
")
print(tech_employees)

# Aggregate query
summary_stats <- dbGetQuery(con, "
  SELECT 
    department,
    COUNT(*) as employee_count,
    AVG(salary) as avg_salary,
    MIN(salary) as min_salary,
    MAX(salary) as max_salary,
    SUM(salary) as total_salary
  FROM employees
  WHERE is_active = 1
  GROUP BY department
  HAVING COUNT(*) > 0
  ORDER BY avg_salary DESC
")
print(summary_stats)

# Join query (first create a departments table)
dbExecute(con, "
CREATE TABLE departments (
  dept_id INTEGER PRIMARY KEY,
  dept_name TEXT UNIQUE,
  manager TEXT,
  budget REAL
)")

dbWriteTable(con, "departments", 
  data.frame(
    dept_id = 1:4,
    dept_name = c("技術部", "市場部", "財務部", "人事部"),
    manager = c("張三", "李四", "王五", "趙六"),
    budget = c(500000, 300000, 400000, 200000)
  ), 
  append = TRUE, 
  row.names = FALSE
)

# 內連接查詢
join_query <- "
SELECT 
  e.name,
  e.salary,
  d.dept_name,
  d.manager,
  d.budget
FROM employees e
INNER JOIN departments d ON e.department = d.dept_name
ORDER BY e.salary DESC
"

join_result <- dbGetQuery(con, join_query)
print(join_result)

# Paged query
page_size <- 2
page_number <- 1

paged_query <- sprintf("
SELECT * FROM employees 
ORDER BY id 
LIMIT %d OFFSET %d",
page_size, (page_number - 1) * page_size)

paged_result <- dbGetQuery(con, paged_query)
print(paged_result)
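
# LIMIT/OFFSET pagination rescans all skipped rows, so deep pages get slow.
# A keyset ("seek") sketch that works on any SQL backend: remember the last
# id seen on the previous page and filter past it, letting the primary-key
# index do the work.
last_id <- 0  # id of the last row on the previous page
keyset_query <- sprintf("
SELECT * FROM employees 
WHERE id > %d 
ORDER BY id 
LIMIT %d", last_id, page_size)

keyset_result <- dbGetQuery(con, keyset_query)
print(keyset_result)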

2.4 Updating and Deleting Data

# Update data
# Give every Engineering employee a 10% raise
update_sql <- "
UPDATE employees 
SET salary = salary * 1.1 
WHERE department = 'Engineering' AND is_active = 1
"

rows_updated <- dbExecute(con, update_sql)
cat("Updated", rows_updated, "rows\n")

# Update a specific employee
update_employee <- "
UPDATE employees 
SET salary = ?, department = ? 
WHERE name = ? AND is_active = 1
"

stmt <- dbSendStatement(con, update_employee)
dbBind(stmt, list(22000, "Engineering", "王五"))
dbClearResult(stmt)

# Delete data
# Remove former employees (is_active = 0 marks someone as having left)
delete_sql <- "DELETE FROM employees WHERE is_active = 0"
rows_deleted <- dbExecute(con, delete_sql)
cat("Deleted", rows_deleted, "rows\n")

# Soft delete (set is_active to 0)
soft_delete <- "
UPDATE employees 
SET is_active = 0 
WHERE hire_date < '2020-01-01'
"

rows_soft_deleted <- dbExecute(con, soft_delete)
cat("Soft-deleted", rows_soft_deleted, "rows\n")

# Verify the results
updated_data <- dbGetQuery(con, "
SELECT name, department, salary, is_active 
FROM employees 
ORDER BY department, salary DESC
")
print(updated_data)
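
# Shorthand worth knowing: dbExecute() and dbGetQuery() accept a params
# argument, so the send/bind/clear sequence above collapses into one call
# (the values here are illustrative):
dbExecute(con, "
  UPDATE employees 
  SET salary = ?, department = ? 
  WHERE name = ? AND is_active = 1
", params = list(21000, "Engineering", "李四"))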

🔄 Part 3: Working with Databases via dplyr

3.1 dbplyr Basics

# dbplyr lets you use dplyr syntax against a database
library(dplyr)
library(dbplyr)

# Re-establish the connection
con <- dbConnect(RSQLite::SQLite(), ":memory:")

# Write example data (keep the model names as a real column -- row names are
# dropped on write, and the join in section 3.2 needs them)
dbWriteTable(con, "mtcars", cbind(model = rownames(mtcars), mtcars),
             row.names = FALSE)
dbWriteTable(con, "iris", iris, row.names = FALSE)

# Create lazy table references
mtcars_tbl <- tbl(con, "mtcars")
iris_tbl <- tbl(con, "iris")

# Check the object's class
class(mtcars_tbl)

# Basic dplyr verbs
result <- mtcars_tbl %>%
  select(mpg, cyl, hp, wt) %>%
  filter(cyl == 6, mpg > 20) %>%
  arrange(desc(mpg))

# Inspect the generated SQL
result %>% show_query()

# Execute the query and pull the results into R
collected_result <- result %>% collect()
print(collected_result)

3.2 Advanced dplyr Operations

# Grouping and summarizing
grouped_stats <- mtcars_tbl %>%
  group_by(cyl) %>%
  summarize(
    count = n(),
    avg_mpg = mean(mpg, na.rm = TRUE),
    avg_hp = mean(hp, na.rm = TRUE),
    max_wt = max(wt, na.rm = TRUE),
    min_wt = min(wt, na.rm = TRUE)
  ) %>%
  arrange(desc(avg_mpg))

print(grouped_stats %>% collect())

# Joins
# Create a second table. The makes and countries below are randomly assigned
# for illustration only -- they are not the cars' real manufacturers. (The
# original hand-built vectors had 29 entries for mtcars' 32 rows, which
# would error.)
set.seed(42)
manufacturers <- data.frame(
  model = rownames(mtcars),
  manufacturer = sample(c("Ford", "Chevy", "Toyota", "Honda", "Merc"),
                        nrow(mtcars), replace = TRUE),
  country = sample(c("USA", "Japan", "Germany", "Italy", "UK"),
                   nrow(mtcars), replace = TRUE)
)

dbWriteTable(con, "manufacturers", manufacturers, row.names = FALSE)
manuf_tbl <- tbl(con, "manufacturers")

# Join query
joined_data <- mtcars_tbl %>%
  inner_join(manuf_tbl, by = c("model" = "model")) %>%
  select(model, mpg, cyl, hp, manufacturer, country) %>%
  group_by(country) %>%
  summarize(
    car_count = n(),
    avg_mpg = mean(mpg, na.rm = TRUE),
    avg_hp = mean(hp, na.rm = TRUE)
  ) %>%
  arrange(desc(avg_mpg))

print(joined_data %>% collect())

# Window functions
window_result <- mtcars_tbl %>%
  select(model, mpg, cyl, hp) %>%
  group_by(cyl) %>%
  mutate(
    mpg_rank = row_number(desc(mpg)),
    avg_mpg_by_cyl = mean(mpg, na.rm = TRUE),
    mpg_diff_from_avg = mpg - avg_mpg_by_cyl
  ) %>%
  filter(mpg_rank <= 3) %>%
  arrange(cyl, mpg_rank)

print(window_result %>% collect())

3.3 Database-specific Functions

# Using SQL-translatable functions
sql_functions <- mtcars_tbl %>%
  mutate(
    # note: low mpg means the car burns *more* fuel
    mpg_category = case_when(
      mpg < 15 ~ "low efficiency",
      mpg < 25 ~ "medium efficiency",
      TRUE ~ "high efficiency"
    ),
    hp_per_ton = hp / wt,                   # wt is in units of 1000 lb
    year_added = 1970 + row_number() %% 10  # simulated year
  ) %>%
  group_by(mpg_category) %>%
  summarize(
    count = n(),
    avg_hp = mean(hp, na.rm = TRUE),
    min_mpg = min(mpg, na.rm = TRUE),
    max_mpg = max(mpg, na.rm = TRUE)
  ) %>%
  ungroup()

print(sql_functions %>% collect())

# Date functions (create a table with dates)
dates_data <- data.frame(
  id = 1:100,
  event_date = seq.Date(from = as.Date("2023-01-01"), 
                        by = "day", 
                        length.out = 100),
  value = rnorm(100, mean = 100, sd = 20)
)

dbWriteTable(con, "events", dates_data, row.names = FALSE)
events_tbl <- tbl(con, "events")

# Use date functions (SQLite's strftime() is passed through to the backend)
date_analysis <- events_tbl %>%
  mutate(
    year = as.integer(strftime(event_date, '%Y')),
    month = as.integer(strftime(event_date, '%m')),
    week = as.integer(strftime(event_date, '%W')),
    day_of_week = strftime(event_date, '%w')
  ) %>%
  group_by(year, month) %>%
  summarize(
    event_count = n(),
    total_value = sum(value, na.rm = TRUE),
    avg_value = mean(value, na.rm = TRUE)
  ) %>%
  arrange(year, month)

print(date_analysis %>% collect())

🌐 Part 4: Connecting to Other Databases

4.1 MySQL

# Install the MySQL driver
# (RMySQL has been superseded by RMariaDB; RMariaDB::MariaDB() works as a
# drop-in driver in the code below)
install.packages("RMySQL")
library(RMySQL)

# Connect to MySQL (example code -- requires a real server)
# con_mysql <- dbConnect(
#   RMySQL::MySQL(),
#   host = "localhost",          # database host
#   port = 3306,                 # port
#   user = "your_username",      # user name
#   password = "your_password",  # password
#   dbname = "your_database"     # database name
# )

# A safer approach: read the settings from environment variables
# config <- list(
#   host = Sys.getenv("DB_HOST"),
#   port = as.integer(Sys.getenv("DB_PORT")),
#   user = Sys.getenv("DB_USER"),
#   password = Sys.getenv("DB_PASSWORD"),
#   dbname = Sys.getenv("DB_NAME")
# )

# (the driver must be wrapped in list() for c() to build the argument list)
# con_mysql <- do.call(dbConnect, c(list(RMySQL::MySQL()), config))

# Use the keyring package to store passwords securely
install.packages("keyring")
library(keyring)

# Store the password (run once to set it)
# key_set_with_value("mysql_password", password = "your_password")

# Retrieve the password
# password <- key_get("mysql_password")

# Create the connection
# con_mysql <- dbConnect(
#   RMySQL::MySQL(),
#   host = "localhost",
#   port = 3306,
#   user = "your_username",
#   password = password,
#   dbname = "your_database"
# )

# MySQL-specific operations
# List databases
# dbGetQuery(con_mysql, "SHOW DATABASES")

# List tables
# dbGetQuery(con_mysql, "SHOW TABLES")

# Disconnect
# dbDisconnect(con_mysql)

4.2 PostgreSQL

# Install the PostgreSQL driver
install.packages("RPostgres")
library(RPostgres)

# Connect to PostgreSQL (example code)
# con_pg <- dbConnect(
#   RPostgres::Postgres(),
#   host = "localhost",
#   port = 5432,
#   user = "your_username",
#   password = "your_password",
#   dbname = "your_database"
# )

# PostgreSQL-specific features
# Using the PostGIS extension (spatial queries)
# spatial_query <- "
# SELECT 
#   name,
#   ST_AsText(geometry) as wkt,
#   ST_Area(geometry) as area
# FROM spatial_table
# WHERE ST_Within(geometry, ST_MakeEnvelope(0, 0, 100, 100, 4326))
# "
# 
# spatial_data <- dbGetQuery(con_pg, spatial_query)

# Disconnect
# dbDisconnect(con_pg)

4.3 The odbc Unified Interface

# The odbc package provides a single interface to many databases
install.packages("odbc")
library(odbc)

# List the available ODBC drivers
drivers <- odbc::odbcListDrivers()
print(head(drivers))

# Connect via ODBC (requires a configured ODBC data source)
# SQL Server example
# con_odbc <- dbConnect(
#   odbc::odbc(),
#   dsn = "Your_DSN_Name",  # ODBC data source name
#   uid = "username",       # user name
#   pwd = "password",       # password
#   database = "your_db"    # database name
# )
# )

# Or use a connection string directly
# con_odbc <- dbConnect(
#   odbc::odbc(),
#   .connection_string = "Driver={ODBC Driver 17 for SQL Server};Server=localhost;Database=your_db;UID=username;PWD=password"
# )

# Run a query
# result <- dbGetQuery(con_odbc, "SELECT * FROM your_table")

# Disconnect
# dbDisconnect(con_odbc)

4.4 Connection Pools

# For web apps or anything that connects frequently, use a connection pool
install.packages("pool")
library(pool)

# Create a pool
# pool <- dbPool(
#   RMySQL::MySQL(),
#   host = "localhost",
#   port = 3306,
#   username = "your_username",
#   password = "your_password",
#   dbname = "your_database",
#   minSize = 1,        # minimum number of connections
#   maxSize = 10,       # maximum number of connections
#   idleTimeout = 3600  # idle timeout (seconds)
# )

# Check a connection out of the pool
# con <- poolCheckout(pool)

# Run a query
# result <- dbGetQuery(con, "SELECT * FROM your_table")

# Return the connection to the pool
# poolReturn(con)

# Close the pool
# poolClose(pool)

# Using dplyr with a pool
# pool_tbl <- tbl(pool, "your_table")
# result <- pool_tbl %>% filter(...) %>% collect()
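
# A runnable sketch using SQLite, so no server is needed. A pool object can
# be passed straight to DBI verbs -- explicit checkout/return is mainly for
# transactions and prepared statements. A temp file is used rather than
# ":memory:" because each pooled connection would otherwise get its own
# private in-memory database.
library(pool)
pool <- dbPool(RSQLite::SQLite(), dbname = tempfile(fileext = ".db"))
dbWriteTable(pool, "mtcars_pool", mtcars, row.names = FALSE)
print(dbGetQuery(pool, "SELECT COUNT(*) AS n FROM mtcars_pool"))
poolClose(pool)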

🚀 Part 5: Strategies for Large Data

5.1 Processing Large Data in Chunks

# When the data is too big to load into memory at once, process it in chunks
library(DBI)

# Set up an example database
con <- dbConnect(RSQLite::SQLite(), ":memory:")

# Create a large table
set.seed(123)
big_data <- data.frame(
  id = 1:1000000,
  value1 = rnorm(1000000),
  value2 = runif(1000000, 0, 100),
  category = sample(letters[1:10], 1000000, replace = TRUE),
  timestamp = seq.POSIXt(
    from = as.POSIXct("2023-01-01"),
    by = "sec",
    length.out = 1000000
  )
)

# Write it to the database
system.time({
  dbWriteTable(con, "big_table", big_data, row.names = FALSE)
})

# Method 1: chunked queries (LIMIT/OFFSET)
process_in_chunks <- function(con, chunk_size = 100000) {
  # Get the total row count
  total_rows <- dbGetQuery(con, "SELECT COUNT(*) as n FROM big_table")$n
  
  # Compute the number of chunks
  chunks <- ceiling(total_rows / chunk_size)
  
  results <- list()
  
  for (i in 1:chunks) {
    offset <- (i - 1) * chunk_size
    
    # Query one chunk
    chunk_query <- sprintf("
      SELECT * FROM big_table 
      LIMIT %d OFFSET %d",
      chunk_size, offset
    )
    
    chunk_data <- dbGetQuery(con, chunk_query)
    
    # Aggregate the current chunk
    chunk_result <- chunk_data %>%
      group_by(category) %>%
      summarize(
        count = n(),
        avg_value1 = mean(value1),
        avg_value2 = mean(value2)
      )
    
    results[[i]] <- chunk_result
    
    cat(sprintf("處理進度: %.1f%%\n", i/chunks * 100))
  }
  
  # Combine the per-chunk results (means weighted by chunk counts)
  final_result <- bind_rows(results) %>%
    group_by(category) %>%
    summarize(
      total_count = sum(count),
      avg_value1 = weighted.mean(avg_value1, count),
      avg_value2 = weighted.mean(avg_value2, count)
    )
  
  return(final_result)
}

# Run the chunked processing
chunked_result <- process_in_chunks(con, chunk_size = 50000)
print(chunked_result)
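
# An alternative chunking pattern built into DBI: send the query once and
# fetch a fixed number of rows at a time with dbFetch(), which avoids the
# increasingly expensive OFFSET scans of the LIMIT/OFFSET approach.
res <- dbSendQuery(con, "SELECT * FROM big_table")
while (!dbHasCompleted(res)) {
  chunk <- dbFetch(res, n = 100000)  # next 100,000 rows
  # ... process `chunk` here ...
}
dbClearResult(res)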

5.2 Optimizing Queries with Indexes

# Time a query on the big table before indexing
cat("Query before indexing:\n")
system.time({
  unindexed_query <- dbGetQuery(con, "
    SELECT category, AVG(value1), AVG(value2)
    FROM big_table
    WHERE category IN ('a', 'b', 'c')
    GROUP BY category
  ")
})

# Create the indexes
cat("\nCreating indexes...\n")
dbExecute(con, "CREATE INDEX idx_category ON big_table(category)")
dbExecute(con, "CREATE INDEX idx_timestamp ON big_table(timestamp)")

cat("\n創建索引後查詢:\n")
system.time({
  indexed_query <- dbGetQuery(con, "
    SELECT category, AVG(value1), AVG(value2)
    FROM big_table
    WHERE category IN ('a', 'b', 'c')
    GROUP BY category
  ")
})

# Composite index
dbExecute(con, "CREATE INDEX idx_category_timestamp ON big_table(category, timestamp)")

# Inspect index metadata
index_info <- dbGetQuery(con, "
  SELECT name, tbl_name, sql 
  FROM sqlite_master 
  WHERE type = 'index'
")
print(index_info)
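
# SQLite can report whether a query actually uses an index -- handy for
# verifying that the indexes above take effect:
plan <- dbGetQuery(con, "
  EXPLAIN QUERY PLAN
  SELECT category, AVG(value1)
  FROM big_table
  WHERE category = 'a'
  GROUP BY category
")
print(plan)  # the detail column should mention idx_category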

5.3 Optimizing Batch Operations

# Batch-insert optimization
cat("\n=== Batch insert performance comparison ===\n")

# Prepare test data
test_data <- data.frame(
  id = 1:10000,
  x = rnorm(10000),
  y = rnorm(10000),
  z = sample(letters[1:5], 10000, replace = TRUE)
)

# Create a test table
dbExecute(con, "CREATE TABLE test_insert (id INTEGER, x REAL, y REAL, z TEXT)")

# Method 1: row-by-row inserts (slowest)
cat("Method 1: row-by-row inserts\n")
dbExecute(con, "DELETE FROM test_insert")
system.time({
  for (i in 1:100) {  # only time 100 rows
    sql <- sprintf("INSERT INTO test_insert VALUES (%d, %f, %f, '%s')",
                   test_data$id[i], test_data$x[i], test_data$y[i], test_data$z[i])
    dbExecute(con, sql)
  }
})

# Method 2: parameterized batch insert
cat("\nMethod 2: parameterized batch insert\n")
dbExecute(con, "DELETE FROM test_insert")
system.time({
  # Prepare the statement
  stmt <- dbSendStatement(con, "
    INSERT INTO test_insert (id, x, y, z) 
    VALUES (?, ?, ?, ?)
  ")
  
  # Bind parameters row by row
  for (i in 1:1000) {  # time 1000 rows
    dbBind(stmt, list(
      test_data$id[i],
      test_data$x[i],
      test_data$y[i],
      test_data$z[i]
    ))
  }
  
  dbClearResult(stmt)
})

# Method 3: dbWriteTable (fastest)
cat("\nMethod 3: dbWriteTable\n")
dbExecute(con, "DELETE FROM test_insert")
system.time({
  dbWriteTable(con, "test_insert", test_data[1:5000, ], 
               append = TRUE, row.names = FALSE)
})

# Method 4: wrap the inserts in a transaction
cat("\nMethod 4: transactional batch insert\n")
dbExecute(con, "DELETE FROM test_insert")
system.time({
  # Begin the transaction
  dbExecute(con, "BEGIN TRANSACTION")
  
  stmt <- dbSendStatement(con, "
    INSERT INTO test_insert (id, x, y, z) 
    VALUES (?, ?, ?, ?)
  ")
  
  for (i in 1:1000) {
    dbBind(stmt, list(
      test_data$id[i],
      test_data$x[i],
      test_data$y[i],
      test_data$z[i]
    ))
  }
  
  dbClearResult(stmt)
  
  # Commit the transaction
  dbExecute(con, "COMMIT")
})
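
# DBI also provides dbWithTransaction(), which commits on success and rolls
# back automatically if the code inside throws -- usually safer than issuing
# BEGIN/COMMIT by hand:
dbWithTransaction(con, {
  dbExecute(con, "DELETE FROM test_insert")
  dbWriteTable(con, "test_insert", test_data, append = TRUE, row.names = FALSE)
})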

5.4 Exporting and Backing Up

# Export data to a file
# Export to CSV
query_result <- dbGetQuery(con, "SELECT * FROM big_table LIMIT 10000")
write.csv(query_result, "exported_data.csv", row.names = FALSE)

# Export to RDS (binary format, faster)
saveRDS(query_result, "exported_data.rds")

# Copy a result back into a table
# dbWriteTable(con, "backup_table", query_result, overwrite = TRUE)

# Database backup (SQLite-specific). Note: opening a second ":memory:"
# connection creates a *separate* empty database, so the in-memory data
# cannot be reached by ATTACH-ing from it; RSQLite::sqliteCopyDatabase()
# copies the live database into a file in one call.
db_backup <- dbConnect(RSQLite::SQLite(), "backup.db")
RSQLite::sqliteCopyDatabase(con, db_backup)
dbDisconnect(db_backup)

# Or use SQLite's command-line dump
# system("sqlite3 mydatabase.db .dump > backup.sql")

🏭 Part 6: Hands-on Case Studies

Case 1: An E-commerce Analytics System

# Build an e-commerce database
library(DBI)
library(RSQLite)
library(dplyr)
library(dbplyr)

# Create the database connection
con <- dbConnect(RSQLite::SQLite(), "ecommerce.db")

# Create the schema
create_tables <- function(con) {
  # Users table
  dbExecute(con, "
    CREATE TABLE IF NOT EXISTS users (
      user_id INTEGER PRIMARY KEY AUTOINCREMENT,
      username TEXT UNIQUE NOT NULL,
      email TEXT UNIQUE NOT NULL,
      registration_date DATE DEFAULT CURRENT_DATE,
      country TEXT,
      is_active BOOLEAN DEFAULT 1
    )
  ")
  
  # Products table
  dbExecute(con, "
    CREATE TABLE IF NOT EXISTS products (
      product_id INTEGER PRIMARY KEY AUTOINCREMENT,
      product_name TEXT NOT NULL,
      category TEXT NOT NULL,
      price REAL CHECK(price > 0),
      stock_quantity INTEGER DEFAULT 0,
      created_date DATE DEFAULT CURRENT_DATE
    )
  ")
  
  # Orders table
  dbExecute(con, "
    CREATE TABLE IF NOT EXISTS orders (
      order_id INTEGER PRIMARY KEY AUTOINCREMENT,
      user_id INTEGER,
      order_date DATE DEFAULT CURRENT_DATE,
      total_amount REAL,
      status TEXT DEFAULT 'pending',
      FOREIGN KEY (user_id) REFERENCES users(user_id)
    )
  ")
  
  # Order items table
  dbExecute(con, "
    CREATE TABLE IF NOT EXISTS order_items (
      order_item_id INTEGER PRIMARY KEY AUTOINCREMENT,
      order_id INTEGER,
      product_id INTEGER,
      quantity INTEGER CHECK(quantity > 0),
      unit_price REAL,
      subtotal REAL GENERATED ALWAYS AS (quantity * unit_price) VIRTUAL,
      FOREIGN KEY (order_id) REFERENCES orders(order_id),
      FOREIGN KEY (product_id) REFERENCES products(product_id)
    )
  ")
  
  # Create indexes
  dbExecute(con, "CREATE INDEX idx_users_country ON users(country)")
  dbExecute(con, "CREATE INDEX idx_products_category ON products(category)")
  dbExecute(con, "CREATE INDEX idx_orders_user_date ON orders(user_id, order_date)")
  dbExecute(con, "CREATE INDEX idx_order_items_order ON order_items(order_id)")
}

# Initialize the schema
create_tables(con)

# Insert sample data
insert_sample_data <- function(con) {
  # Users
  users <- data.frame(
    username = paste0("user", 1:100),
    email = paste0("user", 1:100, "@example.com"),
    country = sample(c("China", "USA", "UK", "Japan", "Germany"), 100, replace = TRUE),
    registration_date = sample(
      seq.Date(as.Date("2022-01-01"), as.Date("2023-12-31"), by = "day"),
      100, replace = TRUE
    )
  )
  
  dbWriteTable(con, "users", users, append = TRUE, row.names = FALSE)
  
  # Products
  categories <- c("Electronics", "Home Goods", "Clothing", "Food", "Books")
  products <- data.frame(
    product_name = paste0("product_", 1:50),
    category = sample(categories, 50, replace = TRUE),
    price = round(runif(50, 10, 1000), 2),
    stock_quantity = sample(0:100, 50, replace = TRUE)
  )
  
  dbWriteTable(con, "products", products, append = TRUE, row.names = FALSE)
  
  # Orders
  for (i in 1:200) {
    user_id <- sample(1:100, 1)
    order_date <- sample(
      seq.Date(as.Date("2023-01-01"), as.Date("2023-12-31"), by = "day"),
      1
    )
    
    # Create the order, then fetch its auto-generated id.
    # (last_insert_rowid() works on every SQLite version; the alternative
    # INSERT ... RETURNING needs SQLite >= 3.35.)
    dbExecute(con, sprintf("
      INSERT INTO orders (user_id, order_date, total_amount) 
      VALUES (%d, '%s', 0)",
      user_id, order_date
    ))
    order_id <- dbGetQuery(con, "SELECT last_insert_rowid() AS order_id")$order_id
    
    # Add the order items
    n_items <- sample(1:5, 1)
    total_amount <- 0
    
    for (j in 1:n_items) {
      product_id <- sample(1:50, 1)
      quantity <- sample(1:3, 1)
      
      # Look up the product's price
      price <- dbGetQuery(con, sprintf(
        "SELECT price FROM products WHERE product_id = %d", product_id
      ))$price
      
      subtotal <- quantity * price
      total_amount <- total_amount + subtotal
      
      dbExecute(con, sprintf("
        INSERT INTO order_items (order_id, product_id, quantity, unit_price)
        VALUES (%d, %d, %d, %.2f)",
        order_id, product_id, quantity, price
      ))
    }
    
    # Update the order total
    dbExecute(con, sprintf("
      UPDATE orders SET total_amount = %.2f WHERE order_id = %d",
      total_amount, order_id
    ))
  }
}

# Insert the data
insert_sample_data(con)

# Analysis functions
analyze_ecommerce <- function(con) {
  # Use dplyr for the more involved analyses
  users_tbl <- tbl(con, "users")
  orders_tbl <- tbl(con, "orders")
  products_tbl <- tbl(con, "products")
  order_items_tbl <- tbl(con, "order_items")
  
  cat("=== E-commerce Analysis Report ===\n\n")
  
  # 1. User analysis
  cat("1. User analysis:\n")
  user_analysis <- users_tbl %>%
    group_by(country) %>%
    summarize(
      user_count = n(),
      active_users = sum(is_active, na.rm = TRUE)
    ) %>%
    arrange(desc(user_count)) %>%
    collect()
  
  print(user_analysis)
  
  # 2. Sales analysis
  cat("\n2. Sales analysis:\n")
  sales_analysis <- orders_tbl %>%
    inner_join(users_tbl, by = "user_id") %>%
    group_by(country, month = strftime('%Y-%m', order_date)) %>%
    summarize(
      order_count = n(),
      total_revenue = sum(total_amount, na.rm = TRUE),
      avg_order_value = mean(total_amount, na.rm = TRUE)
    ) %>%
    arrange(desc(total_revenue)) %>%
    collect()
  
  print(head(sales_analysis, 10))
  
  # 3. Product analysis
  cat("\n3. Product analysis:\n")
  product_analysis <- order_items_tbl %>%
    inner_join(products_tbl, by = "product_id") %>%
    inner_join(orders_tbl, by = "order_id") %>%
    group_by(category) %>%
    summarize(
      units_sold = sum(quantity, na.rm = TRUE),
      revenue = sum(quantity * unit_price, na.rm = TRUE),
      unique_products = n_distinct(product_id)
    ) %>%
    arrange(desc(revenue)) %>%
    collect()
  
  print(product_analysis)
  
  # 4. Customer value analysis
  cat("\n4. Customer value analysis:\n")
  customer_value <- orders_tbl %>%
    inner_join(users_tbl, by = "user_id") %>%
    group_by(user_id, username, country) %>%
    summarize(
      total_orders = n(),
      total_spent = sum(total_amount, na.rm = TRUE),
      avg_order_value = mean(total_amount, na.rm = TRUE),
      first_order = min(order_date, na.rm = TRUE),
      last_order = max(order_date, na.rm = TRUE)
    ) %>%
    arrange(desc(total_spent)) %>%
    collect() %>%
    # difftime() has no SQLite translation, so compute it after collect();
    # Dates come back as day counts, hence the as.Date() conversion
    mutate(
      days_since_last_order = as.numeric(
        difftime(Sys.Date(), as.Date(last_order, origin = "1970-01-01"),
                 units = "days")
      )
    )
  
  print(head(customer_value, 10))
  
  # 5. Inventory analysis
  cat("\n5. Inventory analysis:\n")
  inventory_analysis <- products_tbl %>%
    left_join(
      order_items_tbl %>%
        group_by(product_id) %>%
        summarize(total_sold = sum(quantity, na.rm = TRUE)),
      by = "product_id"
    ) %>%
    mutate(
      total_sold = coalesce(total_sold, 0),
      stock_ratio = stock_quantity / (stock_quantity + total_sold),
      status = case_when(
        stock_quantity == 0 ~ "out of stock",
        stock_ratio < 0.2 ~ "low stock",
        TRUE ~ "well stocked"
      )
    ) %>%
    group_by(category, status) %>%
    summarize(
      product_count = n(),
      avg_price = mean(price, na.rm = TRUE)
    ) %>%
    arrange(category, status) %>%
    collect()
  
  print(inventory_analysis)
  
  return(list(
    user_analysis = user_analysis,
    sales_analysis = sales_analysis,
    product_analysis = product_analysis,
    customer_value = customer_value,
    inventory_analysis = inventory_analysis
  ))
}

# Run the analysis
analysis_results <- analyze_ecommerce(con)

# Save the results
saveRDS(analysis_results, "ecommerce_analysis.rds")

# Close the connection
dbDisconnect(con)

# Check the database file size
file_info <- file.info("ecommerce.db")
cat(sprintf("\nDatabase file size: %.2f MB\n", file_info$size / 1024 / 1024))

Case 2: A Real-time Monitoring System

# A real-time metrics monitoring and alerting system
library(DBI)
library(RSQLite)
library(dplyr)

# Create the monitoring database
con <- dbConnect(RSQLite::SQLite(), "monitoring.db")

# Create the metrics table
dbExecute(con, "
CREATE TABLE IF NOT EXISTS system_metrics (
  metric_id INTEGER PRIMARY KEY AUTOINCREMENT,
  server_name TEXT NOT NULL,
  metric_name TEXT NOT NULL,
  metric_value REAL NOT NULL,
  timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
  severity TEXT CHECK(severity IN ('normal', 'warning', 'critical')),
  is_acknowledged BOOLEAN DEFAULT 0
)
")

# Create indexes
dbExecute(con, "CREATE INDEX idx_metrics_timestamp ON system_metrics(timestamp)")
dbExecute(con, "CREATE INDEX idx_metrics_severity ON system_metrics(severity, is_acknowledged)")

# Simulate incoming metric data
generate_metrics <- function(con, n = 100) {
  servers <- c("web_server_1", "web_server_2", "db_server", "cache_server")
  metrics <- c("cpu_usage", "memory_usage", "disk_usage", "network_traffic", "response_time")
  
  for (i in 1:n) {
    server <- sample(servers, 1)
    metric <- sample(metrics, 1)
    
    # Generate a value whose range depends on the metric type
    if (metric == "cpu_usage") {
      value <- runif(1, 10, 90) + rnorm(1, 0, 5)
    } else if (metric == "memory_usage") {
      value <- runif(1, 40, 95) + rnorm(1, 0, 3)
    } else if (metric == "disk_usage") {
      value <- runif(1, 30, 85)
    } else if (metric == "network_traffic") {
      value <- runif(1, 100, 1000)
    } else {
      value <- runif(1, 50, 500)
    }
    
    # Classify severity
    if (metric %in% c("cpu_usage", "memory_usage") && value > 85) {
      severity <- "critical"
    } else if (metric %in% c("cpu_usage", "memory_usage") && value > 75) {
      severity <- "warning"
    } else if (metric == "response_time" && value > 300) {
      severity <- "critical"
    } else if (metric == "response_time" && value > 200) {
      severity <- "warning"
    } else {
      severity <- "normal"
    }
    
    # Insert the reading
    dbExecute(con, sprintf("
      INSERT INTO system_metrics (server_name, metric_name, metric_value, severity)
      VALUES ('%s', '%s', %.2f, '%s')",
      server, metric, value, severity
    ))
    
    # Simulate real-time latency
    Sys.sleep(0.1)
  }
}

# Start the data generator (in practice this might run in a separate process)
# generate_metrics(con, 50)

# Monitoring dashboard queries
monitor_system <- function(con) {
  cat("\n=== System Monitoring Dashboard ===\n")
  cat("Time:", format(Sys.time(), "%Y-%m-%d %H:%M:%S"), "\n\n")
  
  # 1. Active alerts
  current_alerts <- dbGetQuery(con, "
    SELECT 
      server_name,
      metric_name,
      metric_value,
      severity,
      timestamp
    FROM system_metrics
    WHERE severity IN ('warning', 'critical') 
      AND is_acknowledged = 0
      AND timestamp > datetime('now', '-5 minutes')
    ORDER BY severity DESC, timestamp DESC
  ")
  
  if (nrow(current_alerts) > 0) {
    cat("Active alerts:\n")
    print(current_alerts)
  } else {
    cat("Active alerts: none\n")
  }
  
  # 2. Server status overview
  server_status <- dbGetQuery(con, "
    SELECT 
      server_name,
      COUNT(*) as metric_count,
      SUM(CASE WHEN severity = 'critical' THEN 1 ELSE 0 END) as critical_count,
      SUM(CASE WHEN severity = 'warning' THEN 1 ELSE 0 END) as warning_count,
      MAX(timestamp) as last_update
    FROM system_metrics
    WHERE timestamp > datetime('now', '-10 minutes')
    GROUP BY server_name
    ORDER BY critical_count DESC, warning_count DESC
  ")
  
  cat("\n服務器狀態概覽 (最近10分鐘):\n")
  print(server_status)
  
  # 3. Metric trends
  metric_trend <- dbGetQuery(con, "
    SELECT 
      metric_name,
      AVG(metric_value) as avg_value,
      MIN(metric_value) as min_value,
      MAX(metric_value) as max_value,
      COUNT(*) as readings
    FROM system_metrics
    WHERE timestamp > datetime('now', '-1 hour')
    GROUP BY metric_name
    ORDER BY metric_name
  ")
  
  cat("\n指標趨勢 (最近1小時):\n")
  print(metric_trend)
  
  # 4. Performance analysis
  performance_analysis <- dbGetQuery(con, "
    WITH hourly_stats AS (
      SELECT 
        strftime('%Y-%m-%d %H:00:00', timestamp) as hour,
        metric_name,
        AVG(metric_value) as avg_value
      FROM system_metrics
      WHERE timestamp > datetime('now', '-24 hours')
      GROUP BY strftime('%Y-%m-%d %H:00:00', timestamp), metric_name
    )
    SELECT 
      metric_name,
      MIN(avg_value) as min_hourly_avg,
      MAX(avg_value) as max_hourly_avg,
      AVG(avg_value) as overall_avg
    FROM hourly_stats
    GROUP BY metric_name
    ORDER BY metric_name
  ")
  
  cat("\n性能分析 (最近24小時):\n")
  print(performance_analysis)
  
  return(list(
    current_alerts = current_alerts,
    server_status = server_status,
    metric_trend = metric_trend,
    performance_analysis = performance_analysis
  ))
}

# Simulate a monitoring loop
simulate_monitoring <- function(con, iterations = 5) {
  for (i in 1:iterations) {
    cat("\n\n=== Monitoring cycle", i, "===\n")
    
    # Generate new data
    generate_metrics(con, 20)
    
    # Run the dashboard
    monitor_system(con)
    
    # Acknowledge some alerts (simulated)
    if (i %% 2 == 0) {
      dbExecute(con, "
        UPDATE system_metrics 
        SET is_acknowledged = 1 
        WHERE severity = 'warning' 
          AND is_acknowledged = 0
          AND timestamp < datetime('now', '-1 minute')
      ")
    }
    
    # Wait before the next cycle
    Sys.sleep(2)
  }
}

# Run the simulation
# simulate_monitoring(con, 3)

# Cleanup helper
cleanup_old_data <- function(con, days_to_keep = 7) {
  deleted_rows <- dbExecute(con, sprintf("
    DELETE FROM system_metrics 
    WHERE timestamp < datetime('now', '-%d days')",
    days_to_keep
  ))
  
  cat(sprintf("清理了 %d 天前的數據,刪除了 %d 行\n", 
              days_to_keep, deleted_rows))
  
  # Run VACUUM to reclaim the freed space
  dbExecute(con, "VACUUM")
}

# Clean out old data
cleanup_old_data(con, 1)

# Close the connection
dbDisconnect(con)

💻 Today's Exercises

Exercise 1: SQLite Operations

# 1. Create an in-memory SQLite database
# 2. Create a students table with: student id, name, age, gender, score, class
# 3. Insert at least 20 student records
# 4. Write queries to:
#    - compute each class's average score
#    - find the top 10 students by score
#    - count students and compute average scores by gender
# 5. Export the query results to a CSV file

Exercise 2: MySQL/PostgreSQL Connections

# 1. Install and configure a local MySQL or PostgreSQL server
# 2. Connect to it from R
# 3. Create a product inventory table
# 4. Implement:
#    - bulk import of product data
#    - a low-stock query (stock below 10)
#    - price updates
#    - an inventory report
# 5. Use a connection pool to manage the connections

Exercise 3: Large-data Optimization

# 1. Create a simulated sales table with 1,000,000 rows
# 2. Implement a chunked-processing function that computes:
#    - total sales per day
#    - units sold per product
#    - a sales trend analysis
# 3. Compare the performance of:
#    - a single full query
#    - chunked queries
#    - index-backed queries
# 4. Implement archiving: move old data into a history table

📌 Summary

Key takeaways:

  • ✓ Connecting to SQLite and performing basic operations
  • ✓ Running SQL queries and transactions with DBI
  • ✓ Querying databases with dplyr syntax (dbplyr)
  • ✓ Connecting to other databases (MySQL, PostgreSQL)
  • ✓ Strategies and optimizations for large data

The database workflow (a runnable sketch follows this list):

  1. Connect: establish a connection with dbConnect()
  2. Execute SQL: dbExecute() or dbGetQuery()
  3. Manipulate data: insert, query, update, delete
  4. Manage transactions: BEGIN, COMMIT, ROLLBACK
  5. Disconnect: dbDisconnect()
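
# A minimal end-to-end sketch of the workflow above -- SQLite, so it runs
# anywhere; dbWithTransaction() covers step 4's COMMIT/ROLLBACK:
library(DBI)
con <- dbConnect(RSQLite::SQLite(), ":memory:")        # 1. connect
dbExecute(con, "CREATE TABLE log (msg TEXT)")          # 2. execute SQL
dbWithTransaction(con, {                               # 4. transaction
  dbExecute(con, "INSERT INTO log VALUES ('hello')")   # 3. manipulate data
})
print(dbGetQuery(con, "SELECT * FROM log"))
dbDisconnect(con)                                      # 5. disconnect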

Performance essentials:

  1. Indexes: index the columns you query most often
  2. Batching: wrap bulk inserts in a transaction
  3. Chunking: read and process large datasets in pieces
  4. Pooling: manage connections in web apps with a connection pool
  5. Maintenance: purge old data regularly and run VACUUM

🎯 Tomorrow's Preview

Day 12: Web Scraping and APIs

  • Web scraping basics (the rvest package)
  • Fetching data from APIs (the httr package)
  • Parsing web pages
  • Scraping dynamic pages (RSelenium)

Study Tips

  1. Start with SQLite -- no extra database software to install
  2. Learn basic SQL syntax; it is essential for database work
  3. Understand connection security (password management)
  4. Apply databases in real projects to build experience
  5. Pay attention to error handling and resource cleanup (see the sketch below)
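
# A minimal error-handling sketch for tip 5 (safe_query() is illustrative,
# not a package function): tryCatch() reports the failure and on.exit()
# guarantees the connection is closed either way.
safe_query <- function(db_path, sql) {
  con <- dbConnect(RSQLite::SQLite(), db_path)
  on.exit(dbDisconnect(con), add = TRUE)
  tryCatch(
    dbGetQuery(con, sql),
    error = function(e) {
      message("Query failed: ", conditionMessage(e))
      NULL
    }
  )
}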

Remember to save today's script as day11.R -- tomorrow we move on to web data collection! 🌐