基于Go的反向代理简单实现 基础web服务 简单实现两个web服务,分别监听9091和9092端口。
package main

import (
	"log"
	"net/http"
	"os"
	"os/signal"
)

// web1Handler is demo backend 1: it answers every request with "hello web1".
type web1Handler struct{}

func (h *web1Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("hello web1"))
}

// web2Handler is demo backend 2: it answers every request with "hello web2".
type web2Handler struct{}

func (h *web2Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("hello web2"))
}

// main serves web1Handler on :9091 and web2Handler on :9092, then blocks until
// an interrupt signal arrives and logs it.
func main() {
	// signal.Notify does not block when delivering, so the channel needs room
	// for at least one value or the signal can be lost.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	go func() {
		// ListenAndServe only returns on failure (e.g. port already in use);
		// log the error instead of discarding it.
		log.Println(http.ListenAndServe(":9091", &web1Handler{}))
	}()
	go func() {
		log.Println(http.ListenAndServe(":9092", &web2Handler{}))
	}()

	s := <-c
	log.Println(s)
}
基础代理 构建一个简单的代理服务器,监听8080端口,将/a的请求发送给9091端口服务器,将/b的请求发送给9092端口服务器。其本质就是代理服务器监听请求,替client向后端服务器发起请求,再将结果返回给client;对client而言后端服务器是不可见的(这一点与VPN、正向代理这类由client主动配置的代理不同)。本案例的实现方式可能会暴露出效率问题;而Nginx基于异步事件驱动的模型,可以抗住10万以上的并发。本代码只用于体会反向代理的过程。
package main

import (
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"os/signal"
)

// ProxyHandler is the proxy's front handler: /a is forwarded to the backend on
// :9091, /b to the backend on :9092, and every other path gets a default page.
type ProxyHandler struct{}

func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Last-resort guard: turn an unexpected panic into a 500 and log it.
	defer func() {
		if err := recover(); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			log.Println(err)
		}
	}()

	switch r.URL.Path {
	case "/a":
		ProxyRequest(w, r, "http://localhost:9091")
	case "/b":
		ProxyRequest(w, r, "http://localhost:9092")
	default:
		w.Write([]byte("default web page."))
	}
}

// ProxyRequest re-issues r (method and body only) against target and writes the
// backend's response body to w. Response headers and status are not relayed.
// A request that cannot be built yields 500; an unreachable backend or an
// unreadable response body yields 502 Bad Gateway.
func ProxyRequest(w http.ResponseWriter, r *http.Request, target string) {
	newRequest, err := http.NewRequest(r.Method, target, r.Body)
	if err != nil {
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		return
	}
	newResponse, err := http.DefaultClient.Do(newRequest)
	if err != nil {
		// newResponse is nil here; touching its Body would panic.
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
		return
	}
	defer newResponse.Body.Close()

	result, err := ioutil.ReadAll(newResponse.Body)
	if err != nil {
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
		return
	}
	w.Write(result)
}

// main runs the proxy on :8080 until an interrupt signal arrives.
func main() {
	// Buffered: signal.Notify drops signals it cannot deliver immediately.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		log.Println(http.ListenAndServe(":8080", &ProxyHandler{}))
	}()
	<-c
}
如此一个简单的反向代理即实现了,但是还有很多细节需要注意
反向代理时,应该将Header信息、状态码也一并代理转发。
反向代理时,server如何获取真实的IP。
如何使用Go内置的反向代理工具。
反向代理时,如何使用自动化配置代理服务器映射。
反向代理时,如何为client选择使用哪台服务器,即负载均衡(服务选择)算法。
状态转发 对上述的代理函数ProxyRequest
做修改,在Request前设置Header、在Response后设置Header、状态码
// ProxyRequest re-issues r against target and relays the backend's response
// headers, status code and body back to w.
//
// Every value of every header is copied in both directions, so repeated headers
// such as Set-Cookie survive. A request that cannot be built yields 500; an
// unreachable backend or an unreadable response body yields 502 Bad Gateway.
func ProxyRequest(w http.ResponseWriter, r *http.Request, target string) {
	newRequest, err := http.NewRequest(r.Method, target, r.Body)
	if err != nil {
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		return
	}
	for k, vs := range r.Header {
		for _, v := range vs {
			newRequest.Header.Add(k, v)
		}
	}

	newResponse, err := http.DefaultClient.Do(newRequest)
	if err != nil {
		// newResponse is nil here; touching it would panic.
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
		return
	}
	defer newResponse.Body.Close()

	// Read the body before sending anything, so a read error can still become
	// a 502 instead of a truncated response with the backend's status.
	result, err := ioutil.ReadAll(newResponse.Body)
	if err != nil {
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
		return
	}

	for k, vs := range newResponse.Header {
		for _, v := range vs {
			w.Header().Add(k, v)
		}
	}
	w.WriteHeader(newResponse.StatusCode)
	w.Write(result)
}
配置获取真实IP header中x-forwarded-for
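属性最初并不是HTTP标准头部(属于事实标准,后来RFC 7239定义了标准化的Forwarded头),在反向代理中常用它来传递client的真实IP:其值是逗号分隔的列表,每经过一层代理就在末尾追加一个地址,第一个元素即最初的client。在有此属性时优先以此值作为真实的IP。但由于client可以自行设置该值,爬虫可借此骗过一些IP限制,因此服务端只应信任由可信代理写入的部分。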
// ProxyRequest re-issues r against target, records the client address in
// X-Forwarded-For, and relays the backend's headers, status code and body to w.
//
// Every value of every header is copied in both directions. X-Forwarded-For is
// kept as a single comma-separated list of bare IPs (no port): the address this
// proxy received the request from is appended to whatever chain the request
// already carried. A request that cannot be built yields 500; an unreachable
// backend or an unreadable response body yields 502 Bad Gateway.
func ProxyRequest(w http.ResponseWriter, r *http.Request, target string) {
	newRequest, err := http.NewRequest(r.Method, target, r.Body)
	if err != nil {
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		return
	}
	for k, vs := range r.Header {
		for _, v := range vs {
			newRequest.Header.Add(k, v)
		}
	}

	clientIP, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		// RemoteAddr was not in host:port form; record it unchanged.
		clientIP = r.RemoteAddr
	}
	if prior, ok := newRequest.Header["X-Forwarded-For"]; ok {
		// Extend the existing chain rather than adding a second header line,
		// so backends reading the first value see the whole list.
		clientIP = strings.Join(prior, ", ") + ", " + clientIP
	}
	newRequest.Header.Set("X-Forwarded-For", clientIP)

	newResponse, err := http.DefaultClient.Do(newRequest)
	if err != nil {
		// newResponse is nil here; touching it would panic.
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
		return
	}
	defer newResponse.Body.Close()

	// Read the body before sending anything so a read error can still be a 502.
	result, err := ioutil.ReadAll(newResponse.Body)
	if err != nil {
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
		return
	}

	for k, vs := range newResponse.Header {
		for _, v := range vs {
			w.Header().Add(k, v)
		}
	}
	w.WriteHeader(newResponse.StatusCode)
	w.Write(result)
}
在web中根据Header中x-forwarded-for
获取真实IP。
type web1Handler struct{}

// GetIp returns the client IP taken from the first (left-most) entry of the
// X-Forwarded-For header, falling back to r.RemoteAddr when the header is
// absent or its first entry is blank.
//
// X-Forwarded-For is supplied by whoever sent the request, so the result can be
// spoofed unless every hop in front of this server is a trusted proxy.
func (h *web1Handler) GetIp(r *http.Request) string {
	if ips := r.Header.Get("x-forwarded-for"); ips != "" {
		// Only the first entry matters; trim the whitespace that usually
		// surrounds list elements ("a, b" or "a , b").
		first := strings.TrimSpace(strings.SplitN(ips, ",", 2)[0])
		if first != "" {
			return first
		}
	}
	return r.RemoteAddr
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (h *web1Handler) ServeHTTP (w http.ResponseWriter,r *http.Request) { auth := r.Header.Get("Authorization" ) if auth == "" { w.Header().Set("WWW-Authenticate" , `Basic realm="input your name and password"` ) w.WriteHeader(http.StatusUnauthorized) return } authList := strings.Split(auth," " ) if len (authList) == 2 && authList[0 ] == "Basic" { res,err := base64.StdEncoding.DecodeString(authList[1 ]) if err == nil && string (res) == "kid:123" { w.Write([]byte (fmt.Sprintf("hello web1, come from %s" ,h.GetIp(r)))) return } w.Write([]byte ("账号密码错误" )) } }
根据设置401状态码和Header中的WWW-Authenticate
属性,可以触发浏览器默认的登录弹窗。
登录后,浏览器会自动将账号密码按base64(username:password)的形式编码,填充到请求Header的Authorization
属性中(格式为`Basic <base64>`)。server获取该header并从中解码拿到username和password。注意base64只是编码而非加密,Basic认证应配合HTTPS使用。
Authorization: Basic a2lkOjEyMw==
base64解码a2lkOjEyMw==
得到:kid:123
自动化配置路径-server映射 以ini文件配置为例,读取env.ini配置文件,根据其中的path→pass配置选择服务器映射。
; Reverse-proxy routing table, loaded once at startup by the conf package.
; Each child section [proxy.<name>] maps a request-path pattern (path, matched
; as a Go regular expression against the request path) to the backend URL the
; request is forwarded to (pass). Sections missing either key are ignored.
[proxy]

[proxy.a]
path = /a
pass = http://localhost:9091

[proxy.b]
path = /b
pass = http://localhost:9092
go-ini读配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package confimport ( "github.com/go-ini/ini" ) var ProxyConfig map [string ]string func init () { ProxyConfig = make (map [string ]string ) EnvConfig,err := ini.Load("env.ini" ) if err != nil { panic (err) } proxy,_ := EnvConfig.GetSection("proxy" ) if proxy != nil { secs := proxy.ChildSections() for _, c := range secs { path,_ := c.GetKey("path" ) pass,_ := c.GetKey("pass" ) if path != nil && pass != nil { ProxyConfig[path.Value()] = pass.Value() } } } }
代理根据配置,自动化映射。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (h *ProxyHandler) ServeHTTP (w http.ResponseWriter,r *http.Request) { defer func () { if err := recover (); err != nil { w.WriteHeader(http.StatusInternalServerError) log.Println(err) } }() for k,v := range ProxyConfig{ if matched,_ := regexp.MatchString(k,r.URL.Path);matched{ ProxyRequest(w,r,v) return } } w.Write([]byte ("default web page." )) }
GO内置的反向代理 使用httputil.NewSingleHostReverseProxy
内置的默认代理,也能实现如上的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func (h *ProxyHandler) ServeHTTP (w http.ResponseWriter,r *http.Request) { defer func () { if err := recover (); err != nil { w.WriteHeader(http.StatusInternalServerError) log.Println(err) } }() for k,v := range ProxyConfig{ if matched,_ := regexp.MatchString(k,r.URL.Path);matched{ target,_ := url.Parse(v) proxy := httputil.NewSingleHostReverseProxy(target) proxy.ServeHTTP(w,r) return } } w.Write([]byte ("default web page." )) }
更底层的实现是:Transport.RoundTrip(newRequest)
。
// Two alternative ways to send newRequest: use one of them, not both (each line
// declares newResponse). Errors are discarded here; real code must check them
// before touching newResponse.
//
// Option 1: send through the http.Client value http.DefaultClient.
newResponse,_ := http.DefaultClient.Do(newRequest)
// Option 2: call http.DefaultTransport directly. NOTE(review): per the net/http
// docs a RoundTripper performs a single request/response exchange, without the
// Client's redirect, cookie-jar and Timeout handling. Confirm that is acceptable.
newResponse,_ := http.DefaultTransport.RoundTrip(newRequest)
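这两种写法都能发出请求,但并不完全等价:Client.Do 会在Transport之上处理重定向、Cookie以及Client的超时设置;RoundTrip 则只执行一次HTTP请求/响应交换,更加底层。通过自定义http.Transport{}(如连接池、超时、TLS配置)
可以实现更细粒度的控制。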
负载均衡算法 同样的服务,有N台服务器提供服务,如何选择呢?
随机选择
根据IP-Hash选择,保证通讯连接的稳定(每次都为client分配同一个server)session稳定。
随机权重选择,根据服务器情况对服务器选择概率进行加权。
轮询
加权轮询
平滑加权轮询
普通轮询的计数器机制
平滑加权轮询的降权机制
package conf

import (
	"hash/crc32"
	"math/rand"
	"time"
)

// HttpServer is one backend the load balancer can pick.
type HttpServer struct {
	Host string `json:"host"`
}

// NewHttpServer returns a backend for the given host URL.
func NewHttpServer(host string) *HttpServer {
	return &HttpServer{Host: host}
}

// LoadBalance holds the pool of backends to choose from.
type LoadBalance struct {
	Servers []*HttpServer
}

// NewLoadBalance returns a LoadBalance with an empty pool.
func NewLoadBalance() *LoadBalance {
	return &LoadBalance{Servers: make([]*HttpServer, 0)}
}

// AddServer appends server to the pool.
func (l *LoadBalance) AddServer(server *HttpServer) {
	l.Servers = append(l.Servers, server)
}

// init seeds math/rand once per process. Re-seeding from the clock on every
// pick makes calls that read the same clock value return the same index.
func init() {
	rand.Seed(time.Now().UnixNano())
}

// SelectServerByRandom picks a backend uniformly at random.
// It returns nil when the pool is empty.
func (l *LoadBalance) SelectServerByRandom() *HttpServer {
	if len(l.Servers) == 0 {
		return nil
	}
	return l.Servers[rand.Intn(len(l.Servers))]
}

// SelectServerByIpHash maps ip to a backend through its CRC-32 checksum, so the
// same ip keeps reaching the same backend while the pool is unchanged. Pass the
// bare client IP: an "ip:port" string changes with every connection.
// It returns nil when the pool is empty.
func (l *LoadBalance) SelectServerByIpHash(ip string) *HttpServer {
	if len(l.Servers) == 0 {
		return nil
	}
	// Reduce in uint32 before converting: int(sum) is negative on 32-bit
	// platforms for checksums >= 2^31, which would give a negative index.
	sum := crc32.ChecksumIEEE([]byte(ip))
	return l.Servers[int(sum%uint32(len(l.Servers)))]
}
// HttpServer is a backend with a relative selection weight. The weighted
// selectors below never pick a server whose Weight is <= 0.
type HttpServer struct {
	Host   string
	Weight int
}

// NewHttpServer returns a backend for host with the given weight.
func NewHttpServer(host string, weight int) *HttpServer {
	return &HttpServer{Host: host, Weight: weight}
}

// LoadBalance holds the pool of backends to choose from.
type LoadBalance struct {
	Servers []*HttpServer
}

// NewLoadBalance returns a LoadBalance with an empty pool.
func NewLoadBalance() *LoadBalance {
	return &LoadBalance{Servers: make([]*HttpServer, 0)}
}

// AddServer appends server to the pool.
func (l *LoadBalance) AddServer(server *HttpServer) {
	l.Servers = append(l.Servers, server)
}

var (
	// LB is the demo load balancer built by init.
	LB *LoadBalance
	// ServerIndexs is LB's "array space" for weighted random selection: the
	// index of each server appears Weight times (weights 5 and 15 give five 0s
	// followed by fifteen 1s). It is built once in init, so servers added to LB
	// later are not represented.
	ServerIndexs []int
)

func init() {
	// Seed once; re-seeding from the clock on every pick makes calls that read
	// the same clock value return the same index.
	rand.Seed(time.Now().UnixNano())

	LB = NewLoadBalance()
	LB.AddServer(NewHttpServer("http://localhost:9091", 5))
	LB.AddServer(NewHttpServer("http://localhost:9092", 15))
	for i, server := range LB.Servers {
		// Non-positive weights contribute no entries.
		for j := 0; j < server.Weight; j++ {
			ServerIndexs = append(ServerIndexs, i)
		}
	}
}

// SelectServerByWeightRandom picks a uniform random entry of ServerIndexs, so
// each server's chance is proportional to its weight. ServerIndexs describes
// LB, so this is only meaningful when l is LB. It returns nil when ServerIndexs
// is empty.
func (l *LoadBalance) SelectServerByWeightRandom() *HttpServer {
	if len(ServerIndexs) == 0 {
		return nil
	}
	return l.Servers[ServerIndexs[rand.Intn(len(ServerIndexs))]]
}

// SelectServerByWeightRandom2 does the same without a precomputed table. It
// draws r in [0, total weight), then walks the servers subtracting each weight
// until r falls inside one server's interval (weights 2:3 give [0,2) and
// [2,5)). Servers with Weight <= 0 are skipped, and nil is returned when no
// server has a positive weight, because rand.Intn panics on a total <= 0.
func (l *LoadBalance) SelectServerByWeightRandom2() *HttpServer {
	total := 0
	for _, s := range l.Servers {
		if s.Weight > 0 {
			total += s.Weight
		}
	}
	if total == 0 {
		return nil
	}

	r := rand.Intn(total)
	for _, s := range l.Servers {
		if s.Weight <= 0 {
			continue
		}
		if r < s.Weight {
			return s
		}
		r -= s.Weight
	}
	return nil // unreachable: r < total
}
此处使用了两种方式实现
基于数组空间的随机 ,如web1:web2 = 2:3,则随机空间为 [0,0,1,1,1]
基于数字区间的随机,如web1:web2 = 2:3,则在[0,5)内取随机整数,落在[0,2)时选web1,落在[2,5)时选web2
1 2 3 4 5 6 7 8 9 10 11 type LoadBalance struct { Servers []*HttpServer CurIndex int } func (l *LoadBalance) SelectServerByRound () *HttpServer { server := l.Servers[l.CurIndex] l.CurIndex = (l.CurIndex+1 ) % len (l.Servers) return server }
加权轮询
直接参考随机加权[0,0,1,1,1],[0,0,1,1,1],[0,0,1,1,1]…..
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (l *LoadBalance) SelectServerByRoundWeight () *HttpServer { server := l.Servers[0 ] sum := 0 for i,s := range l.Servers{ sum += s.Weight if l.CurIndex < sum{ server = s if l.CurIndex == sum-1 && i!=len (l.Servers)-1 { l.CurIndex++ }else { l.CurIndex = (l.CurIndex+1 ) % sum } println (l.CurIndex) break } } return server }
健康检查 基础健康检查监控程序 使用Head请求,不需要返回Body。定时对每个服务器进行通讯检查,并重新设置其状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 type HttpServer struct { Host string Status int FailCount int SuccessCount int } type HttpServers []*HttpServertype LoadBalance struct { Servers HttpServers } func checkServers (servers HttpServers) { t := time.NewTicker(time.Second *3 ) checker := NewHttpChecker(servers) for { select { case <-t.C:{ checker.Check(time.Second * 2 ) for _,server := range servers { println (server.Host,server.Status) } println ("--------------------------" ) } } } } var LB *LoadBalancefunc init () { LB= NewLoadBalance() LB.AddServer(NewHttpServer("http://localhost:9091" )) LB.AddServer(NewHttpServer("http://localhost:9092" )) go func () { checkServers(LB.Servers) }() } type HttpChecker struct { Servers HttpServers } func NewHttpChecker (servers HttpServers) *HttpChecker { return &HttpChecker{Servers: servers} } func (h *HttpChecker) Check (timeout time.Duration) { client := http.Client{} for _, server := range h.Servers{ res,err := client.Head(server.Host) if res != nil { defer res.Body.Close() } if err != nil { server.Status = 0 continue } if res.StatusCode != http.StatusOK { server.Status = 1 }else { server.Status = 0 } } }
带计数器的健康检查监控程序 当一次网络不通时,不立刻将状态置为失败,而是累计失败次数:失败计数在阈值(FailMax)以内时先不调整为Down,计数达到阈值后再失败才置为Down。成功时与之对称:每次成功先抵消一次失败计数,计数归零后再次成功才调整为UP。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 type HttpChecker struct { Servers HttpServers FailMax int } func NewHttpChecker (servers HttpServers) *HttpChecker { return &HttpChecker{Servers: servers,FailMax:5 } } func (h *HttpChecker) Check (timeout time.Duration) { client := http.Client{} for _, server := range h.Servers{ res,err := client.Head(server.Host) if res != nil { defer res.Body.Close() } if err != nil { h.Fail(server) continue } if res.StatusCode == http.StatusOK { h.Success(server) }else { h.Fail(server) } } } func (h *HttpChecker) Fail (server *HttpServer) { if server.FailCount >= h.FailMax{ server.Status = 1 }else { server.FailCount++ } } func (h *HttpChecker) Success (server *HttpServer) { if server.FailCount > 0 { server.FailCount-- }else { server.Status = 0 } } func IsAllDown (servers HttpServers) bool { count := 0 for _, s := range servers{ if s.Status == 1 { count++ } } return count == len (servers) } func (l *LoadBalance) SelectServerByRound () *HttpServer { server := l.Servers[l.CurIndex] l.CurIndex = (l.CurIndex+1 ) % len (l.Servers) if server.Status == 1 && !IsAllDown(l.Servers){ return l.SelectServerByRound() } return server }
基于降权的健康检查监控程序
给server增加一个降权值:FailWeight
每次失败,FailWeight += weight*0.2
加权轮询过程中,真正的权重 Weight = weight - FailWeight,如果结果≤0则不选择(视为Down)
健康检查成功,FailWeight = 0
参考:
https://www.bilibili.com/video/BV1Ft4y1y7pq?p=11