Count the streams using the 'http' sessions instead of the 'IP'.

This commit is contained in:
Zorglube 2021-03-16 22:43:55 +01:00
parent bcd248ffb0
commit afc9f19e36
3 changed files with 45 additions and 36 deletions

View File

@ -3,6 +3,7 @@ package common
// Misc utils // Misc utils
import ( import (
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
@ -41,3 +42,20 @@ func Substr(input string, start int, length int) string {
return string(asRunes[start : start+length]) return string(asRunes[start : start+length])
} }
// Return the value of "Forwarded" or "X-Forwarded-For",
// if "Forwarded" & "X-Forwarded-For" are present then "Forwarded" value is returned.
// Return "" if "Forwarded" and "X-Forwarded-For" are absent.
func ExtractForwarded(r *http.Request) string {
f := r.Header.Get("Forwarded")
if f != "" {
return f
}
xff := r.Header.Get("X-Forwarded-For")
if xff != "" {
return xff
}
return ""
}

View File

@ -104,7 +104,7 @@ func wsHandler(w http.ResponseWriter, r *http.Request) {
Conn: conn, Conn: conn,
// If the server is behind a reverse proxy (eg, Nginx), look // If the server is behind a reverse proxy (eg, Nginx), look
// for this header to get the real IP address of the client. // for this header to get the real IP address of the client.
forwardedFor: r.Header.Get("X-Forwarded-For"), forwardedFor: common.ExtractForwarded(r),
} }
go func() { go func() {
@ -409,10 +409,8 @@ func handlePlay(conn *rtmp.Conn) {
} }
func handleLive(w http.ResponseWriter, r *http.Request) { func handleLive(w http.ResponseWriter, r *http.Request) {
uri := strings.Trim(r.URL.Path, "/")
l.RLock() l.RLock()
ch := channels[uri] ch := channels[strings.Trim(r.URL.Path, "/")]
l.RUnlock() l.RUnlock()
if ch != nil { if ch != nil {
@ -426,10 +424,10 @@ func handleLive(w http.ResponseWriter, r *http.Request) {
muxer := flv.NewMuxerWriteFlusher(writeFlusher{httpflusher: flusher, Writer: w}) muxer := flv.NewMuxerWriteFlusher(writeFlusher{httpflusher: flusher, Writer: w})
cursor := ch.que.Latest() cursor := ch.que.Latest()
ip := extractIp(r) session, _ := sstore.Get(r, "moviesession")
stats.addViewer(ip) stats.addViewer(session)
avutil.CopyFile(muxer, cursor) avutil.CopyFile(muxer, cursor)
stats.removeViewer(ip) stats.removeViewer(session)
} else { } else {
// Maybe HTTP_204 is better than HTTP_404 // Maybe HTTP_204 is better than HTTP_404
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
@ -446,16 +444,3 @@ func handleDefault(w http.ResponseWriter, r *http.Request) {
handleIndexTemplate(w, r) handleIndexTemplate(w, r)
} }
} }
func extractIp(r *http.Request) string {
ip := r.Host
f := r.Header.Get("Forwarded")
xff := r.Header.Get("X-Forwarded-For")
if !(xff == "") {
ip = xff
}
if !(f == "") {
ip = f
}
return ip
}

View File

@ -4,6 +4,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/gorilla/sessions"
"github.com/zorchenhimer/MovieNight/common" "github.com/zorchenhimer/MovieNight/common"
) )
@ -15,24 +16,25 @@ type streamStats struct {
mutex sync.Mutex mutex sync.Mutex
streamStart time.Time streamStart time.Time
streamLive bool // True if live streamLive bool // True if live
viewers map[string]string viewers map[*sessions.Session]int
maxViewers int maxViewers int
} }
func (s *streamStats) addViewer(ip string) { func (s *streamStats) addViewer(session *sessions.Session) {
s.mutex.Lock() s.mutex.Lock()
s.viewers[ip] = ip s.viewers[session] = len(s.viewers)
s.updateMaxViewers(len(s.viewers)) size := len(s.viewers)
s.updateMaxViewers(size)
s.mutex.Unlock() s.mutex.Unlock()
common.LogDebugf("Viewer connect from: %s\n", ip) common.LogDebugf("[stats] %d viewer(s) connected\n", size)
} }
func (s *streamStats) removeViewer(ip string) { func (s *streamStats) removeViewer(session *sessions.Session) {
s.mutex.Lock() s.mutex.Lock()
delete(s.viewers, ip) delete(s.viewers, session)
s.mutex.Unlock() s.mutex.Unlock()
common.LogDebugf("Viewer left from: %s\n", ip) common.LogDebugf("[stats] One viewer left the stream\n")
} }
func (s *streamStats) updateMaxViewers(size int) { func (s *streamStats) updateMaxViewers(size int) {
@ -42,11 +44,15 @@ func (s *streamStats) updateMaxViewers(size int) {
} }
func (s *streamStats) resetViewers() { func (s *streamStats) resetViewers() {
s.viewers = make(map[string]string) s.viewers = sessionsMapNew()
}
func sessionsMapNew() map[*sessions.Session]int {
return make(map[*sessions.Session]int)
} }
func newStreamStats() streamStats { func newStreamStats() streamStats {
return streamStats{start: time.Now(), streamLive: false, viewers: make(map[string]string)} return streamStats{start: time.Now(), streamLive: false, viewers: sessionsMapNew()}
} }
func (s *streamStats) msgInInc() { func (s *streamStats) msgInInc() {
@ -80,11 +86,11 @@ func (s *streamStats) Print() {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
common.LogInfof("Messages In: %d\n", s.messageIn) common.LogInfof("[stats] Messages In: %d\n", s.messageIn)
common.LogInfof("Messages Out: %d\n", s.messageOut) common.LogInfof("[stats] Messages Out: %d\n", s.messageOut)
common.LogInfof("Max users in chat: %d\n", s.maxUsers) common.LogInfof("[stats] Max users in chat: %d\n", s.maxUsers)
common.LogInfof("Total Time: %s\n", time.Since(s.start)) common.LogInfof("[stats] Total Time: %s\n", time.Since(s.start))
common.LogInfof("Max Stream Viewer: %d\n", s.maxViewers) common.LogInfof("[stats] Max Stream Viewer: %d\n", s.maxViewers)
} }
func (s *streamStats) startStream() { func (s *streamStats) startStream() {
@ -126,7 +132,7 @@ func (s *streamStats) getMaxViewerCount() int {
return s.maxViewers return s.maxViewers
} }
func (s *streamStats) getViewers() map[string]string { func (s *streamStats) getViewers() map[*sessions.Session]int {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()