I am trying to scale a chatting application which consists of interaction between customer service agent and customer.
If we have multiple instances of backend running, the agent or customer socket can connect(join_room()) to any available instance.
For communication between multiple instances, we added redis pubsub. Whenever we want to emit a message to agent or customer, we will publish the message and room_id to redis channel which will tell all instances to emit the message.
Below is the code which we wrote:
app.py
from flask import Flask, jsonify, request
from flask_cors import CORS
from flask_socketio import SocketIO, emit, join_room, send
from multiprocessing import Process
import redis
redis_conn = redis.Redis(charset="utf-8", decode_responses=True)
pubsub = redis_conn.pubsub()
app = Flask(__name__)
app.config["SECRET_KEY"] = Config.SECRET_KEY
CORS(app)
# Initializing Socket
socketio = SocketIO(app, cors_allowed_origins="*", path="/livechat_backend_sock/socket.io/")
@socketio.on("agent_connect_request")
def agent_connect_request(data):
print("
*** agent_connect_request data: ", data)
msg = {"input_type": "text", "response_type": "live_connecting",
"response": "Searching for Agent", "data": data}
if "user_section" in data.keys():
emit("user_reply", msg)
# result = some db operations
# create a room on agent_email_id as it will always be unique
room_id = result["agent_email_id"]
msg["agent_details"] = result["agent_data"]
# some more db operations
# tell a process to emit on room_id for all instances
p = Process(target=subs, args=(room_id,))
p.start()
join_room(room_id)
emit("user_reply", msg) # this happens properly
subscribe_agent(room_id, user_id)
# publish a message
pub(data, user_id, room_id, "user_connections")
# emit("user_connections", data, room=room_id) => this is what is expected after publish
def subscribe_agent(agent_email_id, channel):
p = Process(target=subs, args=(channel,))
p.start()
def subs(room):
pubsub.subscribe(room)
for message in pubsub.listen():
if message.get("type") == "message":
data = json.loads(message.get("data"))
msg = data.get("message")
user_id = data.get("from")
room_id = data.get("to")
sock = data.get("sock")
res = subs_emitter(msg, user_id, room_id, sock)
def unsbs(room):
pubsub.unsubscribe(room)
def pub(msg, user_id, room, sock):
data = {
"message": msg,
"from": user_id,
"to": room,
"sock": sock
}
redis_conn.publish(user_id, json.dumps(data))
def subs_emitter(msg, user_id, room_id, sock):
if sock == "user_connections":
print('on user_connections')
# code reaches here but emit is not happening
socketio.emit("user_connections", msg, room=room_id, broadcast=True)
if sock == "livechat_agent":
socketio.emit("livechat_agent", msg, room=room_id)
if sock == "livechat_user":
socketio.emit("livechat_user", msg, room=room_id)
if sock == "agent_group":
socketio.emit("livechat_user", msg, room=room_id)
if sock == "liveuser_disconnect_status":
socketio.emit("liveuser_disconnect_status", msg, room=room_id)
if sock == "agent_break":
socketio.emit("agent_break", msg, room=room_id)
return True
When we receive the published message, code is not emitting the message to frontend. Where am I going wrong??
question from:
https://stackoverflow.com/questions/66057973/how-to-emit-from-a-redis-pubsub-listener-using-flask-socketio