一次关于 sqlalchemy 的排错 - qb

明月清风精进不止 2019-12-06

前言

公司的 C 端用户中心,入口流量极大。
有两个接口,都是修改的用户的某个信息,在高并发下,出现了一点问题。
A 用户有两个属性,名字和年龄,接口 a 修改了名字,接口 b 修改了职业,但是一旦并发执行,总会有一个修改失败,这个失败的频率还很高。
之前并发量不高的时候,这个是没有问题的,并发量一起来,这个问题就是很严重的了。
初步判定,是锁的问题了。

问题复现

为了确认这个问题,首先打印所有的 sql,发现了问题。
并发量不高的时候(我依次请求),两个 sql 都乖乖的只 update 自己修改的部分,但是一旦有并发(我开启两个线程执行),有一个 sqlupdate 了两个数据,即名字和年龄。
模拟代码如下:

1234567891011121314151617181920212223242526272829303132333435363738394041424344
from flask import Flask, request, jsonifyfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, Stringfrom sqlalchemy.orm import sessionmakerBase = declarative_base()engine = create_engine("mysql+pymysql://test::3306/test" , echo=True, pool_size=20)Session = sessionmaker(bind=engine)app = Flask(__name__)class (Base):    __tablename__ = "tb_user"    id = Column(Integer, primary_key=True, autoincrement=True)    name = Column(String(64))    age = Column(Integer)def ():    session = Session()    name = request.json.get("key")    id = request.json.get("id")    user = session.query(TbUser).filter_by(id=id).first()    print(f"name origin {user.name}, new {name}")    user.name = name    session.add(session.merge(user))    session.commit()    return ""@app.route("/user/age", methods=["PUT"])def modify_age():    session = Session()    age = request.json.get("key")    id = request.json.get("id")    user = session.query(TbUser).filter_by(id=id).first()    print(f"age origin {user.age}, new {age}")    user.age = age    session.add(session.merge(user))    session.commit()    return ""

将这段代码保存为 app.py 然后执行 export FLASK_APP=app.py && export DATABASE_URL="" && flask run -h 0.0.0.0 -p 5000 运行起来
并发测试代码:

123456789101112131415161718192021
import aiohttpimport asyncioimport randomfrom uuid import uuid4async def run(path, key):	params = {"id": 1, "key": key}	async with aiohttp.ClientSession() as session:		async with session.put(f"http://127.0.0.1:5000/user/{path}",json=params) as resp:			print(resp.status)			print(await resp.text())loop = asyncio.get_event_loop()tasks = [	loop.create_task(run("name", uuid4().hex)),	loop.create_task(run("name", uuid4().hex)),	loop.create_task( 大专栏  一次关于 sqlalchemy 的排错 - qbrun("age", random.randint(0, 100000))),	loop.create_task(run("age", random.randint(0, 100000)))]loop.run_until_complete(asyncio.wait(tasks))loop.close()

我执行了多次这个脚本,但是没有发现有一条 update 语句是更新多个参数的,这令人非常奇怪。
一次关于 sqlalchemy 的排错 - qb
把我觉得是数据库锁的原因否决了。

问题浮出水面

我又细查了一下代码。
由于公司的 C 端用户量极大,于是架构上我们采纳了分库分表。
于是,这要求我们在 model 的建立上,需要极其抽象。
细剥这一部分的代码,然后发现了一个问题。
获取 user 的代码里面,单独获取了一次 session, 而保存 user 又获取了一次 session。基本确定问题出现在这里了。
这是,头脑也有了代码的基本运行模型了。
如下图所示:
一次关于 sqlalchemy 的排错 - qb
进程 1 修改用户姓名为 lisi
进程 2 修改用户年龄为 20
进程 1 和进程 2 同时执行的时候,先拿取一次 session 获取 user 信息,这时候数据库的用户信息是 {"name": "zhangsan", "age": 15}
进程 2 先于进程 1 修改完用户数据,获取一次 session,然后将数据更新, 更新的时候,用户数据不变,更新完,用户 age=20
进程 1 开始执行保存用户数据,获取一次 session,此时用户数据因为金进程 2 的原因,已经发生了改变。此时的用户数据为 {"name": "zhangsan", "age": 20}.这时候保存用户数据,则改变的是 {"name": "lisi", "age": 15}.
就好像进程 2 没有执行一样。
为了验证我得想法。
模拟代码:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
from flask import Flask, request, jsonifyfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, Stringfrom sqlalchemy.orm import sessionmakerBase = declarative_base()engine = create_engine("mysql+pymysql://test::3306/test" , echo=True, pool_size=20)Session = sessionmaker(bind=engine)app = Flask(__name__)class TbUser(Base):    __tablename__ = "tb_user"    id = Column(Integer, primary_key=True, autoincrement=True)    name = Column(String(64))    age = Column(Integer)@app.route("/user/name", methods=["PUT"])def modify_name():    session = Session()    name = request.json.get("key")    id = request.json.get("id")    user = session.query(TbUser).filter_by(id=id).first()    session.close()    print(f"name origin {user.name}, new {name}")    user.name = name        session = Session()    session.add(session.merge(user))    session.commit()    return ""@app.route("/user/age", methods=["PUT"])def modify_age():    session = Session()    age = request.json.get("key")    id = request.json.get("id")    user = session.query(TbUser).filter_by(id=id).first()    session.close()    print(f"age origin {user.age}, new {age}")    user.age = age        session = Session()    session.add(session.merge(user))    session.commit()    return ""

执行一次。
一次关于 sqlalchemy 的排错 - qb
可以看到,这次 nameage 都被更新了。

问题得解

找到了原因,就好办多了。
和同事敲定整个流程不能重建和销毁 session ,需要 session 的地方弄全局变量或函数传递的方式,问题得解。

相关推荐