141 lines
3.6 KiB
Go
141 lines
3.6 KiB
Go
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
package server
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/pkg/errors"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
libhttp "github.com/oceanbase/configserver/lib/http"
|
|
)
|
|
|
|
type HttpServer struct {
|
|
// server will be stopped, new request will be rejected
|
|
Stopping int32
|
|
// current session count, concurrent safely
|
|
Counter *Counter
|
|
// http routers
|
|
Router *gin.Engine
|
|
// address
|
|
Address string
|
|
// http server, call its Run, Shutdown methods
|
|
Server *http.Server
|
|
// stop the http.Server by calling cancel method
|
|
Cancel context.CancelFunc
|
|
}
|
|
|
|
// UseCounter use counter middleware
|
|
func (server *HttpServer) UseCounter() {
|
|
server.Router.Use(
|
|
server.counterPreHandlerFunc,
|
|
server.counterPostHandlerFunc,
|
|
)
|
|
}
|
|
|
|
// Run start a httpServer
|
|
// when ctx is cancelled, call shutdown to stop the httpServer
|
|
func (server *HttpServer) Run(ctx context.Context) {
|
|
|
|
server.Server.Handler = server.Router
|
|
if server.Address != "" {
|
|
log.WithContext(ctx).Infof("listen on address: %s", server.Address)
|
|
tcpListener, err := libhttp.NewTcpListener(server.Address)
|
|
if err != nil {
|
|
log.WithError(err).
|
|
Errorf("create tcp listener on address '%s' failed %v", server.Address, err)
|
|
return
|
|
}
|
|
go func() {
|
|
if err := server.Server.Serve(tcpListener); err != nil {
|
|
log.WithError(err).
|
|
Info("tcp server exited")
|
|
}
|
|
}()
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
if err := server.Shutdown(ctx); err != nil {
|
|
log.WithContext(ctx).
|
|
WithError(err).
|
|
Error("server shutdown failed!")
|
|
// in a for loop, sleep 100ms
|
|
time.Sleep(time.Millisecond * 100)
|
|
} else {
|
|
log.WithContext(ctx).Info("server shutdown successfully.")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// shutdown httpServer can shutdown if sessionCount is 0,
|
|
// otherwise, return an error
|
|
func (server *HttpServer) Shutdown(ctx context.Context) error {
|
|
atomic.StoreInt32(&(server.Stopping), 1)
|
|
sessionCount := atomic.LoadInt32(&server.Counter.sessionCount)
|
|
if sessionCount > 0 {
|
|
return errors.Errorf("server shutdown failed, cur-session count:%d, shutdown will be success when wait session-count is 0.", sessionCount)
|
|
}
|
|
return server.Server.Close()
|
|
}
|
|
|
|
// counterPreHandlerFunc middleware for httpServer session count, before process a request
|
|
func (server *HttpServer) counterPreHandlerFunc(c *gin.Context) {
|
|
if atomic.LoadInt32(&(server.Stopping)) == 1 {
|
|
c.Abort()
|
|
c.JSON(http.StatusServiceUnavailable, "server is shutdowning now.")
|
|
return
|
|
}
|
|
|
|
server.Counter.incr()
|
|
|
|
c.Next()
|
|
}
|
|
|
|
// counterPostHandlerFunc middleware for httpServer session count, after process a request
|
|
func (server *HttpServer) counterPostHandlerFunc(c *gin.Context) {
|
|
c.Next()
|
|
server.Counter.decr()
|
|
}
|
|
|
|
// counter session counter
|
|
// when server receive a request, sessionCount +1,
|
|
// when the request returns a response, sessionCount -1.
|
|
type Counter struct {
|
|
sessionCount int32
|
|
sync.Mutex
|
|
}
|
|
|
|
// incr sessionCount +1 concurrent safely
|
|
func (c *Counter) incr() {
|
|
c.Lock()
|
|
c.sessionCount++
|
|
defer c.Unlock()
|
|
}
|
|
|
|
// decr sessionCount -1 concurrent safely
|
|
func (c *Counter) decr() {
|
|
c.Lock()
|
|
c.sessionCount--
|
|
defer c.Unlock()
|
|
}
|