研究Fabric中Etcd的Raft应用
先看一下etcd raft library
中的rafte示例结构():
从图中可以看出,Etcd的raft示例的大致流程:
- 首先需要启动节点RaftNode
- 应用层,通过proprseC 和 confChangeC 通知RaftNode新的提案或者配置变更。
- 每一个RaftNode都会启动一个独立的后台goroutine来完成回放WAL日志、启动网络组件等初始化操作。
- 通过onMessage接收来自核心引擎的提案
- 处理提案,并推送RaftNode:
raftNode.Propose()
- 等待Raft状态机ready
- 收到ready时存储wal和触发snapshot,向从节点sendMessage并
raftNode.Advance()
通知应用层已经保存进度到最后一个Ready - punishEntry并提交区块到存储
- 从节点收到message后,推进状态机。
raftNode.Step()
Fabric的共识服务设计成了可插拔的模块,以此满足了根据不同应用场景切换不同共识选项的需求。在Hyperledger Fabric最新版本中,Fabric系统的共识模块中实现了三种共识算法,其中包括Solo,Kafka以及Raft算法。官方推荐的是使用Raft共识算法,但是为了更好地理解Fabric中的共识模块,我们也简单介绍一下Solo和Kafka这两种共识算法。
- solo共识:假设网络环境中只有一个排序节点,从Peer节点发送来的消息由一个排序节点进行排序和产生区块。由于排序服务只有一个排序节点为所有的peer节点服务,虽然可以肯定保证顺序一致性,但是没有高可用性和可扩展性,所以不适合用于生产环境,只能用于开发和测试环境。
- Kafka共识:Kafka是一个分布式的流式信息处理平台,目标是为实时数据提供统一的、高吞吐、低延迟的性能。Hyperledger Fabric之前版本的核心共识算法通过Kafka集群实现,简单来说,就是通过Kafka对所有交易信息进行排序(如果系统存在多个通道,则对每个通道分别排序)。
- Raft共识:Raft是Hyperledger Fabric在1.4.1版本中引入的,它是一种基于 etcd 的崩溃容错(CFT)排序服务。Raft 遵循 "领导者和追随者" 模型,其中领导者在通道中的排序节点之间动态选出(这个节点集合称为"consenter set"),该领导者将消息复制到跟随者节点。Raft保证即使在小部分(≤ (N-1)/2)节点故障的情况下,系统仍然能正常对外提供服务,所以Raft被称为"崩溃容错"。
其实,Hyperledger Fabric在1.4.1版本以前,它的核心共识算法通过Kafka集群实现,但是在1.4.1版本之后,Fabric推荐使用Raft算法实现节点的共识。其实从提供服务的视角来看,基于Raft和Kafka的排序服务是类似的,他们都是基于CFT(crash fault tolerant)模型的排序服务,并且都使用了主从节点的设置。但是为什么Hyperledger Fabric选择Raft算法呢?我们列举了Raft相较于Kafka所展现出的优势来回答这个问题。
- 第一点,Raft 更容易设置。虽然 Kafka 有很多崇拜者,但即使是那些崇拜者也(通常)会承认部署 Kafka 集群及其所必须的 ZooKeeper 集群会很棘手,需要在 Kafka 基础设施和设置方面拥有高水平的专业知识。此外,使用 Kafka 管理的组件比使用 Raft 管理的组件多,Kafka 有自己的版本,必须与排序节点协调。而使用 Raft,所有内容都会嵌入到排序节点中。
- 第二点,Kafka和zookeeper的设计不适用于大型网络。它们的设计是CFT模型,但局限于运行在比较紧密的主机上。也就是说,需要有一个组织专门运行Kafka集群。鉴于此,当有多个组织使用基于Kafka排序服务的时候,其实没有实现去中心化,因为所有的节点连接的都是由一个组织单独控制的Kafka集群。如果使用Raft算法,每个组织可以贡献排序节点,共同组成排序服务,可以更好的去中心化。
- 第三点,Raft是原生支持的,而Kafka需要经过复杂的步骤部署,并且需要单独学习成本。而且Kafka和Zookeeper的支持相关的issue要通过apache来处理,而不是Hyperledger Fabric。Raft的实现是包含在Fabric社区的,开发支持更加便利。
- 第四点,Raft 是向开发拜占庭容错(BFT)排序服务迈出的第一步。正如我们将看到的,Fabric 开发中的一些决策是由这个驱动的。Fabric使用Raft共识算法是向BFT类算法过渡的步骤。
Hyperledger Fabric对Raft算法的核心实现代码都是放在fabric/orderer/consensus/etcdraft
包下的,这里主要包含几个核心的数据结构.
- Chain接口
- Chain结构体
- node结构体
首先,Chain接口的定义在fabric/orderer/consensus/etcdraft/consensus.go
文件下,它主要定义了排序节点对接收到的客户端发送来的消息的处理操作,它的详细定义如下:
// Chain defines a way to inject messages for ordering.
type Chain interface {
// 负责对普通交易消息进行处理排序。当排序服务在 broadCast 接口收到消息进行校验和过滤之后,就交由对应 Chain 实例进行处理。
Order(env *cb.Envelope, configSeq uint64) error
Configure(config *cb.Envelope, configSeq uint64) error
WaitReady() error
Errored() <-chan struct{}
// Start()负责启动此 Chain 服务。
Start()
Halt()
}
其次,Chain结构体实现了Chain接口,它里面主要定义了一些通道(channel)用于节点间的通信,以便根据通信消息做相应的操作。
// Chain implements consensus.Chain interface.
type Chain struct {
configurator Configurator
rpc RPC // 节点与外部节点进行通信的对象,RPC 是一个接口,包含两个方法SendConsensus 和 SendSubmit。前面这种用于节点间 raft 信息的通讯,后者用于转发交易请求给 leader 节点。
raftID uint64
channelID string
lastKNownleader uint64
ActiveNodes atomic.Value
submitC chan *submit // 接收 Orderer 客户端提交的共识请求消息的通道
applyC chan apply // 接收 raft 节点间应用消息的通道
observeC chan<- raft.softState
haltC chan struct{}
doneC chan struct{}
startC chan struct{}
snapC chan *raftpb.Snapshot //接收 raft 节点快照数据的通道
gcC chan *gc
…
Node *node // 封装了底层 raft 库的节点实例
…
}
最后,node结构体主要用于将Fabric自己实现的Raft上层应用和etcd的底层Raft实现连接起来,可以说node结构体是它们之间通信的桥梁,正是它的存在屏蔽了Raft实现的细节。
type node struct {
chainID string
logger *flogging.FabricLogger
metrics *Metrics
unreachableLock sync.RWMutex
unreachable map[uint64]struct{}
tracker *Tracker
storage *RaftStorage
config *raft.Config
rpc RPC
chain *Chain // 前面定义的Fabric自己实现的Chain结构体
tickInterval time.Duration
clock clock.Clock
Metadata *etcdraft.BlockMetadata
subscriberC chan chan uint64
raft.Node // etcd底层的Raft中的节点接口
}
Raft启动
Raft的启动入口位于fabric/orderer/consensus/etcdraft/chain.go
文件中,在Chain的Start()方法中会启动etcdraft/node.go
中的node.start()
,而node.start()方法中进而启动etcd已经封装好的raft.StartNode()方法。
// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
...
//启动Node节点
c.Node.start(c.fresh, isJoin)
close(c.startC)
close(c.errorC)
go c.gc() //从MemoryStorage中获取快照,并将其持久化到wal和磁盘。
go c.run()
es := c.newevictionSUSPECTor()
interval := DefaultleaderlessCheckInterval
if c.opts.leaderCheckInterval != 0 {
interval = c.opts.leaderCheckInterval
}
c.periodicChecker = &PeriodicCheck{
Logger: c.logger,
Report: es.confirmSuspicion,
ReportCleared: es.clearSuspicion,
CheckInterval: interval,
Condition: c.SUSPECTeviction,
}
c.periodicChecker.Run()
}
Chain中的Start方法主要完成了启动etcdraft.Node端的循环来初始化Raft集群节点。而且Chain里面通过调用c.run()实现了通过循环处理客户端和Raft底层发送的消息。
我们再来看etcdraft.Node端的Start方法,它作为Chain端和raft/node端的桥梁,会根据Chain中传递的元数据配置信息获取启动Raft节点的ID信息,并且调用底层的Raft.StartNode方法启动节点,并且像Chain端中一样会启动n.run()来循环处理消息。
func (n *node) start(fresh, join bool) {
…
var campaign bool
if fresh {// 是否是新节点标记位
if join {// 是否是新加入节点标记位
raftPeers = nil
n.logger.Info("Starting raft node to join an existing channel")
} else {
n.logger.Info("Starting raft node as part of a new channel")
sha := sha256.Sum256([]byte(n.chainID))
number, _ := proto.DecodeVarint(sha[24:])
if n.config.ID == number%uint64(len(raftPeers))+1 {
campaign = true
}
}
// 调用raft/node中的启动节点函数,初始化raft
n.Node = raft.StartNode(n.config, raftPeers)
} else {
n.logger.Info("Restarting raft node")
n.Node = raft.RestartNode(n.config)
}
n.subscriberC = make(chan chan uint64)
// run方法中会启动一个循环用来接收raft节点发来的消息,在这里经过进一步处理后,转发给Chain层进行处理,消息的转发机制都是通过通道来完成的。
go n.run(campaign)
}
最后,在etcdraft/node中启动的raft.StartNode()表示进一步启动了Raft底层的Node节点,在这里会进行Raft的初始化,读取配置启动各个节点以及初始化logindex等。与前面的启动流程一样,它同样会开启一个run方法以循环的方法不断监听各通道的信息来实现状态的切换和做出相应的动作。
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer) Node {
if len(peers) == 0 {
panic("no peers given; use RestartNode instead")
}
rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
rn.Bootstrap(peers)
n := newNode(rn)
go n.run()
return &n
}
chain.run
里面定义了,成为主节点后,监听chan *common.Block
通道的proposal信息,并将proposal信息提交给etcd的Raft.Propose()
func (c *Chain) run() {
...
becomeleader := func() (chan<- *common.Block, context.CancelFunc) {
ch := make(chan *common.Block, c.opts.MaxInflightBlocks)
go func(ctx context.Context, ch <-chan *common.Block) {
for {
select {
case b := <-ch: //监听通道,接受Proposal的区块信息
data := protoutil.MarshalOrPanic(b)
if err := c.Node.Propose(ctx, data); err != nil {
c.logger.Errorf("Failed to propose block [%d] to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
return
}
c.logger.Debugf("Proposed block [%d] to raft consensus", b.Header.Number)
case <-ctx.Done():
c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
return
}
}
}(ctx, ch)
}
...
}
n.run()
Raft的业务层处理逻辑,和chainmaker类似。主要是对etcd的raft状态机吐出的ready的处理。
func (n *node) run(campaign bool) {
for {
select {
case <-raftTicker.C():
// grab raft Status before ticking it, so `RecentActive` attributes
// are not reset yet.
status := n.Status()
n.Tick()
n.tracker.Check(&status)
case rd := <-n.Ready():
startStoring := n.clock.Now()
// 第一步,存wal
if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
}
duration := n.clock.Since(startStoring).Seconds()
n.metrics.DataPersistDuration.Observe(float64(duration))
if duration > halfElectionTimeout {
n.logger.Warningf("WAL sync took %v seconds and the network is configured to start elections after %v seconds. Your disk is too slow and may cause loss of quorum and trigger leadership election.", duration, electionTimeout)
}
if !raft.IsEmptySnap(rd.Snapshot) {
n.chain.snapC <- &rd.Snapshot
}
if notifyleaderChangeC != nil && rd.softState != nil {
if l := atomic.LoadUint64(&rd.softState.Lead); l != raft.None {
select {
case notifyleaderChangeC <- l:
default:
}
notifyleaderChangeC = nil
}
}
// skip empty apply
if len(rd.CommittedEntries) != 0 || rd.softState != nil {
n.chain.applyC <- apply{rd.CommittedEntries, rd.softState}
}
if campaign && rd.softState != nil {
leader := atomic.LoadUint64(&rd.softState.Lead) // etcdraft requires atomic access to this var
if leader != raft.None {
n.logger.Infof("leader %d is present, quit campaign", leader)
campaign = false
close(elected)
}
}
n.Advance()
// Todo(jay_guo) leader can write to disk in parallel with replicating
// to the followers and them writing to their disks. Check 10.2.1 in thesis
n.send(rd.Messages)
case notifyleaderChangeC = <-n.subscriberC:
case <-n.chain.haltC:
raftTicker.Stop()
n.Stop()
n.storage.Close()
n.logger.Infof("Raft node stopped")
close(n.chain.doneC) // close after all the artifacts are closed
return
}
}
}
Fabric Raft机制的交易处理流程 1. 提交提案
首先,客户端将会把已经背书的交易提案以broadcast请求的形式转发给Raft集群的leader进行处理。我们在第二节中也提到了,Fabric中的交易可以分为两类,一类是普通交易,另一类是部署交易(也叫做配置交易)。这两类请求将分别调用不同的函数,即Order和Configure函数来完成交易提案的提交。
// Order submits normal type transactions for ordering.
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
c.Metrics.normalProposalsReceived.Add(1)
return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}
// Configure submits config type transactions for ordering.
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
c.Metrics.ConfigProposalsReceived.Add(1)
return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}
2. 转发交易提案到leader
我们从上面的源代码中可以注意到,不论是何种交易类型,里面都会调用Submit方法来提交交易提案。在Submit方法中,主要做的事就是将请求消息封装为结构体并且写入指定的一个通道中(submitC)以便传递给Chain进行处理。此外,它还会判断当前节点是否是leader,如果不是,还会将消息重定向给leader节点。
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
…
leadC := make(chan uint64, 1)
select {
case c.submitC <- &submit{req, leadC}: // 将消息封装并且写入submitC通道
lead := <-leadC
if lead == raft.None {
c.Metrics.ProposalFailures.Add(1)
return errors.Errorf("no Raft leader")
}
if lead != c.raftID { // 当前节点不是leader,则转发消息给leader
if err := c.forwardToleader(lead, req); err != nil {
return err
}
}
…
return nil
}
3. 对交易排序
Chain端从submitC通道中将不断接收交易并将它们进行排序处理。
在ordered方法中,将根据不同类型的消息执行不同的排序操作。对于接收到是通道配置消息,比如通道创建、通道配置更新等。先调用ConsensusSupport对配置消息进行检查和应用,然后直接调用 BlockCutter.Cut() 对报文进行切块,这是因为配置信息都是单独成块;而对于普通交易消息,则直接校验之后,调用 BlockCutter.Ordered() 进入缓存排序,并根据出块规则决定是否出块。
func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]* common.Envelope, pending bool, err error) {
if c.isConfig(msg.Payload) {
// 配置消息
…
batch := c.support.BlockCutter().Cut()
batches = [][]*common.Envelope{}
if len(batch) != 0 {
batches = append(batches, batch)
}
batches = append(batches, []*common.Envelope{msg.Payload})
return batches, false, nil
}
// 普通交易信息
if msg.LastValidationSeq < seq {
…
}
batches, pending = c.support.BlockCutter().Ordered(msg.Payload)
return batches, pending, nil
}
4. 打包区块
交易消息经c.ordered处理之后,会得到由BlockCutter返回的数据包bathches(可打包成块的数据)和缓存是否还有数据的信息。如果缓存还有余留数据未出块,则启动计时器,否则重置计时器,这里的计时器由case timer.C处理。
接下来,将会调用propose方法来打包交易为区块。propose会根据batches数据包调用createNextBlock打包出block ,并将block传递给c.ch通道(只有leader具有propose的权限)。而如果当前交易是配置信息,还需要标记处当前正在进行配置更新的状态。
func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
for _, batch := range batches {
b := bc.createNextBlock(batch) // 根据当前批次创建一个区块
c.logger.Infof("Created block [%d], there are %d blocks in flight", b.Header.Number, c.blockInflight)
select {
case ch <- b: // 将block传递给c.ch通道,leader可以通过这个通道收到这个区块
default:
c.logger.Panic("Programming error: limit of in-flight blocks does not properly take effect or block is proposed by follower")
}
// if it is config block, then we should wait for the commit of the block
if protoutil.IsConfigBlock(b) {
c.configInflight = true
}
c.blockInflight++
}
}
5. Raft对区块的共识
leader将会前面说的区块通过调用c.Node.Propose将数据传递给底层Raft状态机。这里的Propose就是提议将数据写入到各节点的日志中,这里也是实现节点间共识的入口方法。
Propose就是将日志广播出去,要所有节点都尽量保存起来,但还没有提交,等到leader收到半数以上的节点都响应说已经保存完了,leader这时就可以提交了,下一次Ready的时候就会带上committedindex。
func (n *node) Propose(ctx context.Context, data []byte) error {
return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
6. 保存区块
经过Raft共识后,节点需要将区块写入到本地,这里Raft底层会通过通道的方式传递保存区块到本地的消息(即CommittedEntries不为空的消息)。在这里,Fabric通过实现apply方法完成了保存区块的功能。
func (c *Chain) apply(ents []raftpb.Entry) {
…
for i := range ents {
switch ents[i].Type {
case raftpb.Entrynormal:// 如果是普通entry消息
…
block := protoutil.UnmarshalBlockOrPanic(ents[i].Data)
c.writeBlock(block, ents[i].Index) // 写入区块到本地
c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number))
case raftpb.EntryConfChange:// 如果是配置entry消息
var cc raftpb.ConfChange
if err := cc.Unmarshal(ents[i].Data); err != nil {
c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
continue
}
c.confState = *c.Node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
c.logger.Infof("Applied config change to add node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
case raftpb.ConfChangeRemoveNode:
c.logger.Infof("Applied config change to remove node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
default:
c.logger.Panic("Programming error, encountered unsupported raft config change")
}
…
if ents[i].Index > c.appliedindex {
c.appliedindex = ents[i].Index
}
}
}
原文地址:https://cloud.tencent.com/developer/article/2065530