明月清风精进不止 2019-12-06
公司的 C 端用户中心,入口流量极大。
有两个接口,都是修改的用户的某个信息,在高并发下,出现了一点问题。
A 用户有两个属性,名字和年龄,接口 a 修改了名字,接口 b 修改了职业,但是一旦并发执行,总会有一个修改失败,这个失败的频率还很高。
之前并发量不高的时候,这个是没有问题的,并发量一起来,这个问题就是很严重的了。
初步判定,是锁的问题了。
为了确认这个问题,首先打印所有的 sql
,发现了问题。
并发量不高的时候(我依次请求),两个 sql
都乖乖的只 update
自己修改的部分,但是一旦有并发(我开启两个线程执行),有一个 sql
就 update
了两个数据,即名字和年龄。
模拟代码如下:
1234567891011121314151617181920212223242526272829303132333435363738394041424344 | from flask import Flask, request, jsonifyfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, Stringfrom sqlalchemy.orm import sessionmakerBase = declarative_base()engine = create_engine("mysql+pymysql://test::3306/test" , echo=True, pool_size=20)Session = sessionmaker(bind=engine)app = Flask(__name__)class (Base): __tablename__ = "tb_user" id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(64)) age = Column(Integer)def (): session = Session() name = request.json.get("key") id = request.json.get("id") user = session.query(TbUser).filter_by(id=id).first() print(f"name origin {user.name}, new {name}") user.name = name session.add(session.merge(user)) session.commit() return ""@app.route("/user/age", methods=["PUT"])def modify_age(): session = Session() age = request.json.get("key") id = request.json.get("id") user = session.query(TbUser).filter_by(id=id).first() print(f"age origin {user.age}, new {age}") user.age = age session.add(session.merge(user)) session.commit() return "" |
将这段代码保存为 app.py
然后执行 export FLASK_APP=app.py && export DATABASE_URL="" && flask run -h 0.0.0.0 -p 5000
运行起来
并发测试代码:
123456789101112131415161718192021 | import aiohttpimport asyncioimport randomfrom uuid import uuid4async def run(path, key): params = {"id": 1, "key": key} async with aiohttp.ClientSession() as session: async with session.put(f"http://127.0.0.1:5000/user/{path}",json=params) as resp: print(resp.status) print(await resp.text())loop = asyncio.get_event_loop()tasks = [ loop.create_task(run("name", uuid4().hex)), loop.create_task(run("name", uuid4().hex)), loop.create_task( 大专栏 一次关于 sqlalchemy 的排错 - qbrun("age", random.randint(0, 100000))), loop.create_task(run("age", random.randint(0, 100000)))]loop.run_until_complete(asyncio.wait(tasks))loop.close() |
我执行了多次这个脚本,但是没有发现有一条 update 语句是更新多个参数的,这令人非常奇怪。
把我觉得是数据库锁的原因否决了。
我又细查了一下代码。
由于公司的 C 端用户量极大,于是架构上我们采纳了分库分表。
于是,这要求我们在 model
的建立上,需要极其抽象。
细剥这一部分的代码,然后发现了一个问题。
获取 user
的代码里面,单独获取了一次 session
, 而保存 user
又获取了一次 session
。基本确定问题出现在这里了。
这是,头脑也有了代码的基本运行模型了。
如下图所示:
进程 1 修改用户姓名为 lisi
进程 2 修改用户年龄为 20
进程 1 和进程 2 同时执行的时候,先拿取一次 session
获取 user
信息,这时候数据库的用户信息是 {"name": "zhangsan", "age": 15}
。
进程 2 先于进程 1 修改完用户数据,获取一次 session
,然后将数据更新, 更新的时候,用户数据不变,更新完,用户 age=20
。
进程 1 开始执行保存用户数据,获取一次 session
,此时用户数据因为金进程 2 的原因,已经发生了改变。此时的用户数据为 {"name": "zhangsan", "age": 20}
.这时候保存用户数据,则改变的是 {"name": "lisi", "age": 15}
.
就好像进程 2 没有执行一样。
为了验证我得想法。
模拟代码:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 | from flask import Flask, request, jsonifyfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, Stringfrom sqlalchemy.orm import sessionmakerBase = declarative_base()engine = create_engine("mysql+pymysql://test::3306/test" , echo=True, pool_size=20)Session = sessionmaker(bind=engine)app = Flask(__name__)class TbUser(Base): __tablename__ = "tb_user" id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(64)) age = Column(Integer)@app.route("/user/name", methods=["PUT"])def modify_name(): session = Session() name = request.json.get("key") id = request.json.get("id") user = session.query(TbUser).filter_by(id=id).first() session.close() print(f"name origin {user.name}, new {name}") user.name = name session = Session() session.add(session.merge(user)) session.commit() return ""@app.route("/user/age", methods=["PUT"])def modify_age(): session = Session() age = request.json.get("key") id = request.json.get("id") user = session.query(TbUser).filter_by(id=id).first() session.close() print(f"age origin {user.age}, new {age}") user.age = age session = Session() session.add(session.merge(user)) session.commit() return "" |
执行一次。
可以看到,这次 name
和 age
都被更新了。
找到了原因,就好办多了。
和同事敲定整个流程不能重建和销毁 session
,需要 session
的地方弄全局变量或函数传递的方式,问题得解。