141 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			141 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
 | 
						|
 | 
						|
package server
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"net/http"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/gin-gonic/gin"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
	log "github.com/sirupsen/logrus"
 | 
						|
 | 
						|
	libhttp "github.com/oceanbase/configserver/lib/http"
 | 
						|
)
 | 
						|
 | 
						|
type HttpServer struct {
 | 
						|
	// server will be stopped, new request will be rejected
 | 
						|
	Stopping int32
 | 
						|
	// current session count, concurrent safely
 | 
						|
	Counter *Counter
 | 
						|
	// http routers
 | 
						|
	Router *gin.Engine
 | 
						|
	// address
 | 
						|
	Address string
 | 
						|
	// http server, call its Run, Shutdown methods
 | 
						|
	Server *http.Server
 | 
						|
	// stop the http.Server by calling cancel method
 | 
						|
	Cancel context.CancelFunc
 | 
						|
}
 | 
						|
 | 
						|
// UseCounter use counter middleware
 | 
						|
func (server *HttpServer) UseCounter() {
 | 
						|
	server.Router.Use(
 | 
						|
		server.counterPreHandlerFunc,
 | 
						|
		server.counterPostHandlerFunc,
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
// Run start a httpServer
 | 
						|
// when ctx is cancelled, call shutdown to stop the httpServer
 | 
						|
func (server *HttpServer) Run(ctx context.Context) {
 | 
						|
 | 
						|
	server.Server.Handler = server.Router
 | 
						|
	if server.Address != "" {
 | 
						|
		log.WithContext(ctx).Infof("listen on address: %s", server.Address)
 | 
						|
		tcpListener, err := libhttp.NewTcpListener(server.Address)
 | 
						|
		if err != nil {
 | 
						|
			log.WithError(err).
 | 
						|
				Errorf("create tcp listener on address '%s' failed %v", server.Address, err)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		go func() {
 | 
						|
			if err := server.Server.Serve(tcpListener); err != nil {
 | 
						|
				log.WithError(err).
 | 
						|
					Info("tcp server exited")
 | 
						|
			}
 | 
						|
		}()
 | 
						|
	}
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ctx.Done():
 | 
						|
			if err := server.Shutdown(ctx); err != nil {
 | 
						|
				log.WithContext(ctx).
 | 
						|
					WithError(err).
 | 
						|
					Error("server shutdown failed!")
 | 
						|
				// in a for loop, sleep 100ms
 | 
						|
				time.Sleep(time.Millisecond * 100)
 | 
						|
			} else {
 | 
						|
				log.WithContext(ctx).Info("server shutdown successfully.")
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// shutdown httpServer can shutdown if sessionCount is 0,
 | 
						|
// otherwise, return an error
 | 
						|
func (server *HttpServer) Shutdown(ctx context.Context) error {
 | 
						|
	atomic.StoreInt32(&(server.Stopping), 1)
 | 
						|
	sessionCount := atomic.LoadInt32(&server.Counter.sessionCount)
 | 
						|
	if sessionCount > 0 {
 | 
						|
		return errors.Errorf("server shutdown failed, cur-session count:%d, shutdown will be success when wait session-count is 0.", sessionCount)
 | 
						|
	}
 | 
						|
	return server.Server.Close()
 | 
						|
}
 | 
						|
 | 
						|
// counterPreHandlerFunc middleware for httpServer session count, before process a request
 | 
						|
func (server *HttpServer) counterPreHandlerFunc(c *gin.Context) {
 | 
						|
	if atomic.LoadInt32(&(server.Stopping)) == 1 {
 | 
						|
		c.Abort()
 | 
						|
		c.JSON(http.StatusServiceUnavailable, "server is shutdowning now.")
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	server.Counter.incr()
 | 
						|
 | 
						|
	c.Next()
 | 
						|
}
 | 
						|
 | 
						|
// counterPostHandlerFunc middleware for httpServer session count, after process a request
 | 
						|
func (server *HttpServer) counterPostHandlerFunc(c *gin.Context) {
 | 
						|
	c.Next()
 | 
						|
	server.Counter.decr()
 | 
						|
}
 | 
						|
 | 
						|
// Counter is the session counter of the HTTP server.
// When the server receives a request sessionCount is incremented by one;
// when the request has produced its response it is decremented by one.
type Counter struct {
	// sessionCount is the number of requests currently in flight.
	// HttpServer.Shutdown reads it with atomic.LoadInt32 without taking the
	// mutex, so every write must be an atomic operation as well.
	sessionCount int32
	// Mutex serialises incr and decr. It stays embedded so the exported
	// Lock/Unlock methods of Counter remain available.
	sync.Mutex
}

// incr adds one to sessionCount. It is safe for concurrent use.
func (c *Counter) incr() {
	c.Lock()
	defer c.Unlock()
	// The write is atomic because the field is also read lock-free with
	// atomic.LoadInt32; a plain ++ would be a data race against that read.
	atomic.AddInt32(&c.sessionCount, 1)
}

// decr subtracts one from sessionCount. It is safe for concurrent use.
func (c *Counter) decr() {
	c.Lock()
	defer c.Unlock()
	// Atomic for the same reason as in incr.
	atomic.AddInt32(&c.sessionCount, -1)
}
 |