SQLite是一个轻量级的嵌入式关系型数据库,无需独立的服务器进程,数据库就是一个文件。Python内置了SQLite支持,通过sqlite3模块即可使用。
import sqlite3
import os
# 连接数据库(如果不存在则创建)
def create_database():
# 连接数据库
conn = sqlite3.connect('example.db')
# 创建游标对象
cursor = conn.cursor()
# 创建用户表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL,
age INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建订单表
cursor.execute('''
CREATE TABLE IF NOT EXISTS orders (
order_id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
product_name TEXT NOT NULL,
amount REAL NOT NULL,
order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
)
''')
# 创建索引以提高查询性能
cursor.execute('CREATE INDEX IF NOT EXISTS idx_username ON users(username)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_id ON orders(user_id)')
# 提交更改
conn.commit()
print("数据库和表创建成功!")
return conn
# 使用上下文管理器连接数据库
def connect_with_context():
with sqlite3.connect('example.db') as conn:
conn.execute("PRAGMA foreign_keys = ON") # 启用外键约束
return conn
def insert_data(conn):
cursor = conn.cursor()
# 插入单条数据
cursor.execute(
"INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
('alice', 'alice@example.com', 25)
)
# 插入多条数据
users_data = [
('bob', 'bob@example.com', 30),
('charlie', 'charlie@example.com', 35),
('diana', 'diana@example.com', 28)
]
cursor.executemany(
"INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
users_data
)
# 获取最后插入的ID
last_id = cursor.lastrowid
print(f"最后插入的用户ID: {last_id}")
# 插入订单数据
orders_data = [
(1, '笔记本电脑', 899.99),
(1, '鼠标', 25.50),
(2, '键盘', 75.00),
(3, '显示器', 299.99),
(4, '耳机', 89.99)
]
cursor.executemany(
"INSERT INTO orders (user_id, product_name, amount) VALUES (?, ?, ?)",
orders_data
)
conn.commit()
print("数据插入成功!")
def query_data(conn):
cursor = conn.cursor()
# 查询所有用户
print("=== 所有用户 ===")
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
for user in users:
print(f"ID: {user[0]}, 用户名: {user[1]}, 邮箱: {user[2]}, 年龄: {user[3]}, 注册时间: {user[4]}")
# 查询特定用户
print("\n=== 查询年龄大于28的用户 ===")
cursor.execute(
"SELECT username, email, age FROM users WHERE age > ?",
(28,)
)
for row in cursor.fetchall():
print(f"用户名: {row[0]}, 邮箱: {row[1]}, 年龄: {row[2]}")
# 使用字典游标(Python 3.8+)
print("\n=== 使用字典游标查询 ===")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE username LIKE ?", ('%a%',))
rows = cursor.fetchall()
for row in rows:
print(f"ID: {row['id']}, 用户名: {row['username']}")
def update_and_delete(conn):
cursor = conn.cursor()
# 更新数据
cursor.execute(
"UPDATE users SET age = ? WHERE username = ?",
(26, 'alice')
)
print(f"更新了 {cursor.rowcount} 条记录")
# 删除数据
cursor.execute("DELETE FROM users WHERE username = ?", ('charlie',))
print(f"删除了 {cursor.rowcount} 条记录")
conn.commit()
def transaction_example(conn):
try:
# 开始事务
conn.execute("BEGIN TRANSACTION")
cursor = conn.cursor()
# 一系列操作
cursor.execute(
"INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
('eva', 'eva@example.com', 32)
)
user_id = cursor.lastrowid
cursor.execute(
"INSERT INTO orders (user_id, product_name, amount) VALUES (?, ?, ?)",
(user_id, '平板电脑', 499.99)
)
# 模拟错误(如果发生则回滚)
# raise ValueError("模拟错误")
# 提交事务
conn.commit()
print("事务提交成功!")
except Exception as e:
# 回滚事务
conn.rollback()
print(f"事务回滚,原因: {e}")
import sqlite3
from datetime import datetime
# 自定义类型适配器
def adapt_datetime(dt):
return dt.isoformat()
def convert_datetime(text):
return datetime.fromisoformat(text.decode())
# 注册适配器
sqlite3.register_adapter(datetime, adapt_datetime)
sqlite3.register_converter("timestamp", convert_datetime)
# 自定义行工厂
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
def advanced_features():
# 连接时指定自定义类型
conn = sqlite3.connect(
'advanced.db',
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
)
conn.row_factory = dict_factory
cursor = conn.cursor()
# 创建包含时间戳的表
cursor.execute('''
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY,
message TEXT,
created_at TIMESTAMP
)
''')
# 插入带时间戳的数据
now = datetime.now()
cursor.execute(
"INSERT INTO logs (message, created_at) VALUES (?, ?)",
("测试日志", now)
)
conn.commit()
# 查询并自动转换时间戳
cursor.execute("SELECT * FROM logs")
for row in cursor.fetchall():
print(f"日志: {row['message']}, 时间: {row['created_at']}, 类型: {type(row['created_at'])}")
conn.close()
class DatabaseManager:
def __init__(self, db_name):
self.db_name = db_name
def __enter__(self):
self.conn = sqlite3.connect(self.db_name)
self.conn.row_factory = sqlite3.Row
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.conn.commit()
else:
self.conn.rollback()
self.conn.close()
return False # 不抑制异常
# 使用示例
def use_database_manager():
with DatabaseManager('managed.db') as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY,
name TEXT,
price REAL
)
''')
products = [
('产品A', 19.99),
('产品B', 29.99),
('产品C', 39.99)
]
cursor.executemany(
"INSERT INTO products (name, price) VALUES (?, ?)",
products
)
cursor.execute("SELECT * FROM products")
for row in cursor.fetchall():
print(dict(row))
import sqlite3
from datetime import datetime
import csv
import json
class StudentManagementSystem:
def __init__(self, db_name='students.db'):
self.db_name = db_name
self.init_database()
def init_database(self):
"""初始化数据库和表"""
with sqlite3.connect(self.db_name) as conn:
cursor = conn.cursor()
# 创建学生表
cursor.execute('''
CREATE TABLE IF NOT EXISTS students (
student_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
gender TEXT CHECK(gender IN ('男', '女')),
birth_date TEXT,
major TEXT,
enrollment_date TEXT,
gpa REAL DEFAULT 0.0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建课程表
cursor.execute('''
CREATE TABLE IF NOT EXISTS courses (
course_id INTEGER PRIMARY KEY AUTOINCREMENT,
course_name TEXT NOT NULL,
credit INTEGER DEFAULT 1,
teacher TEXT,
semester TEXT
)
''')
# 创建成绩表(关联学生和课程)
cursor.execute('''
CREATE TABLE IF NOT EXISTS grades (
grade_id INTEGER PRIMARY KEY AUTOINCREMENT,
student_id INTEGER,
course_id INTEGER,
score REAL CHECK(score >= 0 AND score <= 100),
grade_point REAL,
grade_date TEXT,
FOREIGN KEY (student_id) REFERENCES students(student_id) ON DELETE CASCADE,
FOREIGN KEY (course_id) REFERENCES courses(course_id) ON DELETE CASCADE,
UNIQUE(student_id, course_id)
)
''')
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_student_name ON students(name)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_student_major ON students(major)')
conn.commit()
def add_student(self, name, gender, birth_date, major, enrollment_date):
"""添加学生"""
with sqlite3.connect(self.db_name) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO students (name, gender, birth_date, major, enrollment_date)
VALUES (?, ?, ?, ?, ?)
''', (name, gender, birth_date, major, enrollment_date))
conn.commit()
print(f"学生 {name} 添加成功!ID: {cursor.lastrowid}")
def add_course(self, course_name, credit, teacher, semester):
"""添加课程"""
with sqlite3.connect(self.db_name) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO courses (course_name, credit, teacher, semester)
VALUES (?, ?, ?, ?)
''', (course_name, credit, teacher, semester))
conn.commit()
print(f"课程 {course_name} 添加成功!")
def add_grade(self, student_id, course_id, score):
"""添加成绩并计算绩点"""
# 计算绩点(标准4.0制)
if score >= 90:
grade_point = 4.0
elif score >= 85:
grade_point = 3.7
elif score >= 82:
grade_point = 3.3
elif score >= 78:
grade_point = 3.0
elif score >= 75:
grade_point = 2.7
elif score >= 72:
grade_point = 2.3
elif score >= 68:
grade_point = 2.0
elif score >= 64:
grade_point = 1.5
elif score >= 60:
grade_point = 1.0
else:
grade_point = 0.0
with sqlite3.connect(self.db_name) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO grades (student_id, course_id, score, grade_point, grade_date)
VALUES (?, ?, ?, ?, date('now'))
''', (student_id, course_id, score, grade_point))
# 更新学生平均绩点
self._update_student_gpa(conn, student_id)
conn.commit()
print(f"成绩添加成功!绩点: {grade_point}")
def _update_student_gpa(self, conn, student_id):
"""更新学生平均绩点"""
cursor = conn.cursor()
cursor.execute('''
SELECT AVG(g.grade_point)
FROM grades g
JOIN courses c ON g.course_id = c.course_id
WHERE g.student_id = ?
''', (student_id,))
result = cursor.fetchone()
gpa = result[0] if result[0] is not None else 0.0
cursor.execute('''
UPDATE students SET gpa = ? WHERE student_id = ?
''', (round(gpa, 2), student_id))
def query_students(self, major=None, min_gpa=None):
"""查询学生信息"""
with sqlite3.connect(self.db_name) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query = "SELECT * FROM students WHERE 1=1"
params = []
if major:
query += " AND major = ?"
params.append(major)
if min_gpa is not None:
query += " AND gpa >= ?"
params.append(min_gpa)
query += " ORDER BY gpa DESC"
cursor.execute(query, params)
students = []
for row in cursor.fetchall():
students.append(dict(row))
return students
def get_student_report(self, student_id):
"""获取学生成绩报告"""
with sqlite3.connect(self.db_name) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 获取学生基本信息
cursor.execute('''
SELECT s.*,
COUNT(g.grade_id) as course_count,
AVG(g.score) as avg_score
FROM students s
LEFT JOIN grades g ON s.student_id = g.student_id
WHERE s.student_id = ?
GROUP BY s.student_id
''', (student_id,))
student_info = cursor.fetchone()
if not student_info:
return None
# 获取详细成绩
cursor.execute('''
SELECT c.course_name, c.credit, g.score, g.grade_point
FROM grades g
JOIN courses c ON g.course_id = c.course_id
WHERE g.student_id = ?
ORDER BY c.course_name
''', (student_id,))
grades = [dict(row) for row in cursor.fetchall()]
return {
'student_info': dict(student_info),
'grades': grades
}
def export_to_csv(self, filename='students_export.csv'):
"""导出学生数据到CSV"""
with sqlite3.connect(self.db_name) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT student_id, name, gender, birth_date, major, enrollment_date, gpa
FROM students
ORDER BY student_id
''')
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 写入标题行
writer.writerow(['学号', '姓名', '性别', '出生日期', '专业', '入学日期', '平均绩点'])
# 写入数据
writer.writerows(cursor.fetchall())
print(f"数据已导出到 {filename}")
def import_from_csv(self, filename):
"""从CSV导入学生数据"""
with sqlite3.connect(self.db_name) as conn:
cursor = conn.cursor()
with open(filename, 'r', newline='', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader) # 跳过标题行
for row in reader:
try:
cursor.execute('''
INSERT INTO students (name, gender, birth_date, major, enrollment_date, gpa)
VALUES (?, ?, ?, ?, ?, ?)
''', (row[1], row[2], row[3], row[4], row[5], float(row[6]) if row[6] else 0.0))
except Exception as e:
print(f"导入失败: {row}, 错误: {e}")
conn.commit()
print(f"从 {filename} 导入数据完成")
def backup_database(self, backup_name=None):
"""备份数据库"""
if backup_name is None:
backup_name = f"{self.db_name}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
with sqlite3.connect(self.db_name) as source:
with sqlite3.connect(backup_name) as target:
source.backup(target)
print(f"数据库已备份到 {backup_name}")
return backup_name
# 使用示例
def demo_student_system():
# 创建系统实例
sms = StudentManagementSystem('university.db')
# 添加测试数据
print("=== 添加学生 ===")
students = [
('张三', '男', '2000-05-15', '计算机科学', '2020-09-01'),
('李四', '女', '2001-03-22', '软件工程', '2020-09-01'),
('王五', '男', '1999-11-30', '计算机科学', '2019-09-01'),
('赵六', '女', '2000-08-14', '人工智能', '2020-09-01')
]
for student in students:
sms.add_student(*student)
print("\n=== 添加课程 ===")
courses = [
('数据结构', 4, '王教授', '2023-秋季'),
('数据库原理', 3, '李教授', '2023-秋季'),
('算法设计', 4, '张教授', '2023-春季'),
('机器学习', 3, '陈教授', '2023-春季')
]
for course in courses:
sms.add_course(*course)
print("\n=== 添加成绩 ===")
# 假设学生ID和课程ID从1开始
grades = [
(1, 1, 85), # 张三 数据结构 85分
(1, 2, 92), # 张三 数据库原理 92分
(2, 1, 78), # 李四 数据结构 78分
(2, 3, 88), # 李四 算法设计 88分
(3, 2, 95), # 王五 数据库原理 95分
(4, 4, 91), # 赵六 机器学习 91分
]
for grade in grades:
sms.add_grade(*grade)
print("\n=== 查询计算机科学专业的学生 ===")
cs_students = sms.query_students(major='计算机科学')
for student in cs_students:
print(f"{student['name']} - {student['major']} - GPA: {student['gpa']}")
print("\n=== 获取学生成绩报告 ===")
report = sms.get_student_report(1) # 获取张三的成绩报告
if report:
print(f"学生: {report['student_info']['name']}")
print(f"专业: {report['student_info']['major']}")
print(f"平均绩点: {report['student_info']['gpa']}")
print("课程成绩:")
for grade in report['grades']:
print(f" {grade['course_name']}: {grade['score']}分, 绩点: {grade['grade_point']}")
print("\n=== 导出数据到CSV ===")
sms.export_to_csv('students.csv')
print("\n=== 备份数据库 ===")
sms.backup_database()
return sms
if __name__ == "__main__":
# 运行演示
sms = demo_student_system()
# 额外功能:查询绩点高于3.0的学生
print("\n=== 绩点高于3.0的学生 ===")
high_gpa_students = sms.query_students(min_gpa=3.0)
for student in high_gpa_students:
print(f"{student['name']}: {student['gpa']}")
import sqlite3
import time
from contextlib import contextmanager
class OptimizedDatabase:
def __init__(self, db_name):
self.db_name = db_name
@contextmanager
def get_connection(self):
"""获取数据库连接的上下文管理器"""
conn = sqlite3.connect(self.db_name)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def batch_insert(self, data, batch_size=1000):
"""批量插入数据优化"""
with self.get_connection() as conn:
cursor = conn.cursor()
# 开始事务
cursor.execute("BEGIN TRANSACTION")
try:
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
cursor.executemany(
"INSERT INTO large_table (name, value) VALUES (?, ?)",
batch
)
cursor.execute("COMMIT")
print(f"批量插入了 {len(data)} 条记录")
except Exception as e:
cursor.execute("ROLLBACK")
print(f"批量插入失败: {e}")
def create_indexes(self):
"""创建合适的索引"""
with self.get_connection() as conn:
cursor = conn.cursor()
# 分析查询模式并创建索引
indexes = [
"CREATE INDEX IF NOT EXISTS idx_name ON large_table(name)",
"CREATE INDEX IF NOT EXISTS idx_value ON large_table(value)",
"CREATE INDEX IF NOT EXISTS idx_name_value ON large_table(name, value)"
]
for index_sql in indexes:
cursor.execute(index_sql)
def vacuum_database(self):
"""压缩数据库文件,回收空间"""
with self.get_connection() as conn:
conn.execute("VACUUM")
print("数据库压缩完成")
def analyze_performance(self):
"""分析查询性能"""
with self.get_connection() as conn:
cursor = conn.cursor()
# 启用性能分析
cursor.execute("PRAGMA journal_mode = WAL") # Write-Ahead Logging
cursor.execute("PRAGMA synchronous = NORMAL")
cursor.execute("PRAGMA cache_size = -2000") # 2MB缓存
# 执行计划分析
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM large_table WHERE name = ?", ('test',))
plan = cursor.fetchall()
print("查询执行计划:")
for row in plan:
print(row)
def use_window_functions(self):
"""使用窗口函数(SQLite 3.25+)"""
with self.get_connection() as conn:
cursor = conn.cursor()
# 创建示例数据
cursor.execute('''
CREATE TABLE IF NOT EXISTS sales (
id INTEGER PRIMARY KEY,
salesperson TEXT,
region TEXT,
amount REAL,
sale_date TEXT
)
''')
# 使用窗口函数计算排名
cursor.execute('''
SELECT
salesperson,
region,
amount,
sale_date,
RANK() OVER (PARTITION BY region ORDER BY amount DESC) as rank_in_region,
SUM(amount) OVER (PARTITION BY region) as region_total
FROM sales
ORDER BY region, rank_in_region
''')
results = cursor.fetchall()
for row in results:
print(dict(row))
def performance_comparison():
"""性能对比测试"""
db = OptimizedDatabase('performance_test.db')
# 准备测试数据
test_data = [(f'name_{i}', i * 1.5) for i in range(10000)]
# 测试批量插入性能
print("测试批量插入性能...")
# 单条插入
start_time = time.time()
with db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS test_table (name TEXT, value REAL)")
for item in test_data[:1000]: # 只测试1000条
cursor.execute("INSERT INTO test_table (name, value) VALUES (?, ?)", item)
print(f"单条插入1000条记录耗时: {time.time() - start_time:.2f}秒")
# 批量插入
start_time = time.time()
with db.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS test_table_batch")
cursor.execute("CREATE TABLE test_table_batch (name TEXT, value REAL)")
cursor.executemany("INSERT INTO test_table_batch (name, value) VALUES (?, ?)", test_data[:1000])
print(f"批量插入1000条记录耗时: {time.time() - start_time:.2f}秒")
# 运行性能测试
if __name__ == "__main__":
print("=== 性能优化示例 ===")
performance_comparison()
这个全面的指南涵盖了从基础到高级的SQLite使用,以及一个完整的学生管理系统案例。你可以根据需要调整和扩展这些代码来满足具体需求。