Files
oceanbase/tools/ob-configserver/server/http_server.go
2022-05-30 16:44:36 +08:00

141 lines
3.6 KiB
Go

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
package server
import (
"context"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
libhttp "github.com/oceanbase/configserver/lib/http"
)
type HttpServer struct {
// server will be stopped, new request will be rejected
Stopping int32
// current session count, concurrent safely
Counter *Counter
// http routers
Router *gin.Engine
// address
Address string
// http server, call its Run, Shutdown methods
Server *http.Server
// stop the http.Server by calling cancel method
Cancel context.CancelFunc
}
// UseCounter use counter middleware
func (server *HttpServer) UseCounter() {
server.Router.Use(
server.counterPreHandlerFunc,
server.counterPostHandlerFunc,
)
}
// Run start a httpServer
// when ctx is cancelled, call shutdown to stop the httpServer
func (server *HttpServer) Run(ctx context.Context) {
server.Server.Handler = server.Router
if server.Address != "" {
log.WithContext(ctx).Infof("listen on address: %s", server.Address)
tcpListener, err := libhttp.NewTcpListener(server.Address)
if err != nil {
log.WithError(err).
Errorf("create tcp listener on address '%s' failed %v", server.Address, err)
return
}
go func() {
if err := server.Server.Serve(tcpListener); err != nil {
log.WithError(err).
Info("tcp server exited")
}
}()
}
for {
select {
case <-ctx.Done():
if err := server.Shutdown(ctx); err != nil {
log.WithContext(ctx).
WithError(err).
Error("server shutdown failed!")
// in a for loop, sleep 100ms
time.Sleep(time.Millisecond * 100)
} else {
log.WithContext(ctx).Info("server shutdown successfully.")
return
}
}
}
}
// shutdown httpServer can shutdown if sessionCount is 0,
// otherwise, return an error
func (server *HttpServer) Shutdown(ctx context.Context) error {
atomic.StoreInt32(&(server.Stopping), 1)
sessionCount := atomic.LoadInt32(&server.Counter.sessionCount)
if sessionCount > 0 {
return errors.Errorf("server shutdown failed, cur-session count:%d, shutdown will be success when wait session-count is 0.", sessionCount)
}
return server.Server.Close()
}
// counterPreHandlerFunc middleware for httpServer session count, before process a request
func (server *HttpServer) counterPreHandlerFunc(c *gin.Context) {
if atomic.LoadInt32(&(server.Stopping)) == 1 {
c.Abort()
c.JSON(http.StatusServiceUnavailable, "server is shutdowning now.")
return
}
server.Counter.incr()
c.Next()
}
// counterPostHandlerFunc middleware for httpServer session count, after process a request
func (server *HttpServer) counterPostHandlerFunc(c *gin.Context) {
c.Next()
server.Counter.decr()
}
// counter session counter
// when server receive a request, sessionCount +1,
// when the request returns a response, sessionCount -1.
type Counter struct {
sessionCount int32
sync.Mutex
}
// incr sessionCount +1 concurrent safely
func (c *Counter) incr() {
c.Lock()
c.sessionCount++
defer c.Unlock()
}
// decr sessionCount -1 concurrent safely
func (c *Counter) decr() {
c.Lock()
c.sessionCount--
defer c.Unlock()
}