From 9bcbad6a0761f4c971e5ccfb9c9e47511e0efa28 Mon Sep 17 00:00:00 2001 From: Zorchenhimer Date: Tue, 26 Mar 2019 11:41:31 -0400 Subject: [PATCH 1/2] Expand graceful shutdown to use http.Shutdown() Not sure if this is better or not, but I'm testing it out. --- chatroom.go | 30 +++++++++++++ main.go | 121 ++++++++++++++++++++++++++++++---------------------- 2 files changed, 101 insertions(+), 50 deletions(-) diff --git a/chatroom.go b/chatroom.go index aebde3e..ca1e118 100644 --- a/chatroom.go +++ b/chatroom.go @@ -30,6 +30,8 @@ type ChatRoom struct { modPasswords []string // single-use mod passwords modPasswordsMtx sync.Mutex + + isShutdown bool } //initializing the chatroom @@ -53,6 +55,10 @@ func newChatRoom() (*ChatRoom, error) { } func (cr *ChatRoom) JoinTemp(conn *chatConnection) (string, error) { + if cr.isShutdown { + return "", fmt.Errorf("Server is shutting down") + } + defer cr.clientsMtx.Unlock() cr.clientsMtx.Lock() @@ -77,6 +83,10 @@ func (cr *ChatRoom) JoinTemp(conn *chatConnection) (string, error) { //registering a new client //returns pointer to a Client, or Nil, if the name is already taken func (cr *ChatRoom) Join(name, uid string) (*Client, error) { + if cr.isShutdown { + return nil, fmt.Errorf("Server is shutting down") + } + defer cr.clientsMtx.Unlock() cr.clientsMtx.Lock() @@ -504,3 +514,23 @@ func (cr *ChatRoom) changeName(oldName, newName string, forced bool) error { return fmt.Errorf("Client not found with name %q", oldName) } + +func (cr *ChatRoom) Shutdown() { + cr.isShutdown = true + common.LogInfoln("ChatRoom is shutting down.") + + cr.clientsMtx.Lock() + defer cr.clientsMtx.Unlock() + + for uuid, client := range cr.clients { + common.LogDebugf("Closing connection for %s", uuid) + client.conn.Close() + } + + for uuid, conn := range cr.tempConn { + common.LogDebugf("Closing connection for temp %s", uuid) + conn.Close() + } + + common.LogInfoln("ChatRoom Shutdown() finished") +} diff --git a/main.go b/main.go index 929bae3..e59121c 100644 --- a/main.go +++ b/main.go @@ -1,11 +1,13 @@ package main import ( + "context" "flag" "fmt" "net/http" "os" "os/signal" + "time" "github.com/nareix/joy4/format" "github.com/nareix/joy4/format/rtmp" @@ -13,9 +15,10 @@ import ( ) var ( - addr string - sKey string - stats = newStreamStats() + addr string + sKey string + stats = newStreamStats() + chatServer *http.Server ) func setupSettings() error { @@ -52,9 +55,6 @@ func main() { os.Exit(1) } - exit := make(chan bool) - go handleInterrupt(exit) - // Load emotes before starting server. var err error if chat, err = newChatRoom(); err != nil { @@ -76,53 +76,74 @@ func main() { common.LogInfoln("Admin password: ", settings.AdminPassword) common.LogInfoln("Listen and serve ", addr) - go startServer() - go startRmtpServer() - - <-exit -} - -func startRmtpServer() { server := &rtmp.Server{ HandlePlay: handlePlay, HandlePublish: handlePublish, } - err := server.ListenAndServe() - if err != nil { - // If the server cannot start, don't pretend we can continue. - panic("Error trying to start rtmp server: " + err.Error()) + chatServer := &http.Server{ + Addr: addr, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + MaxHeaderBytes: 1 << 20, } -} - -func startServer() { - // Chat websocket - http.HandleFunc("/ws", wsHandler) - http.HandleFunc("/static/js/", wsStaticFiles) - http.HandleFunc("/static/css/", wsStaticFiles) - http.HandleFunc("/static/img/", wsImages) - http.HandleFunc("/static/main.wasm", wsWasmFile) - http.HandleFunc("/emotes/", wsEmotes) - http.HandleFunc("/favicon.ico", wsStaticFiles) - http.HandleFunc("/chat", handleIndexTemplate) - http.HandleFunc("/video", handleIndexTemplate) - http.HandleFunc("/help", handleHelpTemplate) - - http.HandleFunc("/", handleDefault) - - err := http.ListenAndServe(addr, nil) - if err != nil { - // If the server cannot start, don't pretend we can continue. - panic("Error trying to start chat/http server: " + err.Error()) - } -} - -func handleInterrupt(exit chan bool) { - ch := make(chan os.Signal) - signal.Notify(ch, os.Interrupt) - <-ch - common.LogInfoln("Closing server") - if settings.StreamStats { - stats.Print() - } - exit <- true + + chatServer.RegisterOnShutdown(func() { chat.Shutdown() }) + //server.RegisterOnShutdown(func() { common.LogDebugln("server shutdown callback called.") }) + + // Signal handler + exit := make(chan bool) + go func() { + ch := make(chan os.Signal) + signal.Notify(ch, os.Interrupt) + <-ch + common.LogInfoln("Closing server") + if settings.StreamStats { + stats.Print() + } + + if err := chatServer.Shutdown(context.Background()); err != nil { + common.LogErrorf("Error shutting down chat server: %v", err) + } + + common.LogInfoln("Shutdown() sent. Sending exit.") + exit <- true + }() + + // Chat and HTTP server + go func() { + // Chat websocket + mux := http.NewServeMux() + mux.HandleFunc("/ws", wsHandler) + mux.HandleFunc("/static/js/", wsStaticFiles) + mux.HandleFunc("/static/css/", wsStaticFiles) + mux.HandleFunc("/static/img/", wsImages) + mux.HandleFunc("/static/main.wasm", wsWasmFile) + mux.HandleFunc("/emotes/", wsEmotes) + mux.HandleFunc("/favicon.ico", wsStaticFiles) + mux.HandleFunc("/chat", handleIndexTemplate) + mux.HandleFunc("/video", handleIndexTemplate) + mux.HandleFunc("/help", handleHelpTemplate) + + mux.HandleFunc("/", handleDefault) + + chatServer.Handler = mux + err := chatServer.ListenAndServe() + if err != http.ErrServerClosed { + // If the server cannot start, don't pretend we can continue. + panic("Error trying to start chat/http server: " + err.Error()) + } + common.LogDebugln("ChatServer closed.") + }() + + // RTMP server + go func() { + err := server.ListenAndServe() + if err != http.ErrServerClosed { + // If the server cannot start, don't pretend we can continue. + panic("Error trying to start rtmp server: " + err.Error()) + } + common.LogDebugln("RTMP server closed.") + }() + + <-exit } From e879112b1bd133100425b8e66184103863084e6f Mon Sep 17 00:00:00 2001 From: Zorchenhimer Date: Thu, 28 Mar 2019 10:18:17 -0400 Subject: [PATCH 2/2] Add some comments clarifying some things --- chatroom.go | 5 +++++ main.go | 15 ++++++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/chatroom.go b/chatroom.go index ca1e118..8fb41bd 100644 --- a/chatroom.go +++ b/chatroom.go @@ -55,6 +55,7 @@ func newChatRoom() (*ChatRoom, error) { } func (cr *ChatRoom) JoinTemp(conn *chatConnection) (string, error) { + // Don't allow new joins when the server is closing. if cr.isShutdown { return "", fmt.Errorf("Server is shutting down") } @@ -83,6 +84,7 @@ func (cr *ChatRoom) JoinTemp(conn *chatConnection) (string, error) { //registering a new client //returns pointer to a Client, or Nil, if the name is already taken func (cr *ChatRoom) Join(name, uid string) (*Client, error) { + // Don't allow new joins when the server is closing. if cr.isShutdown { return nil, fmt.Errorf("Server is shutting down") } @@ -515,6 +517,9 @@ func (cr *ChatRoom) changeName(oldName, newName string, forced bool) error { return fmt.Errorf("Client not found with name %q", oldName) } +// Shutdown the chatroom. First, dissallow new joins by setting +// isShutdown, then close each client connection. This would be +// a good place to put a final command that gets sent to the client. func (cr *ChatRoom) Shutdown() { cr.isShutdown = true common.LogInfoln("ChatRoom is shutting down.") diff --git a/main.go b/main.go index e59121c..dae9ef6 100644 --- a/main.go +++ b/main.go @@ -80,6 +80,8 @@ func main() { HandlePlay: handlePlay, HandlePublish: handlePublish, } + + // Define this here so we can set some timeouts and things. chatServer := &http.Server{ Addr: addr, ReadTimeout: 10 * time.Second, @@ -88,8 +90,13 @@ func main() { } chatServer.RegisterOnShutdown(func() { chat.Shutdown() }) + // rtmp.Server does not implement .RegisterOnShutdown() //server.RegisterOnShutdown(func() { common.LogDebugln("server shutdown callback called.") }) + // These have been moved back to annon functitons so I could use + // `server`, `chatServer`, and `exit` in them without needing to + // pass them as parameters. + // Signal handler exit := make(chan bool) go func() { @@ -111,7 +118,9 @@ func main() { // Chat and HTTP server go func() { - // Chat websocket + // Use a ServeMux here instead of the default, global, + // http handler. It's a good idea when we're starting more + // than one server. mux := http.NewServeMux() mux.HandleFunc("/ws", wsHandler) mux.HandleFunc("/static/js/", wsStaticFiles) @@ -128,6 +137,8 @@ func main() { chatServer.Handler = mux err := chatServer.ListenAndServe() + // http.ErrServerClosed is returned when server.Shuddown() + // is called. if err != http.ErrServerClosed { // If the server cannot start, don't pretend we can continue. panic("Error trying to start chat/http server: " + err.Error()) @@ -138,6 +149,8 @@ func main() { // RTMP server go func() { err := server.ListenAndServe() + // http.ErrServerClosed is returned when server.Shuddown() + // is called. if err != http.ErrServerClosed { // If the server cannot start, don't pretend we can continue. panic("Error trying to start rtmp server: " + err.Error())