From fac6e285bd08de3ef9b7cb4285793162b23f42e1 Mon Sep 17 00:00:00 2001 From: joeyak Date: Thu, 21 Mar 2019 16:20:50 -0400 Subject: [PATCH] Add stream stats, minor changes in main, and adding ctrl+c handling Stream statistics will now print when enabled in settings.json. There is messages in, messages out, and total time running. The main function was changed so the server setups for the main and rtmp are in separate functions, and both are started with their own goroutine. OS Interrupt catching was added, and if stream stats is true, the stats will be printed. --- connection.go | 4 +-- main.go | 93 +++++++++++++++++++++++++++++++++++---------------- settings.go | 10 ++++-- stats.go | 35 +++++++++++++++++++ 4 files changed, 108 insertions(+), 34 deletions(-) create mode 100644 stats.go diff --git a/connection.go b/connection.go index 9e98375..82ac88c 100644 --- a/connection.go +++ b/connection.go @@ -16,14 +16,14 @@ type chatConnection struct { func (cc *chatConnection) ReadData(data interface{}) error { defer cc.mutex.Unlock() cc.mutex.Lock() - + stats.msgInInc() return cc.ReadJSON(data) } func (cc *chatConnection) WriteData(data interface{}) error { defer cc.mutex.Unlock() cc.mutex.Lock() - + stats.msgOutInc() return cc.WriteJSON(data) } diff --git a/main.go b/main.go index aa30d09..42c260f 100644 --- a/main.go +++ b/main.go @@ -5,27 +5,72 @@ import ( "fmt" "net/http" "os" + "os/signal" "github.com/nareix/joy4/format" "github.com/nareix/joy4/format/rtmp" ) var ( - addr = flag.String("l", ":8089", "host:port of the MovieNight") - sKey = flag.String("k", "", "Stream key, to protect your stream") + addr string + sKey string + stats streamStats ) func init() { format.RegisterAll() + + flag.StringVar(&addr, "l", ":8089", "host:port of the MovieNight") + flag.StringVar(&sKey, "k", "", "Stream key, to protect your stream") + + stats = newStreamStats() } func main() { flag.Parse() - server := &rtmp.Server{} - server.HandlePlay = handlePlay - server.HandlePublish = handlePublish + exit := make(chan bool) + go handleInterrupt(exit) + // Load emotes before starting server. + var err error + if chat, err = newChatRoom(); err != nil { + fmt.Println(err) + os.Exit(1) + } + + if addr != "" { + addr = settings.ListenAddress + } + + // A stream key was passed on the command line. Use it, but don't save + // it over the stream key in the settings.json file. + if sKey != "" { + settings.SetTempKey(sKey) + } + + fmt.Println("Stream key: ", settings.GetStreamKey()) + fmt.Println("Admin password: ", settings.AdminPassword) + fmt.Println("Listen and serve ", addr) + + go startServer() + go startRmptServer() + + <-exit +} + +func startRmptServer() { + server := &rtmp.Server{ + HandlePlay: handlePlay, + HandlePublish: handlePublish, + } + err := server.ListenAndServe() + if err != nil { + fmt.Printf("Error trying to start server: %v\n", err) + } +} + +func startServer() { // Chat websocket http.HandleFunc("/ws", wsHandler) http.HandleFunc("/static/js/", wsStaticFiles) @@ -40,29 +85,19 @@ func main() { http.HandleFunc("/", handleDefault) - address := settings.ListenAddress - if addr != nil && len(*addr) != 0 { - address = *addr + err := http.ListenAndServe(addr, nil) + if err != nil { + fmt.Printf("Error trying to start rmtp server: %v\n", err) } - - // Load emotes before starting server. - var err error - if chat, err = newChatRoom(); err != nil { - fmt.Println(err) - os.Exit(1) - } - - // A stream key was passed on the command line. Use it, but don't save - // it over the stream key in the settings.json file. - if sKey != nil && len(*sKey) != 0 { - settings.SetTempKey(*sKey) - } - - fmt.Println("Stream key: ", settings.GetStreamKey()) - fmt.Println("Admin password: ", settings.AdminPassword) - - go http.ListenAndServe(address, nil) - fmt.Println("Listen and serve ", *addr) - - server.ListenAndServe() +} + +func handleInterrupt(exit chan bool) { + ch := make(chan os.Signal) + signal.Notify(ch, os.Interrupt) + <-ch + fmt.Println("Closing server") + if settings.StreamStats { + stats.Print() + } + exit <- true } diff --git a/settings.go b/settings.go index be18cf8..7fa4626 100644 --- a/settings.go +++ b/settings.go @@ -15,14 +15,18 @@ var settings *Settings var settingsMtx sync.Mutex type Settings struct { - filename string - cmdLineKey string // stream key from the command line + // Non-Saved settings + filename string + cmdLineKey string // stream key from the command line + + // Saved settings + StreamStats bool MaxMessageCount int TitleLength int // maximum length of the title that can be set with the /playing AdminPassword string - Bans []BanInfo StreamKey string ListenAddress string + Bans []BanInfo } type BanInfo struct { diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..a5b6bbc --- /dev/null +++ b/stats.go @@ -0,0 +1,35 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +type streamStats struct { + messageIn int + messageOut int + start time.Time + mutex sync.Mutex +} + +func newStreamStats() streamStats { + return streamStats{start: time.Now()} +} + +func (s *streamStats) msgInInc() { + s.mutex.Lock() + s.messageIn++ + s.mutex.Unlock() +} +func (s *streamStats) msgOutInc() { + s.mutex.Lock() + s.messageOut++ + s.mutex.Unlock() +} + +func (s *streamStats) Print() { + fmt.Printf("Messages In: %d\n", s.messageIn) + fmt.Printf("Messages Out: %d\n", s.messageOut) + fmt.Printf("Total Time: %s\n", time.Since(s.start)) +}