Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
429 views
in Technique[技术] by (71.8m points)

python - How to emit from a redis pubsub listener using flask socketio

I am trying to scale a chatting application which consists of interaction between customer service agent and customer.

If we have multiple instances of backend running, the agent or customer socket can connect(join_room()) to any available instance.

For communication between multiple instances, we added redis pubsub. Whenever we want to emit a message to agent or customer, we will publish the message and room_id to redis channel which will tell all instances to emit the message.

Below is the code which we wrote:

app.py

from flask import Flask, jsonify, request
from flask_cors import CORS
from flask_socketio import SocketIO, emit, join_room, send
from multiprocessing import Process
import redis

redis_conn = redis.Redis(charset="utf-8", decode_responses=True)
pubsub = redis_conn.pubsub()


app = Flask(__name__)
app.config["SECRET_KEY"] = Config.SECRET_KEY
CORS(app)
# Initializing Socket

socketio = SocketIO(app, cors_allowed_origins="*", path="/livechat_backend_sock/socket.io/")



@socketio.on("agent_connect_request")
def agent_connect_request(data):
    print("
*** agent_connect_request data: ", data)
    msg = {"input_type": "text", "response_type": "live_connecting",
           "response": "Searching for Agent", "data": data}
    if "user_section" in data.keys():
        emit("user_reply", msg)

    # result = some db operations

    # create a room on agent_email_id as it will always be unique
    room_id = result["agent_email_id"]
    msg["agent_details"] = result["agent_data"]

    # some more db operations

    # tell a process to emit on room_id for all instances
    p = Process(target=subs, args=(room_id,))
    p.start()
    join_room(room_id)

    emit("user_reply", msg) # this happens properly
    subscribe_agent(room_id, user_id)

    # publish a message
    pub(data, user_id, room_id, "user_connections")
    # emit("user_connections", data, room=room_id) => this is what is expected after publish

def subscribe_agent(agent_email_id, channel):
    p = Process(target=subs, args=(channel,))
    p.start()


def subs(room):
    pubsub.subscribe(room)
    for message in pubsub.listen():
        if message.get("type") == "message":
            data = json.loads(message.get("data"))
            msg = data.get("message")
            user_id = data.get("from")
            room_id = data.get("to")
            sock = data.get("sock")
            res = subs_emitter(msg, user_id, room_id, sock)


def unsbs(room):
    pubsub.unsubscribe(room)


def pub(msg, user_id, room, sock):
    data = {
        "message": msg,
        "from": user_id,
        "to": room,
        "sock": sock
    }
    redis_conn.publish(user_id, json.dumps(data))


def subs_emitter(msg, user_id, room_id, sock):
    if sock == "user_connections":
        print('on user_connections')
        # code reaches here but emit is not happening
        socketio.emit("user_connections", msg, room=room_id, broadcast=True) 
    if sock == "livechat_agent":
        socketio.emit("livechat_agent", msg, room=room_id)
    if sock == "livechat_user":
        socketio.emit("livechat_user", msg, room=room_id)
    if sock == "agent_group":
        socketio.emit("livechat_user", msg, room=room_id)
    if sock == "liveuser_disconnect_status":
        socketio.emit("liveuser_disconnect_status", msg, room=room_id)
    if sock == "agent_break":
        socketio.emit("agent_break", msg, room=room_id)
    return True

When we receive the published message, code is not emitting the message to frontend. Where am I going wrong??

question from:https://stackoverflow.com/questions/66057973/how-to-emit-from-a-redis-pubsub-listener-using-flask-socketio

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...