You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
require"http"require"./kick_reason"modulePlaceOS::Api# use a manager so the we can free the request context objectsclassChatManagerLog= ::Log.for(self)definitialize(@ice_config)# grab the existing `PlaceOS::Driver::Subscriptions` instancesubscriber=PlaceOS::Api::WebSocket::Session.subscriptions.@subscribersubscriber.channel("internal/chat/forward_signal")do |_,payload|
signal=SessionSignal.from_json(payload)perform_forwarded_signal(signal)endsubscriber.channel("internal/chat/kick_user")do |_,payload|
user_id,reason=Tuple(String,String).from_json(payload)perform_kick_user(user_id,reason)endsubscriber.channel("internal/chat/transfer_user")do |_,payload|
user_id,session_id,connection_details=Tuple(String,String?,String?).from_json(payload)perform_transfer(user_id,session_id,connection_details)endspawn{ping_sockets}end# =================================# Websocket Ping / ensure connected# =================================protecteddefping_socketsloopdosleep30# ping the sockets to ensure connectivitybeginconnections=sockets.dupid="SIGNAL::#{Time.utc.to_unix_ms}+#{Random::Secure.hex(6)}"connections.eachdo |websocket,session|
perform_ping(id,websocket,session)rescueExceptionendrescueendendendprotecteddefperform_ping(id,websocket,session)send_signal(websocket,SessionSignal.new(id: id,type: :ping,session_id: session.session_id,user_id: "SERVER::DATA",to_user: session.user_id,value: "{}"))rescueend# =================================# Various helpers# =================================protecteddefredis_publish(path : String,payload)
::PlaceOS::Driver::RedisStorage.with_redis &.publish(path,payload.to_json)enddefcreate_new_call(signal) : CallDetailscalls[signal.session_id]=CallDetails.new(signal.session_id)enddefsend_signal(websocket,signal)Log.trace{"Sending signal #{signal.type} to #{signal.session_id}"}websocket.send(signal.to_json)rescue# we'll ignore websocket send failures, the user will be cleaned upenddefmember_list(session_id : String) : Array(String)CallDetails::SESSIONS.user_list(session_id)end# =================================# Connection management# =================================# authority_id => config stringprivategetterice_config : Hash(String,String)privategettercalls={}ofString=>CallDetailsprivategettersockets={}ofHTTP::WebSocket=>SessionSignalprivategetteruser_lookup={}ofString=>HTTP::WebSocketdefhandle_session(websocket,request_id,user_id,auth_id)websocket.on_messagedo |message|
Log.context.set(request_id: request_id,user_id: user_id)Log.trace{{frame: "TEXT",text: message}}signal=SessionSignal.from_json(message)signal.place_user_id=user_idsignal.place_auth_id=auth_idcasesignal.typewhen.join?on_join_signal(websocket,signal,auth_id)when.offer?,.answer?,.candidate?,.leave?forward_signal(websocket,signal)elseLog.warn{"user #{user_id} sent unsupported signal #{signal.type}"}endifcall=calls[signal.session_id]?
call.updated_at=Time.utcendendwebsocket.on_closedo |_|
Log.trace{{request_id: request_id,frame: "CLOSE"}}ifconnect_details=sockets.deletewebsocketuser_lookup.deleteconnect_details.user_idremove_from_call(connect_details)# signals routed to the system id that represents the application managing the chatredis_publish("placeos/#{auth_id}/chat/user/left",{connect_details.session_id=>connect_details.user_id,})endendenddefremove_from_call(connect_details : SessionSignal)session_id=connect_details.session_idifcall=calls[session_id]?
# inform the call peers that the user is gonecall.removeconnect_details.user_id# cleanup empty sessionscalls.delete(session_id)ifcall.peers.empty?end# forward the leave signal to all the members of the callconnect_details.type=:leaveCallDetails::SESSIONS.user_list(session_id).eachdo |user_id|
connect_details.to_user=user_idredis_publish("placeos/internal/chat/forward_signal",connect_details)endenddefon_join_signal(websocket,signal,auth_id) : Nilcall=calls[signal.session_id]? || create_new_call(signal)# check the current user can join the call (prevent spoofing)# TODO:: look into this now the service is clusteredifexisting_peer_ws=call.peers[signal.user_id]?
ifexisting_user=sockets[existing_peer_ws]?
ifexisting_user.place_user_id != signal.place_user_idLog.warn{"possible hacking attempt by #{signal.place_user_id}, attempting to spoof #{existing_user.place_user_id}"}websocket.closereturnendendend# check if the user is already in another call and remove themifexisting_user=sockets[websocket]?
remove_from_callexisting_userend# add the user to the new calluser_lookup[signal.user_id]=websocketcall.add(signal.user_id,websocket)sockets[websocket]=signal# Return RTC configuration detailssend_signal(websocket,SessionSignal.new(id: "SIGNAL::#{Time.utc.to_unix_ms}+#{Random::Secure.hex(6)}",type: :join,session_id: signal.session_id,user_id: "SERVER::DATA",to_user: signal.user_id,value: ice_config[auth_id]))# Send participant listsend_signal(websocket,SessionSignal.new(id: "SIGNAL::#{Time.utc.to_unix_ms}+#{Random::Secure.hex(6)}",type: :participant_list,session_id: signal.session_id,user_id: "SERVER::DATA",to_user: signal.user_id,value: call.peers.keys.to_json))# signals routed to the system id that represents the application managing the chatredis_publish("placeos/#{signal.place_auth_id}/chat/user/joined",{signal.session_id=>signal.user_id,})end# ================================# Forward signal# ================================defforward_signal(websocket,signal) : Nilifcall=calls[signal.session_id]?
# check the current user is in the callifexisting_peer_ws=call.peers[signal.user_id]?
ifexisting_user=sockets[existing_peer_ws]?
ifexisting_user.place_user_id != signal.place_user_idLog.warn{"possible hacking attempt by #{signal.place_user_id}, attempting to spoof #{existing_user.place_user_id}"}websocket.closereturnendendelseLog.warn{"possible hacking attempt by #{signal.place_user_id}, attempting to signal a call they are not in"}websocket.closereturnendredis_publish("placeos/internal/chat/forward_signal", signal) end end # all security checks have occured at this point, forward the message # if the user is connected to this server protected def perform_forwarded_signal(signal) if call = calls[signal.session_id]? if to_user = call.peers[signal.to_user]? send_signal(to_user, signal) end end end # ================================ # Kick User / User exited # ================================ # the user has exited chat def end_call(user_id : String, auth_id : String) # find the users websocket redis_publish("placeos/internal/chat/kick_user", {user_id, "callended"}) # signal the user exited redis_publish("placeos/#{auth_id}/chat/user/exited", {user_id: user_id,})enddefkick_user(auth_id : String,user_id : String,session_id : String,details : KickReason)# find the users websocketredis_publish("placeos/internal/chat/kick_user",{user_id,details.reason})# TODO:: need the auth id here!redis_publish("placeos/#{auth_id}/chat/user/exited",{user_id: user_id,})enddefperform_kick_user(user_id,reason)# find the users websocketwebsocket=user_lookup[user_id]?
returnunlesswebsocketconnect_details=sockets[websocket]?
returnunlessconnect_details# send the kicked user a leave signalsend_signal(websocket,SessionSignal.new(id: "SIGNAL::#{Time.utc.to_unix_ms}+#{Random::Secure.hex(6)}",type: :leave,session_id: connect_details.session_id,user_id: user_id,to_user: user_id,value: KickReason.new(reason).to_json))websocket.closeend# ================================# Transfer user# ================================enumTransferResultNoConnectionNoSession# not currently usedSignalSentend# transfer a user to a new chat roomdeftransfer(user_id : String,session_id : String? =nil,payload : String? =nil) : TransferResultcurrent_session_id=CallDetails::SESSIONS.lookup_session(user_id)returnTransferResult::NoConnectionunlesscurrent_session_idredis_publish("placeos/internal/chat/transfer_user",{user_id,session_id,payload})TransferResult::SignalSentenddefperform_transfer(user_id : String,session_id : String? =nil,payload : String? =nil)# find the users websocketwebsocket=user_lookup[user_id]?
returnunlesswebsocketconnect_details=sockets[websocket]?
returnunlessconnect_details# remove the user from the current callremove_from_call(connect_details)ifsession_id && session_id != connect_details.session_id# send the user a Transfer signalsend_signal(websocket,SessionSignal.new(id: "SIGNAL::#{Time.utc.to_unix_ms}+#{Random::Secure.hex(6)}",type: :transfer,session_id: session_id || connect_details.session_id,user_id: "SERVER::DATA",to_user: user_id,value: payload))endendend
The text was updated successfully, but these errors were encountered:
Transfer user
================================
rest-api/src/placeos-rest-api/controllers/webrtc/chat_manager.cr
Line 254 in 9feef41
The text was updated successfully, but these errors were encountered: