简介
Peewee 是一个轻量级且功能强大的 Python ORM(对象关系映射)库。它旨在简化数据库操作,支持 SQLite、MySQL、PostgreSQL 等多种数据库。Peewee 语法简洁直观,非常适合中小型项目或快速原型开发。
一、环境准备
安装 Peewee
pip install peewee安装数据库驱动
根据你使用的数据库类型,安装相应的驱动
# MySQL 驱动
pip install pymysql二、数据库连接示例
from peewee import *
#import pymysql
db = MySQLDatabase(
'my_database',
user='root',
password='password',
host='127.0.0.1',
port=3306,
charset='utf8mb4'
)三、定义模型
常见字段类型
定义基础模型
建议定义一个 BaseModel 来统一管理数据库连接,避免在每个模型中重复配置
class BaseModel(Model):
class Meta:
database = db模型示例
from datetime import datetime
class User(BaseModel):
username = CharField(unique=True, max_length=50)
email = CharField(null=True)
created_at = DateTimeField(default=datetime.now)
class Article(BaseModel):
user = ForeignKeyField(User, backref='articles', on_delete='CASCADE')
content = TextField()
is_published = BooleanField(default=True)
created_at = DateTimeField(default=datetime.now)四、数据库操作
连接数据库
在操作数据前,建议先连接数据库并创建表。
# 连接数据库
db.connect()
db.create_tables([User, Article], safe=True) # safe=True 防止表已存在报错增加数据
# 添加用户
user = User.create(username='alice', email='alice@example.com')
# 添加文章
Article.create(user=user, content='Hello Peewee!')查询数据
# 查询所有用户
users = User.select()
for u in users:
print(u.username)
# 条件查询
user_alice = User.select().where(User.username == 'alice').first()
# 多条件查询 (AND)
query = User.select().where((User.username == 'alice') & (User.email.is_null(False)))
# 排序与限制
query = Article.select().order_by(Article.created_at.desc()).limit(5)更新数据
# 修改实例属性后保存
user = User.get(User.username == 'alice')
user.email = 'new_email@example.com'
user.save()删除数据
# 删除一个用户
user_alice = User.select().where(User.username == 'alice').first()
user.delete_instance()五、完整代码示例
from peewee import *
from datetime import datetime
# import pymysql # 使用的是 pymysql 作为驱动,通常不需要显式 import 除非为了特定配置
# 1. 数据库配置
db = MySQLDatabase(
'my_database',
user='root',
password='password',
host='127.0.0.1',
port=3306,
charset='utf8mb4'
)
class BaseModel(Model):
class Meta:
database = db
# 2. 定义模型
class User(BaseModel):
username = CharField(unique=True, max_length=50)
email = CharField(null=True)
created_at = DateTimeField(default=datetime.now)
class Article(BaseModel):
user = ForeignKeyField(User, backref='articles', on_delete='CASCADE')
content = TextField()
is_published = BooleanField(default=True)
created_at = DateTimeField(default=datetime.now)
# 3. 执行逻辑
try:
db.connect()
db.create_tables([User, Article], safe=True) # safe=True 防止表已存在报错
# --- 增 ---
# 使用事务包裹,保证数据一致性
with db.atomic():
user = User.create(username='alice', email='alice@example.com')
Article.create(user=user, content='Hello Peewee!')
# --- 查 ---
users = User.select()
for u in users:
print(f"User: {u.username}, Email: {u.email}")
# 条件查询
user_alice = User.select().where(User.username == 'alice').first()
# --- 改 ---
if user_alice:
user_alice.email = 'new_email@example.com'
user_alice.save()
# --- 删 ---
if user_alice:
user_alice.delete_instance()
print("User deleted successfully.")
except Exception as e:
print(f"An error occurred: {e}")
finally:
if not db.is_closed():
db.close()
评论区