最新消息:一个只会PHP的程序员不是好搬砖的

etcd-raft网络传输组件实现分析

开发工具 时光弧线 27浏览

前言

之前我们提到过,etcd-raft的实现比较特殊,它将raft核心与网络传输、WAL日志、Snapshot等功能分开,成为独立的模块,各模块之间主要通过编程接口进行函数调用,使用channel完成消息传输。这里我们来深入分析下etcd-raft网络消息传输模块的实现原理。

etcd-raft网络传输模块主要用于在一个raft集群的节点之间进行raft协议的消息传输,这里的消息包括raft协议中定义的所有消息类型,例如选主时的投票消息、日志复制消息、snapshot拷贝消息、集群节点变更消息等等等等。

每个节点运行时,不仅可能需要向其他节点发送raft协议消息,如Leader节点,还需要从其他节点接收raft协议消息,如Follower节点。

因此,etcd-raft网络模块的核心功能有:

  • 建立去其他节点的消息发送通道;
  • 打开本节点的消息接收通道。

照例,我们会从“数据结构”、“关键流程”等几个方面描述其内部实现。

数据结构

Transporter

type Transporter interface {
    Start() error
    Handler() http.Handler 
    Send(m []raftpb.Message)
    SendSnapshot(m snap.Message)
    AddRemote(id types.ID, urls []string)
    AddPeer(id types.ID, urls []string)
    RemovePeer(id types.ID)
    RemoveAllPeers()
    UpdatePeer(id types.ID, urls []string)
    ActiveSince(id types.ID) time.Time 
    Stop()
}

与etcd的实现风格类似,定义网络传输模块的接口,应用面向该接口编程,减轻模块间的耦合性。

Transport

type Transport struct {
    DialTimeout time.Duration 
    DialRetryFrequency rate.Limit 
    TLSInfo transport.TLSInfo 
    ID          types.ID 
    URLs        types.URLs 
    ClusterID   types.ID 
    Raft Raft 
    Snapshotter *snap.Snapshotter 
    ServerStats *stats.ServerStats 
    LeaderStats *stats.LeaderStats 
    ErrorC chan error

    streamRt   http.RoundTripper 
    pipelineRt http.RoundTripper 
    mu      sync.RWMutex 
    remotes map[types.ID]*remote

    peers   map[types.ID]Peer 
    prober probing.Prober 
}

Transport是Transporter的一个具体实现,应用程序启动时应该初始化一个该实例接下来并使用该实例进行网络数据传输。

该结构中有几个比较关键的成员:

  • Raft:Raft状态机,网络传输模块仅仅负责协议消息的收发,依赖状态机处理协议消息
  • peers:集群其他节点信息,因为要与其他节点进行通信,必须要维护其他节点信息
  • remotes:暂时不理解其与peers有什么区别

接下来会详细描述其关联数据结构。

Raft

type Raft interface {
    Process(ctx context.Context, m raftpb.Message) error
    IsIDRemoved(id uint64) bool
    ReportUnreachable(id uint64)
    ReportSnapshot(id uint64, status raft.SnapshotStatus)
}

Raft是对Raft状态机的抽象,之所以需要这样一个接口是因为对于网络传输模块来说,如何处理接收到的Raft协议消息是个问题,有了这个状态机,网络传输模块只消将协议消息交给Raft模块即可,比如直接调用Raft.Process。

当然,该状态机需要由应用去实现,如我们前面描述的raft协议的示例程序就实现了该接口。

Peer & peer

type Peer interface {
    send(m raftpb.Message)
    sendSnap(m snap.Message)
    update(urls types.URLs)
    attachOutgoingConn(conn *outgoingConn)
    activeSince() time.Time 
    stop()
}

type peer struct {
    id types.ID 
    r  Raft 

    status *peerStatus
    picker *urlPicker

    msgAppV2Writer *streamWriter
    writer         *streamWriter
    pipeline       *pipeline
    snapSender     *snapshotSender
    msgAppV2Reader *streamReader
    msgAppReader   *streamReader

    recvc chan raftpb.Message 
    propc chan raftpb.Message 
    mu     sync.Mutex 
    paused bool

    cancel context.CancelFunc 
    stopc  chan struct{}
}

Peer抽象了本节点与之通信的集群对等节点,而peer则是Peer的一个具体实现。

peer结构中最关键的是与对端节点进行通信各种通道,如上,包括了pipeline、streamWriter等消息传输方式,目前暂时还不理解为什么要有如此多的消息传输方式,可能是针对不同的消息类型采用相应的传输方式,效率更高。

对于当前节点来说,集群中的每个其他节点在本地都会有一个peer与之对应,peer负责将消息发送至对端节点,接收对端节点消息。

关键流程

消息发送

当应用需要向raft集群的其他节点发送消息时,调用的是Transpotrer.Send()方法,该方法实现如下:

func (t *Transport) Send(msgs []raftpb.Message) {
    for _, m := range msgs {
        if m.To == 0 {
            continue
        }
        to := types.ID(m.To)
        t.mu.RLock()
        p, pok := t.peers[to]
        g, rok := t.remotes[to]
        t.mu.RUnlock()
        if pok {
            if m.Type == raftpb.MsgApp {
               t.ServerStats.SendAppendReq(m.Size())
            }
            p.send(m)
            continue
        }
        if rok {
            g.send(m)
            continue
        }
    }
}

这里的过程还是很简单的,主要就是找到该消息要发往哪个节点peer,并调用peer的send()方法。

// 根据消息类型选择合适的发送通道 
// 例如,snapshot消息选择pipeline发送 
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
    var ok bool 
    if isMsgSnap(m) {
        return p.pipeline.msgc, pipelineMsg
    } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
        return writec, streamAppV2
    } else if writec, ok = p.writer.writec(); ok {
        return writec, streamMsg
    }
    return p.pipeline.msgc, pipelineMsg
}

func (p *peer) send(m raftpb.Message) {
    p.mu.Lock()
    paused := p.paused
    p.mu.Unlock()

    if paused {
        return 
    }
    writec, name := p.pick(m)
    select {
    case writec <- m:
    default:
        p.r.ReportUnreachable(m.To)
        if isMsgSnap(m) {
            p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
        }
    }
}

peer的发送复杂在于需要根据不同的消息类型选择不同的通道,接下来就是将消息塞进该通道的管道writec中,接下来管道的接收者会从管道的另一端取出消息,再行发送。

以pipeline发送通道为例说明,pipeline消息发送通道在启动时会创建若干后台协程监听消息管道,如下:

func (p *pipeline) start() {
    p.stopc = make(chan struct{})
    p.msgc = make(chan raftpb.Message, pipelineBufSize)
    p.wg.Add(connPerPipeline)
    for i := 0; i < connPerPipeline; i++ {
        go p.handle()
    }
}

func (p *pipeline) handle() {
    defer p.wg.Done()
    for {
        select {
        case m := <-p.msgc:
            start := time.Now()
            err := p.post(pbutil.MustMarshal(&m))
            end := time.Now()
            if err != nil {
                ......
            }
        ....
    }
}    

这里就很明白了,handle()方法中会从管道p.msgc获取消息,并调用post()方法将消息发送给对端。

stream消息发送通道处理流程大体也是如此,不再赘述。

消息接收

etcd-raft使用http协议在集群节点之间进行消息传输。应用在启动时需要显示地启动http服务,但是处理的handler是由Transporter提供,以etcd-raft自带的示例应用为例:

func (rc *raftNode) serveRaft() {
    ......
    err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
    ......
}

Handler的实现如下:
func (t *Transport) Handler() http.Handler {
    pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
    streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
    snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
    mux := http.NewServeMux()
    mux.Handle(RaftPrefix, pipelineHandler)
    mux.Handle(RaftStreamPrefix+"/", streamHandler)
    mux.Handle(RaftSnapshotPrefix, snapHandler)
    mux.Handle(ProbingPrefix, probing.NewHandler())
    return mux
}

网络消息传输模块对于不同的url,定义了不同的处理方法。我们同样以pipelineHandler为例说明:

func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method != "POST" {
        w.Header().Set("Allow", "POST")
        http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
        return
    }
    w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
    // 请求数据读取以及检查
    ......
    if err := h.r.Process(context.TODO(), m); err != nil {
        ......
    }
    w.WriteHeader(http.StatusNoContent)
}

逻辑也很简单,接收数据并解码raft协议消息,最终调用Raft.Process()来处理消息,而这是由应用实现的,一般来说,Raft.Process()实现也很简单,将该消息简单地交给更底层的raft协议状态机处理即可,关于这个,我们会在接下来的其他部分进行更为详细的说明。

转载请注明:53IT » etcd-raft网络传输组件实现分析