Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
252 views
in Technique[技术] by (71.8m points)

python - How does Fastapi manage concurrent calls to the database with sqlalchemy

I want to understand how the database is still consistent after concurrent calls to the endpoint which update a counter. Sorry for the names of the classes/methods I copied most of the code from the docs of the tools.

The server code is as follow

import uvicorn
from fastapi import FastAPI
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker

app = FastAPI()

engine = create_engine('postgresql://postgres:test@localhost/test')
Base = declarative_base()


class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    counter = Column(Integer)

    def __repr__(self):
        return "counter='%s'" % (self.counter)


Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

# create
session = Session()
ed_user = User(name='ed', counter=0)
session.add(ed_user)
session.commit()
session.close()


async def updates():
    # update
    for i in range(50):
        session = Session()
        our_user = session.query(User).filter_by(name='ed').first()
        c = our_user.counter
        our_user.counter = c + 1
        session.add(our_user)
        session.commit()
        session.close()


@app.get("/update")
async def update():
    await updates()
    return {"Hello": "World"}


@app.get("/counter")
async def counter():
    session = Session()
    our_user = session.query(User).filter_by(name='ed').first()
    print(our_user)


if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8000)

I start it and then run the following script:

from threading import Thread
import requests


def info():
    r = requests.get('http://localhost:8000/counter')


def thread():
    threads = []

    for _ in range(100):
        t = Thread(target=fetch())
        t.start()
        threads.append(t)

    for t in threads:
        t.join()


def fetch():
    requests.get("http://localhost:8000/update")


thread()
info()

The result of the counter at the end is always correct, even when using non-asnyc endpoints and updates() method. I tried doing the same without running Fastapi and only use Threads which run the updates() function, but in this case the counter is inconsistent.

So for some reason Fastapi does the magic, but what exactly is happening? And Shouldn't the database provide some kind of isolation feature, too?

EDIT

It was actually a bug in my code, where I didn't create the threads in a right way. I passed the result of fetch() instead of the function itself. Now when I run it the rightway it behaves as expected and the non-async version has an inconsistent counter.

question from:https://stackoverflow.com/questions/65866235/how-does-fastapi-manage-concurrent-calls-to-the-database-with-sqlalchemy

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

There was actually a bug in my example code. It should be

def thread():
    threads = []

    for _ in range(100):
        t = Thread(target=fetch)
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

I passed the target parameter the result of the function not the function itself. That is why it didn't behave as expected and async and non-async endpoints showed the same results. Now it behaves as expected and the non-async endpoint shows inconsistent results, due to concurrent access on same resource.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...