141 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			141 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| 
 | |
| package server
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"net/http"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/gin-gonic/gin"
 | |
| 	"github.com/pkg/errors"
 | |
| 	log "github.com/sirupsen/logrus"
 | |
| 
 | |
| 	libhttp "github.com/oceanbase/configserver/lib/http"
 | |
| )
 | |
| 
 | |
| type HttpServer struct {
 | |
| 	// server will be stopped, new request will be rejected
 | |
| 	Stopping int32
 | |
| 	// current session count, concurrent safely
 | |
| 	Counter *Counter
 | |
| 	// http routers
 | |
| 	Router *gin.Engine
 | |
| 	// address
 | |
| 	Address string
 | |
| 	// http server, call its Run, Shutdown methods
 | |
| 	Server *http.Server
 | |
| 	// stop the http.Server by calling cancel method
 | |
| 	Cancel context.CancelFunc
 | |
| }
 | |
| 
 | |
| // UseCounter use counter middleware
 | |
| func (server *HttpServer) UseCounter() {
 | |
| 	server.Router.Use(
 | |
| 		server.counterPreHandlerFunc,
 | |
| 		server.counterPostHandlerFunc,
 | |
| 	)
 | |
| }
 | |
| 
 | |
| // Run start a httpServer
 | |
| // when ctx is cancelled, call shutdown to stop the httpServer
 | |
| func (server *HttpServer) Run(ctx context.Context) {
 | |
| 
 | |
| 	server.Server.Handler = server.Router
 | |
| 	if server.Address != "" {
 | |
| 		log.WithContext(ctx).Infof("listen on address: %s", server.Address)
 | |
| 		tcpListener, err := libhttp.NewTcpListener(server.Address)
 | |
| 		if err != nil {
 | |
| 			log.WithError(err).
 | |
| 				Errorf("create tcp listener on address '%s' failed %v", server.Address, err)
 | |
| 			return
 | |
| 		}
 | |
| 		go func() {
 | |
| 			if err := server.Server.Serve(tcpListener); err != nil {
 | |
| 				log.WithError(err).
 | |
| 					Info("tcp server exited")
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			if err := server.Shutdown(ctx); err != nil {
 | |
| 				log.WithContext(ctx).
 | |
| 					WithError(err).
 | |
| 					Error("server shutdown failed!")
 | |
| 				// in a for loop, sleep 100ms
 | |
| 				time.Sleep(time.Millisecond * 100)
 | |
| 			} else {
 | |
| 				log.WithContext(ctx).Info("server shutdown successfully.")
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // shutdown httpServer can shutdown if sessionCount is 0,
 | |
| // otherwise, return an error
 | |
| func (server *HttpServer) Shutdown(ctx context.Context) error {
 | |
| 	atomic.StoreInt32(&(server.Stopping), 1)
 | |
| 	sessionCount := atomic.LoadInt32(&server.Counter.sessionCount)
 | |
| 	if sessionCount > 0 {
 | |
| 		return errors.Errorf("server shutdown failed, cur-session count:%d, shutdown will be success when wait session-count is 0.", sessionCount)
 | |
| 	}
 | |
| 	return server.Server.Close()
 | |
| }
 | |
| 
 | |
| // counterPreHandlerFunc middleware for httpServer session count, before process a request
 | |
| func (server *HttpServer) counterPreHandlerFunc(c *gin.Context) {
 | |
| 	if atomic.LoadInt32(&(server.Stopping)) == 1 {
 | |
| 		c.Abort()
 | |
| 		c.JSON(http.StatusServiceUnavailable, "server is shutdowning now.")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	server.Counter.incr()
 | |
| 
 | |
| 	c.Next()
 | |
| }
 | |
| 
 | |
| // counterPostHandlerFunc middleware for httpServer session count, after process a request
 | |
| func (server *HttpServer) counterPostHandlerFunc(c *gin.Context) {
 | |
| 	c.Next()
 | |
| 	server.Counter.decr()
 | |
| }
 | |
| 
 | |
| // counter session counter
 | |
| // when server receive a request, sessionCount +1,
 | |
| // when the request returns a response, sessionCount -1.
 | |
| type Counter struct {
 | |
| 	sessionCount int32
 | |
| 	sync.Mutex
 | |
| }
 | |
| 
 | |
| // incr sessionCount +1 concurrent safely
 | |
| func (c *Counter) incr() {
 | |
| 	c.Lock()
 | |
| 	c.sessionCount++
 | |
| 	defer c.Unlock()
 | |
| }
 | |
| 
 | |
| // decr sessionCount -1 concurrent safely
 | |
| func (c *Counter) decr() {
 | |
| 	c.Lock()
 | |
| 	c.sessionCount--
 | |
| 	defer c.Unlock()
 | |
| }
 | 
