基于Go的反向代理简单实现

基础web服务

简单实现两个web服务,分别监听9091和9092端口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"fmt"
"log"
"net/http"
"os"
"os/signal"
"strings"
)

// web1Handler is the demo backend served on :9091. It answers every request
// with the same fixed greeting.
type web1Handler struct{}

// ServeHTTP ignores the request and writes "hello web1" (implicit 200 OK).
func (*web1Handler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
	body := []byte("hello web1")
	w.Write(body)
}

// web2Handler is the demo backend served on :9092. It answers every request
// with the same fixed greeting.
type web2Handler struct{}

// ServeHTTP ignores the request and writes "hello web2" (implicit 200 OK).
func (*web2Handler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
	body := []byte("hello web2")
	w.Write(body)
}

// main starts the two demo backends (:9091 and :9092) in the background and
// blocks until the process receives an interrupt (Ctrl+C), then logs it.
func main() {
	// signal.Notify never blocks when delivering a signal, so the channel must
	// be buffered; with an unbuffered channel the signal is dropped if main is
	// not already waiting on it.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	go func() {
		// ListenAndServe only returns on failure (e.g. the port is in use);
		// log it instead of failing silently.
		log.Println(http.ListenAndServe(":9091", &web1Handler{}))
	}()

	go func() {
		log.Println(http.ListenAndServe(":9092", &web2Handler{}))
	}()

	s := <-c
	log.Println(s)
}

基础代理

构建一个简单的代理服务器,监听8080端口,将/a的请求转发给9091端口的服务,将/b的请求转发给9092端口的服务。

其本质是:代理服务器接收client的请求,代替client向后端服务器发起请求,再把结果返回给client。这一点与VPN这类正向代理"代为请求"的方式相似,区别在于反向代理代表的是服务端,client并不知道真正处理请求的是哪台服务器。

本例的实现很朴素(例如先把整个响应体读入内存再返回),效率上会暴露出问题;而Nginx基于异步事件驱动模型,可以支撑10万以上的并发。本代码只用于体会反向代理的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"os/signal"
)

// ProxyHandler is the reverse proxy's entry point. It routes /a to the :9091
// backend and /b to the :9092 backend.
type ProxyHandler struct{}

// ServeHTTP dispatches on the exact request path; any other path gets a
// fixed placeholder page. A panic raised while proxying is recovered, logged
// and turned into a 500 response.
func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	defer func() {
		if rec := recover(); rec != nil {
			w.WriteHeader(http.StatusInternalServerError)
			log.Println(rec)
		}
	}()

	switch r.URL.Path {
	case "/a":
		ProxyRequest(w, r, "http://localhost:9091")
	case "/b":
		ProxyRequest(w, r, "http://localhost:9092")
	default:
		w.Write([]byte("default web page."))
	}
}


// ProxyRequest forwards r's method and body to the backend URL url and copies
// the backend's response body back to w.
//
// In this basic version, request/response headers, the backend status code
// and the incoming path/query are not forwarded.
//
// If the upstream request cannot be built, sent, or its body read, the client
// gets 502 Bad Gateway. The error is no longer ignored and a nil response is
// never dereferenced.
func ProxyRequest(w http.ResponseWriter, r *http.Request, url string) {
	badGateway := func(err error) {
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
	}

	newRequest, err := http.NewRequest(r.Method, url, r.Body)
	if err != nil {
		badGateway(err)
		return
	}
	// Keep the client's declared body length. With ContentLength 0 and a
	// non-nil body the client treats the length as unknown and re-sends the
	// body chunked.
	newRequest.ContentLength = r.ContentLength

	newResponse, err := http.DefaultClient.Do(newRequest)
	if err != nil {
		badGateway(err)
		return
	}
	defer newResponse.Body.Close()

	result, err := ioutil.ReadAll(newResponse.Body)
	if err != nil {
		badGateway(err)
		return
	}
	w.Write(result)
}

// main runs the reverse proxy on :8080 and blocks until the process receives
// an interrupt (Ctrl+C).
func main() {
	// signal.Notify does not block when delivering, so the channel must be
	// buffered or the interrupt can be lost.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	go func() {
		// ListenAndServe only returns on failure (e.g. :8080 already in use).
		log.Println(http.ListenAndServe(":8080", &ProxyHandler{}))
	}()

	<-c
}

如此一个简单的反向代理即实现了,但是还有很多细节需要注意

  1. 反向代理时,应该将Header信息、状态码也一并代理转发。
  2. 反向代理时,server如何获取真实的IP。
  3. 如何使用Go内置的反向代理工具。
  4. 反向代理时,如何使用自动化配置代理服务器映射。
  5. 反向代理时,如何为client选择使用哪台服务器,即服务选择(负载均衡)算法。

状态转发

对上述的代理函数ProxyRequest做修改,在Request前设置Header、在Response后设置Header、状态码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ProxyRequest forwards r (method, headers, body) to the backend URL url and
// relays the backend's status code, headers and body back to w.
//
// Every value of a multi-valued header is copied. Copying only the first
// value would silently drop, e.g., a second Set-Cookie from the backend.
//
// If the upstream request cannot be built, sent, or its body read, the client
// gets 502 Bad Gateway.
func ProxyRequest(w http.ResponseWriter, r *http.Request, url string) {
	badGateway := func(err error) {
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
	}

	newRequest, err := http.NewRequest(r.Method, url, r.Body)
	if err != nil {
		badGateway(err)
		return
	}
	newRequest.ContentLength = r.ContentLength
	for k, vs := range r.Header {
		for _, v := range vs {
			newRequest.Header.Add(k, v)
		}
	}

	newResponse, err := http.DefaultClient.Do(newRequest)
	if err != nil {
		badGateway(err)
		return
	}
	defer newResponse.Body.Close()

	// Read the body before committing the status line, so a read failure
	// can still be reported as 502 instead of a truncated success.
	result, err := ioutil.ReadAll(newResponse.Body)
	if err != nil {
		badGateway(err)
		return
	}

	// Headers must be set before WriteHeader; later changes are ignored.
	for k, vs := range newResponse.Header {
		for _, v := range vs {
			w.Header().Add(k, v)
		}
	}
	w.WriteHeader(newResponse.StatusCode)
	w.Write(result)
}

配置获取真实IP

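X-Forwarded-For 并非HTTP最初定义的标准header,而是在反向代理中约定俗成地用来记录client的真实IP(后来RFC 7239标准化了功能相同的Forwarded头)。

请求每经过一级代理,代理就把它看到的来源IP追加到末尾,形如 `client, proxy1, proxy2`。服务端在有此header时,通常优先取其中第一个值作为真实IP。

但这个值完全可以由client自行伪造,因此爬虫可以通过设置它骗过一些基于IP的限制。反过来说,服务端只应信任由自己可控的代理所追加的部分。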

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// ProxyRequest forwards r to the backend URL target and relays the backend's
// status code, headers and body back to w. It also records the client's
// address in X-Forwarded-For so the backend can see the real caller.
//
// Any X-Forwarded-For chain the client already sent is kept, and the address
// this proxy observed is appended at the end ("client, proxy1, ..."). Only
// that last entry is vouched for by this proxy; earlier entries are whatever
// the caller claimed and can be forged.
//
// If the upstream request cannot be built, sent, or its body read, the client
// gets 502 Bad Gateway.
//
// The third parameter was renamed from url to target so it no longer shadows
// the net/url package used below; callers are unaffected.
func ProxyRequest(w http.ResponseWriter, r *http.Request, target string) {
	badGateway := func(err error) {
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
	}

	newRequest, err := http.NewRequest(r.Method, target, r.Body)
	if err != nil {
		badGateway(err)
		return
	}
	newRequest.ContentLength = r.ContentLength
	for k, vs := range r.Header {
		for _, v := range vs {
			newRequest.Header.Add(k, v)
		}
	}

	// Store the real client IP. RemoteAddr is "ip:port"; url.URL.Hostname
	// strips the port (and the brackets of an IPv6 literal).
	clientIP := (&url.URL{Host: r.RemoteAddr}).Hostname()
	if prior := newRequest.Header["X-Forwarded-For"]; len(prior) > 0 {
		chain := ""
		for _, p := range prior {
			chain += p + ", "
		}
		clientIP = chain + clientIP
	}
	newRequest.Header.Set("X-Forwarded-For", clientIP)

	newResponse, err := http.DefaultClient.Do(newRequest)
	if err != nil {
		badGateway(err)
		return
	}
	defer newResponse.Body.Close()

	result, err := ioutil.ReadAll(newResponse.Body)
	if err != nil {
		badGateway(err)
		return
	}

	for k, vs := range newResponse.Header {
		for _, v := range vs {
			w.Header().Add(k, v)
		}
	}
	w.WriteHeader(newResponse.StatusCode)
	w.Write(result)
}

在web中根据Header中x-forwarded-for获取真实IP。

1
2
3
4
5
6
7
8
9
10
11
12
// web1Handler is the demo backend on :9091 (this variant reports the caller's IP).
type web1Handler struct{}

// GetIp returns the caller's address as seen by this backend. If an
// X-Forwarded-For header is present and its first comma-separated entry is
// non-empty, that entry is returned as-is. Otherwise it falls back to
// r.RemoteAddr, which includes the port.
func (*web1Handler) GetIp(r *http.Request) string {
	forwarded := r.Header.Get("x-forwarded-for")
	if comma := strings.Index(forwarded, ","); comma >= 0 {
		forwarded = forwarded[:comma]
	}
	if forwarded == "" {
		return r.RemoteAddr
	}
	return forwarded
}

根据Header中Authorization属性设置访问限制(登录)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// ServeHTTP guards the greeting behind HTTP Basic authentication with the
// hard-coded demo account kid:123.
//
// Every unauthenticated outcome answers 401 together with a WWW-Authenticate
// challenge, so the browser shows its login dialog again. That covers a
// missing header, a non-Basic or malformed header, and wrong credentials. For
// wrong credentials the error text is also written to the body.
func (h *web1Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// BasicAuth decodes "Authorization: Basic base64(user:pass)" (scheme
	// matched case-insensitively) and reports ok=false for anything else.
	user, pass, ok := r.BasicAuth()
	if ok && user == "kid" && pass == "123" {
		fmt.Fprintf(w, "hello web1, come from %s", h.GetIp(r))
		return
	}

	w.Header().Set("WWW-Authenticate", `Basic realm="input your name and password"`)
	w.WriteHeader(http.StatusUnauthorized)
	if ok {
		// Credentials were supplied but do not match.
		w.Write([]byte("账号密码错误"))
	}
}

通过返回401状态码并在Header中设置WWW-Authenticate属性,可以触发浏览器默认的登录弹窗。

登录后,浏览器会在后续请求中自动携带Authorization头,其值为 `Basic ` 加上 base64(username:password)。server读取该header并解码,即可拿到username和password。

注意base64只是编码而不是加密,Basic认证应配合HTTPS使用。

1
Authorization: Basic a2lkOjEyMw==

base64解码a2lkOjEyMw==得到:kid:123

自动化配置IP-server映射

以ini文件配置为例,读取env.ini配置文件,根据其中的规则建立请求路径到后端服务器的映射。

1
2
3
4
5
6
7
8
9
[proxy]

[proxy.a]
path=/a
pass=http://localhost:9091

[proxy.b]
path=/b
pass=http://localhost:9092

go-ini读配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package conf

import (
"github.com/go-ini/ini"
)

// ProxyConfig maps a request-path pattern (the `path` key of each child
// section of [proxy] in env.ini) to the backend URL serving it (the `pass`
// key).
var ProxyConfig map[string]string

// init loads env.ini (relative to the working directory) and fills
// ProxyConfig from every child section of [proxy] that defines both `path`
// and `pass`. Sections missing either key are skipped.
//
// It panics if env.ini cannot be loaded. A missing [proxy] section leaves
// ProxyConfig empty.
func init() {
	ProxyConfig = make(map[string]string)

	cfg, err := ini.Load("env.ini")
	if err != nil {
		panic(err)
	}

	root, _ := cfg.GetSection("proxy")
	if root == nil {
		return
	}
	for _, section := range root.ChildSections() {
		pathKey, _ := section.GetKey("path")
		passKey, _ := section.GetKey("pass")
		if pathKey == nil || passKey == nil {
			continue
		}
		ProxyConfig[pathKey.Value()] = passKey.Value()
	}
}

代理根据配置,自动化映射。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ServeHTTP routes the request with the path-pattern → backend table in
// ProxyConfig and proxies it via ProxyRequest. Paths matching no pattern get a
// fixed placeholder page. A panic while proxying is logged and becomes a 500.
//
// Patterns are unanchored regular expressions, so several can match one path
// ("/a" also matches "/api" or "/b/a"). Go randomizes map iteration order, so
// taking the first match while ranging would pick an arbitrary backend. The
// longest matching pattern is treated as the most specific and wins; ties go
// to the lexicographically smaller pattern, so routing is deterministic.
func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	defer func() {
		if err := recover(); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			log.Println(err)
		}
	}()

	// auto config
	bestPattern, bestPass, found := "", "", false
	for pattern, pass := range ProxyConfig {
		matched, err := regexp.MatchString(pattern, r.URL.Path)
		if err != nil {
			// A broken pattern in env.ini should be visible, not silently unmatched.
			log.Printf("invalid proxy path pattern %q: %v", pattern, err)
			continue
		}
		if !matched {
			continue
		}
		if !found || len(pattern) > len(bestPattern) ||
			(len(pattern) == len(bestPattern) && pattern < bestPattern) {
			bestPattern, bestPass, found = pattern, pass, true
		}
	}

	if found {
		ProxyRequest(w, r, bestPass)
		return
	}
	w.Write([]byte("default web page."))
}

GO内置的反向代理

使用httputil.NewSingleHostReverseProxy内置的默认代理,也能实现如上的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// ServeHTTP routes the request with the path-pattern → backend table in
// ProxyConfig and hands it to Go's built-in httputil single-host reverse
// proxy. Paths matching no pattern get a fixed placeholder page. A panic while
// proxying is logged and becomes a 500.
//
// Patterns are unanchored regular expressions and Go randomizes map iteration
// order. To keep routing deterministic, the longest matching pattern wins,
// with ties going to the lexicographically smaller pattern.
//
// A backend URL that does not parse, or lacks a scheme or host (e.g.
// "localhost:9091"), is reported as a 500. Passing it on would panic or
// produce unusable upstream requests.
func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	defer func() {
		if err := recover(); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			log.Println(err)
		}
	}()

	// auto config
	bestPattern, bestPass, found := "", "", false
	for pattern, pass := range ProxyConfig {
		matched, err := regexp.MatchString(pattern, r.URL.Path)
		if err != nil {
			log.Printf("invalid proxy path pattern %q: %v", pattern, err)
			continue
		}
		if !matched {
			continue
		}
		if !found || len(pattern) > len(bestPattern) ||
			(len(pattern) == len(bestPattern) && pattern < bestPattern) {
			bestPattern, bestPass, found = pattern, pass, true
		}
	}
	if !found {
		w.Write([]byte("default web page."))
		return
	}

	// go built-in reverse proxy
	target, err := url.Parse(bestPass)
	if err != nil || target.Scheme == "" || target.Host == "" {
		log.Printf("invalid proxy target %q for pattern %q: %v", bestPass, bestPattern, err)
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		return
	}
	httputil.NewSingleHostReverseProxy(target).ServeHTTP(w, r)
}

更底层的实现是:Transport.RoundTrip(newRequest)

1
2
// Option 1: send the request through the higher-level http.Client.
newResponse,_ := http.DefaultClient.Do(newRequest)
// Option 2: hand the request straight to the Transport for one round trip.
// Use one line or the other, not both: declaring newResponse a second time
// with := in the same scope does not compile. Errors are ignored here only
// for brevity.
newResponse,_ := http.DefaultTransport.RoundTrip(newRequest)

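这两行是二选一的写法(同时写会因为重复用 := 声明 newResponse 而无法编译)。

两者都能发出一次请求,但并不完全相同:

- Client.Do 构建在 Transport 之上,还会处理重定向、Cookie 以及 Client.Timeout 等。
- RoundTrip 只执行单次HTTP事务,不会跟随重定向。它更加底层,可以通过设置 http.Transport{} 实现更细粒度的控制(如连接池、各阶段超时)。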

负载均衡算法

同样的服务,有N台服务器提供服务,如何选择呢?

  1. 随机选择
  2. 根据IP-Hash选择:同一个client IP总是被分配到同一台server,从而保持session稳定。
  3. 随机权重选择,根据服务器情况对服务器选择概率进行加权。
  4. 轮询
  5. 加权轮询
  6. 平滑加权轮询
  7. 普通轮询的计数器机制
  8. 平滑加权轮询的降权机制
  • 随机选择、IP-Hash
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package conf

import (
"hash/crc32"
"math/rand"
"time"
)

// HttpServer is one backend the load balancer can hand requests to.
type HttpServer struct {
	Host string `json:"host"`
}

// NewHttpServer returns a backend descriptor for host (e.g. "http://localhost:9091").
func NewHttpServer(host string) *HttpServer {
	return &HttpServer{Host: host}
}

// LoadBalance holds the backend pool the selection strategies choose from.
type LoadBalance struct {
	Servers []*HttpServer
}

// NewLoadBalance returns an empty pool.
func NewLoadBalance() *LoadBalance {
	return &LoadBalance{Servers: make([]*HttpServer, 0)}
}

// AddServer appends server to the pool.
func (l *LoadBalance) AddServer(server *HttpServer) {
	l.Servers = append(l.Servers, server)
}

// init seeds math/rand's global source once at startup. Seeding inside every
// selection would give identical picks to all calls landing in the same clock
// tick, pinning bursts of traffic to one backend. On Go 1.20+ the global
// source is seeded automatically and this call is redundant but harmless.
func init() {
	rand.Seed(time.Now().UnixNano())
}

// SelectServerByRandom picks a backend uniformly at random (random algorithm).
// It returns nil when the pool is empty.
func (l *LoadBalance) SelectServerByRandom() *HttpServer {
	if len(l.Servers) == 0 {
		return nil
	}
	return l.Servers[rand.Intn(len(l.Servers))]
}

// SelectServerByIpHash maps ip to a backend via CRC-32 (IP hash algorithm), so
// the same ip keeps hitting the same backend while the pool is unchanged. It
// returns nil when the pool is empty.
//
// NOTE(review): if callers pass r.RemoteAddr ("ip:port"), the port changes per
// connection and stickiness is lost. Pass the bare IP.
func (l *LoadBalance) SelectServerByIpHash(ip string) *HttpServer {
	if len(l.Servers) == 0 {
		return nil
	}
	// Reduce in uint32: int(sum) is negative for sums >= 2^31 on 32-bit
	// platforms, which would produce a negative index.
	sum := crc32.ChecksumIEEE([]byte(ip))
	return l.Servers[sum%uint32(len(l.Servers))]
}
  • 随机权重
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// HttpServer is a backend with a relative selection weight.
type HttpServer struct {
	Host   string
	Weight int
}

// NewHttpServer returns a backend for host with the given weight.
func NewHttpServer(host string, weight int) *HttpServer {
	return &HttpServer{Host: host, Weight: weight}
}

// LoadBalance holds the backend pool.
type LoadBalance struct {
	Servers []*HttpServer
}

// NewLoadBalance returns an empty pool.
func NewLoadBalance() *LoadBalance {
	return &LoadBalance{Servers: make([]*HttpServer, 0)}
}

// AddServer appends server to the pool.
func (l *LoadBalance) AddServer(server *HttpServer) {
	l.Servers = append(l.Servers, server)
}

// LB is the demo pool: :9091 with weight 5 and :9092 with weight 15.
var LB *LoadBalance

// ServerIndexs expands LB's weights into a lookup table. A server with weight
// w appears w times, e.g. weights 2:3 give [0,0,1,1,1]. It is built once in
// init from LB only.
var ServerIndexs []int

func init() {
	// Seed math/rand once. Seeding on every selection would give identical
	// picks to calls landing in the same clock tick.
	rand.Seed(time.Now().UnixNano())

	LB = NewLoadBalance()
	LB.AddServer(NewHttpServer("http://localhost:9091", 5))
	LB.AddServer(NewHttpServer("http://localhost:9092", 15))

	// Non-positive weights contribute no entries.
	for i, server := range LB.Servers {
		for j := 0; j < server.Weight; j++ {
			ServerIndexs = append(ServerIndexs, i)
		}
	}
}

// SelectServerByWeightRandom is weighted random selection over the expanded
// table ServerIndexs.
//
// The table describes LB as it was at init, so this is only meaningful for LB
// and only if no servers are added later. It returns nil when the table is
// empty or points outside l.Servers.
func (l *LoadBalance) SelectServerByWeightRandom() *HttpServer {
	if len(ServerIndexs) == 0 {
		return nil
	}
	idx := ServerIndexs[rand.Intn(len(ServerIndexs))]
	if idx >= len(l.Servers) {
		return nil
	}
	return l.Servers[idx]
}

// SelectServerByWeightRandom2 is weighted random selection over number ranges
// (improved version). It draws r in [0, total) and walks the servers,
// subtracting each weight until r falls inside one. With weights 2:3, r in
// [0,2) selects the first server and r in [2,5) the second.
//
// Servers with a non-positive weight are never chosen. It returns nil when no
// server has a positive weight, including an empty pool, where rand.Intn(0)
// would otherwise panic.
func (l *LoadBalance) SelectServerByWeightRandom2() *HttpServer {
	total := 0
	for _, s := range l.Servers {
		if s.Weight > 0 {
			total += s.Weight
		}
	}
	if total == 0 {
		return nil
	}

	rad := rand.Intn(total)
	for _, s := range l.Servers {
		if s.Weight <= 0 {
			continue
		}
		if rad < s.Weight {
			return s
		}
		rad -= s.Weight
	}
	return nil // unreachable: rad < total
}

此处使用了两种方式实现

  1. 基于数组空间的随机 ,如web1:web2 = 2:3,则随机空间为 [0,0,1,1,1]

  2. 基于数字区间的随机,如web1:web2 = 2:3,则在[0,5)中取随机整数,落在[0,2)时选web1,落在[2,5)时选web2。

  • 轮询[0,1],[0,1],[0,1]….
1
2
3
4
5
6
7
8
9
10
11
// LoadBalance is the backend pool plus the cursor used by round robin.
type LoadBalance struct {
	Servers  []*HttpServer
	CurIndex int // index of the server the next round-robin call returns
}

// SelectServerByRound returns the servers in turn, wrapping after the last
// one (round-robin algorithm). It returns nil for an empty pool.
//
// CurIndex is normalized on entry, so the method stays in range even if
// servers were removed or CurIndex was set outside [0, len).
//
// NOTE(review): CurIndex is read and written without synchronization. If this
// is called from concurrent HTTP handlers it should be guarded, e.g. with a
// sync.Mutex.
func (l *LoadBalance) SelectServerByRound() *HttpServer {
	n := len(l.Servers)
	if n == 0 {
		return nil
	}
	idx := l.CurIndex % n
	if idx < 0 {
		idx += n
	}
	l.CurIndex = (idx + 1) % n
	return l.Servers[idx]
}
  • 加权轮询

    直接参考随机加权[0,0,1,1,1],[0,0,1,1,1],[0,0,1,1,1]…..

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// SelectServerByRoundWeight performs plain (non-smooth) weighted round robin.
// With weights 2:3 the sequence is s0,s0,s1,s1,s1 and then repeats. CurIndex
// is the position within that cycle, whose length is the sum of the positive
// weights.
//
// Servers with a non-positive weight are never chosen. It returns nil when
// the pool is empty or no server has a positive weight.
//
// The cursor always wraps modulo the total weight, so trailing zero-weight
// servers can no longer leave it stuck past the end. The debug println of the
// cursor is dropped.
func (l *LoadBalance) SelectServerByRoundWeight() *HttpServer {
	total := 0
	for _, s := range l.Servers {
		if s.Weight > 0 {
			total += s.Weight
		}
	}
	if total == 0 {
		return nil
	}

	// Normalize in case weights shrank since the last call.
	slot := l.CurIndex % total
	if slot < 0 {
		slot += total
	}
	l.CurIndex = (slot + 1) % total

	for _, s := range l.Servers {
		if s.Weight <= 0 {
			continue
		}
		if slot < s.Weight {
			return s
		}
		slot -= s.Weight
	}
	return nil // unreachable: slot < total
}
  • 平滑加权轮询

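    普通加权轮询会把请求连续集中到同一台服务器上(如[0,0,1,1,1]中连续三次选中web2)。平滑加权轮询在加权轮询的基础上把请求打散,每次选择时:

    1. 先给每台服务器的当前权重加上其配置权重;
    2. 选出当前权重最大的服务器;
    3. 再将它的当前权重减去总权重(降权),从而减少下次选中它的机会。

    如此周而复始。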

    1
    2
    3
    4
    5
    如 w1:w2=1:3,总权重为4
    选w2,w2权重-总权重 = -1,对w1,w2增加其权重 w1:w2=2:2
    选w2,w2权重-总权重 = -2, 对w1,w2增加其权重 w1:w2=3:1
    选w1,w1权重-总权重 = -1, 对w1,w2增加其权重 w1:w2=0:4
    选w2,w2权重-总权重 = 0, 对w1,w2增加其权重 w1:w2=1:3 --恢复原状
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    // HttpServer is a backend taking part in smooth weighted round robin.
    type HttpServer struct {
        Host    string
        Weight  int // configured (static) weight
        CWeight int // current weight, adjusted on every selection
    }

    // NewHttpServer returns a backend with the given static weight and a current weight of 0.
    func NewHttpServer(host string, weight int) *HttpServer {
        return &HttpServer{Host: host, Weight: weight}
    }

    // HttpServers implements sort.Interface, ordering by current weight, highest first.
    type HttpServers []*HttpServer

    func (s HttpServers) Len() int           { return len(s) }
    func (s HttpServers) Less(i, j int) bool { return s[i].CWeight > s[j].CWeight }
    func (s HttpServers) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

    // LoadBalance holds the backend pool.
    type LoadBalance struct {
        Servers HttpServers
    }

    // NewLoadBalance returns an empty pool.
    func NewLoadBalance() *LoadBalance {
        return &LoadBalance{Servers: make([]*HttpServer, 0)}
    }

    // AddServer appends server to the pool.
    func (l *LoadBalance) AddServer(server *HttpServer) {
        l.Servers = append(l.Servers, server)
    }

    // LB is the demo pool (:9091 weight 1, :9092 weight 3). sumWeight is the
    // total of LB's static weights, computed once in init.
    var LB *LoadBalance
    var sumWeight int

    func init() {
        LB = NewLoadBalance()
        LB.AddServer(NewHttpServer("http://localhost:9091", 1))
        LB.AddServer(NewHttpServer("http://localhost:9092", 3))

        for _, server := range LB.Servers {
            sumWeight += server.Weight
        }
    }

    // SelectServerByRoundPlus implements smooth weighted round robin (the smooth
    // round-robin algorithm). Each call:
    //   1. adds every server's static weight to its current weight,
    //   2. picks the server with the highest current weight,
    //   3. lowers the chosen server's current weight by the total weight.
    // It returns nil for an empty pool.
    //
    // The total is computed from l's own servers on every call rather than taken
    // from sumWeight. That way the method works for any pool and after AddServer;
    // a stale total would make current weights drift without bound.
    //
    // sort.Sort reorders l.Servers in place and is not stable, so which of
    // several servers with equal current weight wins depends on the slice's
    // current order.
    func (l *LoadBalance) SelectServerByRoundPlus() *HttpServer {
        if len(l.Servers) == 0 {
            return nil
        }
        total := 0
        for _, s := range l.Servers {
            s.CWeight += s.Weight
            total += s.Weight
        }
        sort.Sort(l.Servers)
        best := l.Servers[0]
        best.CWeight -= total
        return best
    }

健康检查

基础健康检查监控程序

使用Head请求,不需要返回Body。定时对每个服务器进行通讯检查,并重新设置其状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// HttpServer is a backend together with the health state maintained by the
// health checker.
type HttpServer struct {
	Host         string
	Status       int // health state: 0 = up (the zero value), 1 = down; IsAllDown and SelectServerByRound treat 1 as down
	FailCount    int // failure counter compared against the checker's FailMax threshold
	SuccessCount int // success counter; NOTE(review): not referenced by the checker code shown here
}

// HttpServers is the backend list shared by the balancer and the health checker.
type HttpServers []*HttpServer

// LoadBalance is the backend pool plus the round-robin cursor.
type LoadBalance struct {
	Servers  HttpServers
	CurIndex int // next index for SelectServerByRound
}

// checkServers runs a health check over servers every 3 seconds, giving each
// round a 2-second per-check timeout. After each round it prints every
// server's host and status, followed by a separator line.
//
// It loops forever and never returns, so it is meant to run in its own
// goroutine.
func checkServers(servers HttpServers) {
	ticker := time.NewTicker(3 * time.Second)
	checker := NewHttpChecker(servers)
	for range ticker.C {
		checker.Check(2 * time.Second)
		for _, s := range servers {
			println(s.Host, s.Status)
		}
		println("--------------------------")
	}
}

// LB is the demo pool of backends watched by the health checker.
var LB *LoadBalance

// init registers the two demo backends and starts the health checker in a
// background goroutine. checkServers loops forever, so the goroutine lives
// for the whole process.
func init() {
	LB = NewLoadBalance()
	for _, host := range []string{"http://localhost:9091", "http://localhost:9092"} {
		LB.AddServer(NewHttpServer(host))
	}

	go func() {
		checkServers(LB.Servers)
	}()
}


// HttpChecker probes a list of backends and records their health.
type HttpChecker struct {
	Servers HttpServers
}

// NewHttpChecker returns a checker over servers. The checker keeps the same
// slice, and therefore the same *HttpServer pointers, that the caller passed in.
func NewHttpChecker(servers HttpServers) *HttpChecker {
	checker := &HttpChecker{}
	checker.Servers = servers
	return checker
}
// Check probes every server once with an HTTP HEAD request (no body needed)
// and records the result in server.Status:
//   - 0 (up) when the server answers 200 OK;
//   - 1 (down) on any other status, a transport error, or a timeout.
// This 1 = down convention matches IsAllDown and SelectServerByRound, which
// skip servers whose Status is 1.
//
// timeout bounds each individual probe, so one hung backend cannot stall the
// whole round.
//
// NOTE(review): Status is written from the checker goroutine while request
// handlers may read it concurrently. Consider atomic or mutex-protected access.
func (h *HttpChecker) Check(timeout time.Duration) {
	client := http.Client{Timeout: timeout}
	for _, server := range h.Servers {
		res, err := client.Head(server.Host)
		if err != nil {
			server.Status = 1
			continue
		}
		// Close right away. A defer inside the loop would keep every response
		// open until Check returns.
		res.Body.Close()
		if res.StatusCode == http.StatusOK {
			server.Status = 0
		} else {
			server.Status = 1
		}
	}
}

带计数器的健康检查监控程序

当一次网络不通时,不立刻将状态置为Down,而是设定一个失败次数阈值,失败次数未达到阈值前先不调整为Down。恢复时也类似:需要足够多次成功的检查把失败计数抵消掉之后,才重新调整为UP。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// HttpChecker probes backends and counts failures per server before marking
// them down.
type HttpChecker struct {
	Servers HttpServers
	FailMax int // failure-count threshold
}

// NewHttpChecker returns a checker over servers with a failure threshold of 5.
func NewHttpChecker(servers HttpServers) *HttpChecker {
	const defaultFailMax = 5
	return &HttpChecker{FailMax: defaultFailMax, Servers: servers}
}

// Check probes every server once with an HTTP HEAD request and feeds the
// outcome to the failure counter:
//   - 200 OK counts as a success (Success);
//   - a transport error, a timeout, or any other status counts as a failure (Fail).
//
// timeout bounds each individual probe, so one hung backend cannot stall the
// whole round.
func (h *HttpChecker) Check(timeout time.Duration) {
	client := http.Client{Timeout: timeout}
	for _, server := range h.Servers {
		res, err := client.Head(server.Host)
		if err != nil {
			h.Fail(server)
			continue
		}
		// Close right away. A defer inside the loop would keep every response
		// open until Check returns.
		res.Body.Close()
		if res.StatusCode == http.StatusOK {
			h.Success(server)
		} else {
			h.Fail(server)
		}
	}
}

// Fail records one failed probe for server. The failure counter is capped at
// h.FailMax, and the server is marked down (Status = 1) as soon as the counter
// reaches that threshold, i.e. after FailMax failures not yet cancelled out by
// successes.
func (h *HttpChecker) Fail(server *HttpServer) {
	if server.FailCount < h.FailMax {
		server.FailCount++
	}
	if server.FailCount >= h.FailMax {
		server.Status = 1
	}
}

// Success records one successful probe for server. Each success cancels one
// recorded failure, and once the counter is back to zero the server is marked
// up (Status = 0). A server that was marked down therefore needs FailMax
// successes to recover.
func (h *HttpChecker) Success(server *HttpServer) {
	if server.FailCount > 0 {
		server.FailCount--
	}
	if server.FailCount == 0 {
		server.Status = 0
	}
}


// Health-check support for the round-robin algorithm.

// IsAllDown reports whether every server is marked down (Status == 1).
// An empty list counts as "all down".
func IsAllDown(servers HttpServers) bool {
	for _, s := range servers {
		if s.Status != 1 {
			return false
		}
	}
	return true
}


// SelectServerByRound returns the next server in round-robin order, skipping
// servers marked down (Status == 1). Edge cases:
//   - if every server is down, it falls back to plain rotation and returns the
//     next server anyway;
//   - it returns nil for an empty pool.
//
// At most one pass over the pool is made, so the call is bounded even if
// statuses change concurrently.
func (l *LoadBalance) SelectServerByRound() *HttpServer {
	n := len(l.Servers)
	if n == 0 {
		return nil
	}
	start := l.CurIndex % n
	if start < 0 {
		start += n
	}

	for step := 0; step < n; step++ {
		idx := (start + step) % n
		if l.Servers[idx].Status != 1 {
			l.CurIndex = (idx + 1) % n
			return l.Servers[idx]
		}
	}

	// Every server is down: plain rotation.
	l.CurIndex = (start + 1) % n
	return l.Servers[start]
}

基于降权的健康检查监控程序

  1. 给server增加一个降权值:FailWeight
  2. 每次失败,FailWeight += weight*0.2
  3. 加权轮询过程中,真正的权重 Weight = weight - FailWeight,如果结果小于等于0,则不选择该服务器(视为Down)
  4. 健康检查成功,FailWeight = 0

参考:

https://www.bilibili.com/video/BV1Ft4y1y7pq?p=11