Cleanup incoming stream handler

Cleaned up the incoming RTMP handler to close connections on error.
The code aquireing the RTMP channel was also rewritten to be more
idomatic and easier to follow.
This commit is contained in:
Zorchenhimer 2021-01-10 12:04:16 -05:00
parent b3b9255ea0
commit 238d80658b
1 changed files with 14 additions and 12 deletions

View File

@ -364,37 +364,39 @@ func handlePublish(conn *rtmp.Conn) {
if len(urlParts) > 2 { if len(urlParts) > 2 {
common.LogErrorln("Extra garbage after stream key") common.LogErrorln("Extra garbage after stream key")
l.Unlock() l.Unlock()
conn.Close()
return return
} }
if len(urlParts) != 2 { if len(urlParts) != 2 {
common.LogErrorln("Missing stream key") common.LogErrorln("Missing stream key")
l.Unlock() l.Unlock()
conn.Close()
return return
} }
if urlParts[1] != settings.GetStreamKey() { if urlParts[1] != settings.GetStreamKey() {
common.LogErrorln("Stream key is incorrect. Denying stream.") common.LogErrorln("Stream key is incorrect. Denying stream.")
l.Unlock() l.Unlock()
conn.Close()
return //If key not match, deny stream return //If key not match, deny stream
} }
streamPath := urlParts[0] streamPath := urlParts[0]
ch := channels[streamPath] ch, exists := channels[streamPath]
if ch == nil { if exists {
ch = &Channel{} common.LogErrorln("Stream already running. Denying publish.")
ch.que = pubsub.NewQueue() conn.Close()
ch.que.WriteHeader(streams) l.Unlock()
channels[streamPath] = ch
} else {
ch = nil
}
l.Unlock()
if ch == nil {
common.LogErrorln("Unable to start stream, channel is nil.")
return return
} }
ch = &Channel{}
ch.que = pubsub.NewQueue()
ch.que.WriteHeader(streams)
channels[streamPath] = ch
l.Unlock()
stats.startStream() stats.startStream()
common.LogInfoln("Stream started") common.LogInfoln("Stream started")