单机版区块链的实现

全部代码都已开源在git上 hyblockchain

前置知识

开始之前,首先要了解一些基础知识,这里简单的介绍一下

区块链的结构

区块(Block)包含:
区块头(Block Header):包含前一个区块哈希、Merkle 根、时间戳、随机数等
交易列表(Transactions):每个区块中打包的所有交易
链(Chain):每个区块通过 previousHash 指向前一个区块形成链

哈希函数

作用:确保数据完整性、防篡改
常用:SHA256(Bitcoin)、Keccak256(Ethereum)
本实验使用的是SHA256

交易系统

  1. 交易结构(Transaction)

    • 发送方地址、接收方地址、金额、签名(可选)
    • 交易池(Mempool):待打包的交易集合
  2. 简单账户模型或UTXO模型

    • 账户模型(如 Ethereum):记录每个地址的余额
    • UTXO 模型(如 Bitcoin):每个未花费输出作为新交易输入

状态树 MPT

使用的是key,value来实现

叶子节点:实际存放值的节点
分支节点:有16个子节点,用来指向多条路径
扩展节点:存储公共前缀,它的下一节点只能是分支节点

数据的持久化

如LevelDB、BoltDB 等

  • 区块数据、交易池、状态等写入数据库
  • 简单项目可以先用文件系统或内存

数据库的实现

使用LevelDB数据库,由Google LevelDB 的GO版本实现 github.com/syndtr/goleveldb/leveldb,用LevelDB封装模块,实现自定义的接口kvstore.KVStore

leveldb 是 kvstore 接口的一种具体实现,为了解耦,适应不同的后端,如:Redis,InMemoryDB等

首先定义kvstore接口

package kvstore

import (
"io"
)

// KVStore 定义了键值存储的接口
type KVStore interface {
// 基本操作
Get(key []byte) ([]byte, error)
Put(key []byte, value []byte) error
Delete(key []byte) error
Has(key []byte) (bool, error)

// 批量操作
Batch() Batch
Write(batch Batch) error

// 迭代器
NewIterator(prefix []byte) Iterator

// 关闭存储
io.Closer
}

// Batch 定义了批量操作的接口
type Batch interface {
Put(key []byte, value []byte)
Delete(key []byte)
Reset()
Len() int
}

// Iterator 定义了迭代器的接口
type Iterator interface {
Next() bool
Key() []byte
Value() []byte
Error() error
Release()
}

用LeveLDB进行封装(实现里面的接口方法)


package leveldb

import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util"
"hyblockchain/kvstore"
)

// LevelDB 实现了KVStore接口
type LevelDB struct {
db *leveldb.DB
}

// NewLevelDB 创建一个新的LevelDB实例
func NewLevelDB(path string) (kvstore.KVStore, error) {
db, err := leveldb.OpenFile(path, nil)
if err != nil {
return nil, err
}
return &LevelDB{db: db}, nil
}

// Get 获取指定键的值
func (l *LevelDB) Get(key []byte) ([]byte, error) {
return l.db.Get(key, nil)
}

// Put 存储键值对
func (l *LevelDB) Put(key, value []byte) error {
return l.db.Put(key, value, nil)
}

// Delete 删除指定键
func (l *LevelDB) Delete(key []byte) error {
return l.db.Delete(key, nil)
}

// Has 检查键是否存在
func (l *LevelDB) Has(key []byte) (bool, error) {
return l.db.Has(key, nil)
}

// Batch 创建新的批量操作
func (l *LevelDB) Batch() kvstore.Batch {
return &levelDBBatch{
batch: new(leveldb.Batch),
}
}

// Write 执行批量操作
func (l *LevelDB) Write(batch kvstore.Batch) error {
b, ok := batch.(*levelDBBatch)
if !ok {
return nil
}
return l.db.Write(b.batch, nil)
}

// NewIterator 创建新的迭代器
func (l *LevelDB) NewIterator(prefix []byte) kvstore.Iterator {
iter := l.db.NewIterator(util.BytesPrefix(prefix), nil)
return &levelDBIterator{iter: iter}
}

// Close 关闭数据库连接
func (l *LevelDB) Close() error {
return l.db.Close()
}

// levelDBBatch 实现了Batch接口
type levelDBBatch struct {
batch *leveldb.Batch
}

func (b *levelDBBatch) Put(key, value []byte) {
b.batch.Put(key, value)
}

func (b *levelDBBatch) Delete(key []byte) {
b.batch.Delete(key)
}

func (b *levelDBBatch) Reset() {
b.batch.Reset()
}

func (b *levelDBBatch) Len() int {
return b.batch.Len()
}

// levelDBIterator 实现了Iterator接口
type levelDBIterator struct {
iter iterator.Iterator
}

func (i *levelDBIterator) Next() bool {
return i.iter.Next()
}

func (i *levelDBIterator) Key() []byte {
return i.iter.Key()
}

func (i *levelDBIterator) Value() []byte {
return i.iter.Value()
}

func (i *levelDBIterator) Error() error {
return i.iter.Error()
}

func (i *levelDBIterator) Release() {
i.iter.Release()
}

MPT,储存状态

mpt是默克尔树,用来存储状态,它的结构分为:叶子节点,分支节点,扩展节点,使用key,value对来实现

叶子节点,扩展节点,分支节点

开始实现mpt的时候,首先要定义好它的三种节点,一个node节点是由一个结构体实现的,那么这个结构体包含哪些呢?

首先肯定有节点类型(nodetype),key,value(仅叶子节点使用)
那么考虑到分支节点是拥有16个子节点的数组(为什么分支节点是16子节点,可以在niblle篇章找到答案),所以还得有个16长度大小的数组,同时这个当数组索引为零时,就可以给扩展节点使用作为它的下一个节点。

type NodeType byte

const (
BranchNode NodeType = 0
ExtensionNode NodeType = 1
LeafNode NodeType = 2
)

// Node 表示MPT中的一个节点
type Node struct {
Type NodeType // 节点类型
Key []byte // 压缩路径(用于扩展/叶子节点)
Value []byte // 存储值(仅叶子节点用)
Children [16]*Node // 子节点数组(仅分支节点使用)
Hash []byte // 当前节点的哈希(默认为 nil,需外部生成)
}

接下来还要实现创建这三种节点的函数,根据结构体的内容,很好实现的,比如创建一个分支节点

// NewBranchNode 创建一个新的分支节点
func NewBranchNode() *Node {
return &Node{
Type: BranchNode,
Children: [16]*Node{},
}
}

序列化

从上面可知,我们的节点是结构体,那又怎样实现key value呢,通过序列化来实现。

序列化:就是把一个结构体(如一个节点 Node)转成一串 字节([]byte),可以存入数据库或通过网络传输。

func (m *MPT) serializeNode(n *Node) []byte

它做的就是把不同类型的节点(Leaf / Extension / Branch)变成字节流,以便后续:

  • 写入 LevelDB(db.Put(hash, bytes))
  • 或用于计算哈希(hashNode())

叶子节点的序列化:

case LeafNode:
buf.Write(n.Key) // 写入 Key(注意是 nibbles 格式)
buf.Write(n.Value) // 写入 Value

格式:[Type=2] [Key…] [Value…]

扩展节点的序列化:

case ExtensionNode:
buf.Write(n.Key) // 路由部分 key(是 nibble 数组)
buf.Write(n.Children[0].Hash) // 连接的下一个节点的哈希

格式:[Type=1] [Key…] [Child.Hash](32 字节)

Extension 节点只保存路径和“下一跳”的指针(哈希)

分支节点的序列化:

case BranchNode:
for _, child := range n.Children {
if child != nil {
buf.Write(child.Hash) // 写入子节点的哈希
} else {
buf.Write(make([]byte, 32)) // 空位写入32个 0 占位
}
}
if n.Value != nil {
buf.Write(n.Value) // 如果该分支节点自身存了值(如 key 正好在这里结束),也写入
}

格式:
[Type=0]
[Child0.Hash (32字节)]
[Child1.Hash (32字节)]

[Child15.Hash (32字节)]
[Value(可选)]

serializeNode() 就是把节点对象 ➜ 转成固定格式的 byte slice,便于存储与计算哈希。

不同类型节点序列化格式不同,必须带类型标识,以便未来可以反序列化。

niblle

一个byte是8位,所以一个分支节点就要有256个子节点,很浪费内存,而我们使用nibble代表半个字节(4位),刚好是16个子节点,正好与我们设计的分支节点有16个子节点对应

每一层的分支就是根据下一个 nibble 值来决定走哪个方向。

举例来说:

  • key := []byte(“a”)
  • “a” 的 ASCII 是 0x61,二进制是 01100001
  • 第一个nibble是0110=6,第二个nibble是0001=1
  • nibbles = [6, 1] // 因为 0x61 的高 4 位是 6,低 4 位是 1
func bytesToNibbles(b []byte) []byte {
nibbles := make([]byte, len(b)*2)
for i, v := range b {
nibbles[i*2] = v >> 4 // 高四位
nibbles[i*2+1] = v & 0x0f // 低四位
}
return nibbles
}
//代码解释
nibbles[i*2] = v >> 4 // 0110 0001 >> 4 = 0000 0110 = 0x06 = 6
nibbles[i*2 + 1] = v & 0x0f // 0110 0001 & 0000 1111 = 0000 0001 = 0x01 = 1

计算公共前缀

找出两个字节数组(通常是两个 key 路径)前面有多少个字节是相同的,即它们的“最长公共前缀”有多长,我们构建一个树,有共同前缀的就用扩展节点存储,也是为了方便查找插入与节点的公共前缀.

func commonPrefix(a, b []byte) int {
i := 0
for i < len(a) && i < len(b) && a[i] == b[i] {
i++
}
return i
}

以上就是mpt的核心辅助函数,接下来介绍核心操作了,mpt中的增删查改

get 从mpt中获取值

查找一个值,我们是从上往下的递归查找每次比较当前节点能否匹配 key 的前缀,然后继续递归,每次都会“缩短 key”,最终 key 用光后,如果节点匹配成功,就找到;否则失败

首先获取插入key的niblle数组,沿着 nibble 数组去穿越这棵树

// Get 从MPT中获取值
func (m *MPT) Get(key []byte) ([]byte, error) {
nibbles := bytesToNibbles(key)
return m.get(m.root, nibbles)
}

核心函数是get(),传入m.root(根节点进行递归查找),niblle我们要查询的路径
在get函数中判断节点类型(前置条件是传入的节点不为空)
匹配到叶子节点

  • 说明这是该路径的最后,就不做递归调用,直接判断是否相等
    • 相等就代表已找到,返回value.
    • 不相等返回错误信息
switch n.Type {
case LeafNode:
if bytes.Equal(n.Key, key) {
return n.Value, nil
}
return nil, errors.New("key not found")
}

匹配到扩展节点
扩展节点是存储公共路径的节点

  • 有俩种情况返回错误信息
    • key的长度小于n.key,说明接下来没有路径可以继续计较查找下去了
    • key与n.key的公共前缀不相等,说明此时没有与所查找的节点路径一致
  • 当以上俩种情况都不存在,则继续进行递归调用,匹配下一个节点
  • 此时get的参数有所改变。n.Childern[0]是该扩展节点的下一个孩子节点,key[len(n.Key):]则是继续匹配除去扩展节点的路径
case ExtensionNode:
if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) {
return nil, errors.New("key not found")
}
return m.get(n.Children[0], key[len(n.Key):])

匹配到分支节点
找key[]数组的第一位(key[0])与分支节点匹配,继续向下递归查找

  • 只要key还存在,就继续递归匹配,如果刚好匹配完了key的路径len(key)= 0,就可直接返回value,因为已经找到
  • 都不满足以上所述,就继续递归下去,接着找
  • 此时的get参数,n.Children[key[0]],沿着这个分支节点继续找,对应的我们的参数key[1:],也要去除分支节点即找到的key[0]
    • get(n, [2, 3, 5])
    • n.Type == BranchNode,那么key[0] == 2
    • 于是我们去 n.Children[2] 找
    • 调用下一层:get(n.Children[2], [3, 5])
case BranchNode:
if len(key) == 0 {
return n.Value, nil
}
return m.get(n.Children[key[0]], key[1:])

以上三种查找值,已经介绍完,接下来介绍mpt中delete

delete 从MPT中删除键值对

我们的删除逻辑,还是一样获取要删除的key的niblle数组,是从底向上查找,进行递归,这与get的查找逻辑是相反的
它不是向我们平常一样写代码,直接删除,而是通过return n, nil (并不意味着结构没变)递归地构建新的树结构,从底向上逐层返回。最终返回newRoot 是修改后树的新根。

func (m *MPT) Delete(key []byte) error {
nibbles := bytesToNibbles(key)
newRoot, err := m.delete(m.root, nibbles)
if err != nil {
return err
}
m.root = newRoot
return m.commit()
}

在delete的函数中,还是先要判断节点类型,从最简单的叶子节点开始梳理逻辑

匹配到是叶子节点
路径相等,返回nil,表示要被删除了

switch n.Type {
case LeafNode:
if bytes.Equal(n.Key, key) {
return nil, nil
}
return n, nil
}

匹配到是扩展节点

  • 首先看是否与扩展节点相匹配,如果不是,则原路返回

  • 再进行递归查找,调用 child, err := m.delete(n.Children[0], key[len(n.Key):])
    从前可知,扩展节点下一个节点必须是分支节点,所以在递归后,分支节点的逻辑非常关键,所以先来介绍一下
    匹配到分支节点
    无论时扩展节点递归到分支节点,还是直接匹配到分支分支节点,都必须考虑删除分支节点对应的叶子节点后,是否要考虑压缩,就比如,分支节点刚好有俩个有效叶子节点,删除其中一个后,剩下那个就要考虑与分支节点或者前面还有扩展节点进行一个压缩为一个叶子节点,其核心是路径压缩

  • 首先判断key是否被消耗完了

    • 如果 key 已经消费完(len(key)==0),说明当前分支节点对应的路径上有一个值,删除它(设为 nil)
    • 否则取出 key 的第一个 nibble(key[0]),递归去对应的子节点里继续删除,删除后将子节点更新
case BranchNode:
if len(key) == 0 {
n.Value = nil
} else {
child, err := m.delete(n.Children[key[0]], key[1:])
if err != nil {
return nil, err
}
n.Children[key[0]] = child
}
  • 统计子节点
    • 遍历16个子节点,数有多少个非空的(有效);
    • lastChildIndex 记录最后一个非空节点索引(方便后续压缩用)
nonNilChildren := 0
lastChildIndex := -1
for i, child := range n.Children {
if child != nil {
nonNilChildren++
lastChildIndex = i
}
}
  • 判断是否压缩
if nonNilChildren == 1 && n.Value == nil {
child := n.Children[lastChildIndex]
if child.Type == ExtensionNode {
return NewExtensionNode(append([]byte{byte(lastChildIndex)}, child.Key...), child.Children[0]), nil
}
return NewLeafNode(append([]byte{byte(lastChildIndex)}, child.Key...), child.Value), nil
}
  • 压缩的条件
    • 当前分支节点只剩下 1个有效子节点;
    • 当前分支节点没有自己的 value(即它不是路径终点);
  • 压缩实现
    • 新节点的路径是原来的子节点路径,前面加上该子节点所在分支索引 lastChildIndex(即4bit的路径片段);
    • 如果子节点是 ExtensionNode,就把它的 Key 合并起来,构造新的 ExtensionNode(路径更长,指向下一级);
      • BranchNode -> index 2 → ExtensionNode [3, 4]
      • 压缩后为ExtensionNode [2, 3, 4]
    • 如果子节点是 LeafNode,也类似,构造新的叶子节点,路径合并,值继承
      • BranchNode->index 2 → LeafNode [3, 4] → “Hello”
      • 压缩后为LeafNode [2, 3, 4] → “Hello”
  • 如果都不满足压缩条件,就直接返回当前节点

那么当匹配到分支节点的时候,递归调用后,有是如何呢,让我们继续看

匹配到扩展节点时,要进行递归调用child, err := m.delete(n.Children[0], key[len(n.Key):]) 此时会进入分支节点的逻辑,

  • 如果返回的child是扩展节点,就进行压缩为一个新的扩展节点
  • 如果返回的是分支节点或者叶子节点,说明不能压缩了,保留原结构
case ExtensionNode:
if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) {
return n, nil
}
child, err := m.delete(n.Children[0], key[len(n.Key):])
if err != nil {
return nil, err
}
if child == nil {
return nil, nil
}
if child.Type == ExtensionNode {
return NewExtensionNode(append(n.Key, child.Key...), child.Children[0]), nil
}
return NewExtensionNode(n.Key, child), nil

删除的情况我们都说完了,小结一下,从n.root开始向下查找要删除的节点,找到删除后,再递归返回时,看有没有特殊情况进行压缩,一直递归返回一个新的root,此时删除已经完成

insert 在MPT中插入或更新节点

这个是非常难的逻辑了,很复杂,但是理清逻辑就很好写了
有了之前的递归思想,其实插入的逻辑就是考虑的情况比较多而已,要把每一种的逻辑都要想好

对传入的key,进行转化为niblle数组,方便我们比较路径,最后还是返回的是一个新root,与之前查找删除的逻辑有些区别的是,它先要找到插入的位置,所以就必须使用到之前说介绍的计算公共前缀的方式

匹配到叶子节点
还是看它的公共前缀是否相等,相等就插入

case LeafNode:
common := commonPrefix(n.Key, key)
if common == len(n.Key) && common == len(key) {
n.Value = value
return n, nil
}

如果没有一个与之匹配的叶子节点,即common为零,就新建一个分支节点,分别把叶子节点和插入节点放入

branch := NewBranchNode()
if common == 0 {
if len(n.Key) > 0 {
branch.Children[n.Key[0]] = NewLeafNode(n.Key[1:], n.Value)
} else {
branch.Value = n.Value
}
if len(key) > 0 {
branch.Children[key[0]] = NewLeafNode(key[1:], value)
} else {
branch.Value = value
}
return branch, nil
}

如果有公共前缀,但是不等于len(n.key)或者len(key),就创建一个扩展节点+分支节点

extension := NewExtensionNode(key[:common], branch)
if common < len(n.Key) {
branch.Children[n.Key[common]] = NewLeafNode(n.Key[common+1:], n.Value)
} else {
branch.Value = n.Value
}
if common < len(key) {
branch.Children[key[common]] = NewLeafNode(key[common+1:], value)
} else {
branch.Value = value
}
return extension, nil

叶子节点是很好理解的了

匹配到是扩展节点

要考虑俩种情况,公共前缀正好匹配,公共前缀不匹配。

计算公共前缀,等于len(n.key),再进入递归,继续沿着key[common:]路径找

common := commonPrefix(n.Key, key)
if common == len(n.Key) {
child, err := m.insert(n.Children[0], key[common:], value)
if err != nil {
return nil, err
}
n.Children[0] = child
return n, nil
}

如果不等于len(n.key),以公共前缀作为一个新的扩展节点,剩下的放入建立的分支节点中
分支一:处理扩展节点的剩余节点(common < len(n.key))

if common < len(n.Key) {
child, err := m.insert(n.Children[0], n.Key[common+1:], n.Children[0].Value)
if err != nil {
return nil, err
}
branch.Children[n.Key[common]] = child
}
  • 原扩展路径中有剩余:从 n.Key[common:] 开始递归插入原有子节点

  • 注意:n.Key[common] 是分歧点的第一个字节,要作为 BranchNode 的索引使用

  • 剩下的路径(n.Key[common+1:])挂载到新插入的子节点中

解释为什么这里要递归旧路径剩余的部分:递归 insert,是为了保留原路径的“剩余部分”,并构建新的树结构。递归只是为了让逻辑统一,不必区分是 Leaf 还是 Extension

**分支二:**处理新插入的路径(common < len(key) )

if common < len(key) {
branch.Children[key[common]] = NewLeafNode(key[common+1:], value)
} else {
branch.Value = value
}
  • 如果新插入的 key 在分歧点后还有内容,就将其余的部分作为新 LeafNode 添加进分支中。

  • 如果新 key 恰好终止在分歧点,则直接挂值到 BranchNode.Value

特殊情况没有共同前缀,直接返回分支节点

if common == 0 {
return branch, nil
}

匹配到分支节点
匹配到了一个 BranchNode,说明插入路径之前已经有公共前缀了,当前路径开始要根据 nibble 分支继续走

如果插入的路径已经完全匹配当前的节点,直接就把值赋值上去

case BranchNode:
if len(key) == 0 {
n.Value = value
return n, nil
}

如果不完全匹配,再次进行递归insert

child, err := m.insert(n.Children[key[0]], key[1:], value)
if err != nil {
return nil, err
}
n.Children[key[0]] = child
return n, nil

当路径匹配到 BranchNode,继续根据 nibble 走对应子节点递归插入;如果路径正好结束,就把值挂在当前分支节点上
插入流程图

mpt的简要流程已完成,状态讲完了,接下来继续交易池。

交易池

简单介绍与一下设计的目标

  • 以太坊 TxPool 原理:区分 pendingqueued 交易。

    • pending:交易的 nonce 与链上账户的 nonce 连续(刚好下一笔),可以立刻准备打包。

    • queued:交易的 nonce 超前,前面的交易未提交或未确认,需要排队等待。

  • 交易需要 根据 GasPrice 优先排序,GasPrice 高的交易更优先被矿工打包。

  • 支持 交易替换(nonce 相同、GasPrice 更高的交易替换旧交易)。

  • 以方便快速查询、插入和排序为目标。

交易的结构

交易的结构有俩个组成部分:交易数据和交易签名

type Transaction struct {
txdata
signature
}

交易数据(tx.data) 结合注释看

以太坊中为什么交易数据没有from地址,是因为在以太坊上,发送者地址是通过对交易内容进行签名后,使用签名和交易数据恢复出来的

type txdata struct {
To Address // 目标地址
Nonce uint64 // 交易计数器(防重放)
Value uint64 // 转账金额(单位待定)
Gas uint64 // 交易燃料上限
GasPrice uint64 // 每单位燃料价格
Input []byte // 交易输入数据(合约调用数据)
}

签名(signature) 是基于secp256k1 曲线

type signature struct {
R, S *big.Int // ECDSA 签名的两个大整数参数
V uint8 // 恢复ID,标识公钥恢复的补充信息
}

基于刚刚的为什么以太坊没有from这个字段,是因为恢复出来的,那么就要实现一个函数,进行对from地址的恢复

func (tx Transaction) From() Address {
// 1. 编码交易数据
txdata := tx.txdata
toSign, err := rlp.EncodeToBytes(txdata)
if err != nil {
panic(fmt.Errorf("RLP encoding failed: %v", err))
}
fmt.Println("Encoded txdata:", hexutil.Encode(toSign))

// 2. 计算消息哈希
msg := sha3.Keccak256(toSign)

// 3. 构造签名 sig: R(32字节) || S(32字节) || V(1字节)
sig := make([]byte, 65)

rBytes := tx.R.Bytes()
sBytes := tx.S.Bytes()

// 注意:Bytes() 可能不足 32 字节,需要左边补 0
copy(sig[32-len(rBytes):32], rBytes)
copy(sig[64-len(sBytes):64], sBytes)
sig[64] = tx.V

// 4. 恢复公钥
pubKey, err := secp256k1.RecoverPubkey(msg, sig)
if err != nil {
panic(fmt.Errorf("recover pubkey failed: %v", err))
}

// 5. 计算地址(例如 keccak(pubkey)[12:])
return PubKeyToAddress(pubKey)
}

  • 对交易数据 txdataRLP 编码和 Keccak256 哈希

  • 将签名中的 R, S, V 正确拼接成一个 65 字节的 sig 数组

  • 调用 secp256k1.RecoverPubkey() 进行恢复

  • 返回正确的发送者地址(from

交易池的设计

设计一个txpool接口,想一下交易池中需要什么功能,首先是不是需要接受交易,弹出交易,其次要更新当前状态树的根哈希,去报交易池状态与链同步,交易池发出一个交易后,是不是还要进行一个广播,那么这个接口该怎么写呢

type TxPool interface {
NewTx(tx *types.Transaction) TxPool

Pop() *types.Transaction

SetStatRoot(h hash.Hash)

NotifyTxEvent(txs []*types.Transaction)
}
功能点 说明
📥 接收交易(NewTx) 根据 nonce/gasPrice 分类,存入队列
📤 弹出交易(Pop) 为区块打包模块提供优质交易
📊 状态同步(SetStatRoot) 绑定状态树根,便于切换状态版本
📢 通知事件(NotifyTxEvent) 广播交易给外部系统(如 P2P 网络)

我们可以通过新建一个DefaultPool来是实现交易池里详细的操作

txpool数据结构

我们都知道,交易池有俩种状态,pendingqueue,它们是通过交易的nonce值来判断进入哪一种状态,实现一下如果是只有一个地址里面的交易,我们是不是通过对nonce值的排序就很好确认是否该进入pending状态,但是通常与一个交易池里不会有只有一个地址的交易,这是时候如果还只通过判断nonce值来确认状态的话,就不合理了。有一个事实,应该大家都知道,矿工会优先打包gas费高的交易,那么是不是我们就可以设计一个列表,按照gas费大小来等待被打包。

比如有A,B俩种地址,它们分别有不同的交易(nonce值在前,gas费在后):

  • A1(2,9),A2(3,4),A3(6,9),B1(7,8),B2(2,11),B3(8,8)
    • 按照同种地址按nonce大小排序,不同地址按照gas费大小排序的话,那么此时的打包顺序:B1,A1,B3,A2 ,剩下的因为nonce值不能连续,进入queue

这时我们想一下,采用哪种结构去实现这个逻辑呢

首先queue的结构,用数组实现可以吗

为什么不可以————,数组的插入和查找,是不是都要历遍后才能知道,时间复杂度为O(n),效率低下。不适合交易量大的交易池

那么我们通过对mpt的学习,知道key,value,实际上是不是一个map映射,如果我们采用map映射的结构设计queue,单说查找插入效率是不是为O(1)。因为我们的queue储存的不同交易地址的交易列表,所以key为地址,value就为对应的交易列表

queued   map[types.Address][]*types.Transaction

再来想一下刚刚的例子。

B1,A1,B3,A2 ,待打包的交易,也就是pending,如果也用map结构设计,如:

pending   map[types.Address][]*types.Transaction

那我们排好的打包顺序是不是就不能就不能体现了,因为你还是一个地址对应的交易,我们要的B1,A1,B3,A2执行顺序就表示不出来,这个时候是不是就想,如果后面时排好的交易列表,那我们的key又如何设计呢,所以如果我们还用这个map结构设计pending的话,就不能很好的表示。

这个时候就想,如果就把pending设计为交易列表,不用map映射,就可以了吗?

又来考虑一个新的问题,pending为交易列表,当一个新的交易来时,如果判断它进入pending还是queue

nonce?,那么你又如何确定pending最后的一个交易的地址呢,就算它是同一地址,你比较nonce,如果交易的noncepending最后一个交易nonce还小,那么它就要插入到前面去,这是你又如何判断它的插入的位置呢,并且还是在不考虑gas的情况下,更不要说如何确定进入queue

这个时候,如果我们把相邻的相同地址用一个盒子框住,每次比较的时候就和盒子的最后一个交易比较,就比刚刚比较容易多了

例子:

  • A1(2,10),A2(3,9),A3(4,8),B1(7,8),B2(2,11),B3(8,9)

  • 排序后:B1,A1,A2,B3,A4

  • 此时把B1看成为一个盒子1,A1,A2看成一个盒子2,B3看成一个盒子3,A3看成为一个盒子4

  • 也就是说这个交易列表是一个有多个盒子数组的列表

type pendingTxs []SortedTxs

txs pendingTxs

那么一个盒子SortedTxs的接口如何设计呢

首先 ,盒子添加一个交易,除去一个交易,比较的是时候,是不是还要gas,nonce,所以先设计车呢这样的,后续需要的时候的再来添加

type SortedTxs interface {
GasPrice() uint64
Push(tx *types.Transaction)
Pop() *types.Transaction
Nonce() uint64
}

既然定义了接口,就要去实现
**GasPrice()**,返回该盒子的第一个交易的 gasPrice

func (sorted DefaultSortedTxs) GasPrice() uint64 {
first := sorted[0]
return first.GasPrice()
}

因为你在 放入的时候是按 gasPrice 从高到低排序的,所以第一个交易就是 gasPrice 最大的交易

而且在同一个地址的交易,由于nonce小优先被执行,所以你后面的交易gas如果设计的比它大也没有什么用,所以也是一个 gasPrice 从高到低排序

Push():向当前交易组添加一笔新的交易,然后按 gasPrice 从高到低重新排序。

func (sorted *DefaultSortedTxs) Push(tx *types.Transaction) {
*sorted = append(*sorted, tx)
sort.Slice(sorted, func(i, j int) bool {
return (*sorted)[i].GasPrice() > (*sorted)[j].GasPrice()
})
}

**Pop()**:弹出当前交易组中的gasPrice 最高的交易(第一个交易)


func (sorted *DefaultSortedTxs) Pop() *types.Transaction {
if len(*sorted) == 0 {
return nil
}
tx := (*sorted)[0]
*sorted = (*sorted)[1:]
return tx
}

Nonce():获取这组交易的最后一个交易的 Nonce,表示这组交易的“最后 nonce

func (sorted DefaultSortedTxs) Nonce() uint64 {
return sorted[len(sorted)-1].Nonce()
}

在比较 SortedTxs 组时,你可以根据nonce判断它是否是账户当前应该执行的下一笔交易。
也就是说这一组 SortedTxs 是该账户待打包交易中,最早的一组连续。

现在又来考虑,当一个交易来的时候,因为我们设计的交易列表的地址不同,是不是还是不好的快速查找插入哪个地址的盒子中,这个时候就要用到我们的map,设计一个pending,使用map结构,用地址对应每个盒子数组,当一个交易来的时候,就能快速找到要插入的地址,然后再在这个地址里的盒子数组里比较,是不是连续的nonce值,不是,就放入queue

这里我们使用指针,当pending结构更改后,我们的交易列表由于指针的影响,也会对应的更改。

pendings map[types.Address][]SortedTxs

由于之前新建一个DefaultPool来实现交易池里详细的操作,所以我们先实现它的结构

type DefaultSortedTxs []*types.Transaction

type DefaultPool struct {
StatDB statdb.StatDB
all map[hash.Hash]bool
txs pendingTxs
pendings map[types.Address][]SortedTxs
queued map[types.Address][]*types.Transaction
}

StatDB statdb.StatDB :交易池依赖的状态数据库

  • 这是一个接口,负责获取和更新账户的链上状态(比如 nonce)。

  • 在交易进池时,需要知道发送方地址当前的 nonce 才能判断交易是:

    • 有效(pending)

    • 未来的(queued)

    • 重复/过期(丢弃)

比如后续会使用到的

account := pool.StatDB.Load(tx.From())
if account.Nonce >= tx.Nonce() {
// nonce 太小 → 交易已处理或无效
return
}

all map[hash.Hash]bool:去重用哈希集合,避免重复交易

  • 存储所有已进入交易池的交易哈希(tx hash

  • 用于防止同一笔交易被重复加入

交易池实际操作

在之前我们定义了txpool的接口,也就是我们交易池实际的操作接口,知道交易池结构了,现在来一一实现

首先来看,交易池中处理新交易的核心逻辑NewTx()

NewTx:将一笔新的交易 tx 放入交易池中适当的位置(pending / queue / 替换)

逻辑:

  • nonce == 当前账户的nonce+1,可以连续执行,就放入pending
  • nonce > 当前账户的nonce+1,不可连续执行,放入queue
  • nonce <= 当前账户的nonce+1,这时就看交易的gas大小,如果新来的交易gas大,即发生替换
func (pool *DefaultPool) NewTx(tx *types.Transaction) {
account := pool.StatDB.Load(tx.From())
if account.Nonce >= tx.Nonce() {
return
}
nonce := account.Nonce
blks := pool.pendings[tx.From()]
if len(blks) > 0 {
last := blks[len(blks)-1]
nonce = last.Nonce()
}
if tx.Nonce() > nonce+1 {
pool.addQueue(tx)
} else if tx.Nonce() == nonce+1 {
//push
pool.pushpending(blks, tx)
} else {
//替换
pool.replacePending(blks, tx)
}

}

ok,此时,我们就实现pending,queue,替换的逻辑就可以了

pending

实际上就是把交易 tx 插入到 pending 区中,并更新全局交易列表 pool.txs,同时保持按 GasPrice 高优先顺序。

首先判断你要插入的blks盒子数组是否为空,如果为首次插入,则创建一个新的blk单个盒子,把交易放入,再把这个blk放入blks,更新pending,此时我们的交易列表txs也要更新,最后再排序。

我们在go语言中的排序,是需要实现这三个结构,就可以直接使用了

func (p pendingTxs) Len() int { return len(p) }

func (p pendingTxs) Less(i, j int) bool {
return p[i].GasPrice() < p[j].GasPrice()
}

func (p pendingTxs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

比较在交易列表txs,比较每个盒子第一个交易的gas值大小,然后再排序

blks为空的情况下:

if len(blks) == 0 {
blk := make(DefaultSortedTxs, 0) // 创建一个新的交易块(空切片)
blk = append(blk, tx) // 添加交易 tx 到这个块中
blks = append(blks, blk) // 把这个块添加到当前账户的交易组中
pool.pendings[tx.From()] = blks // 写入 pending map(账户 => 交易块列表)
pool.txs = append(pool.txs, blk) // 全局交易池中也追加该块
sort.Sort(pool.txs) // 重新按 gasPrice 排序

如果已经存在blks了,就拿出该地址的最后一个盒子

  • 先判断是否gas是否合理,合理就直接调用的盒子push
  • 如果不合理,就还是的新建一个blk,逻辑和上面一样
} else {
last := blks[len(blks)-1] // 拿出当前账户最后一个交易块

if last.GasPrice() <= tx.GasPrice() {
last.Push(tx) // 如果 gasPrice 合理,直接推入该块
} else {
blk := make(DefaultSortedTxs, 0) // 否则创建新的交易块
blk = append(blk, tx)
blks = append(blks, blk)
pool.pendings[tx.From()] = blks
pool.txs = append(pool.txs, blk)
sort.Sort(pool.txs)
}
}

queue:把当前交易 tx 放入 当前账户的 Queue 队列 中,并按照 Nonce 升序排序

首先根据我们设计queue map的结构,取交易账户的queue队列,直接把这个交易放入该队列,再按照nonce从小到大排序再更新queue队列

func (pool DefaultPool) addQueue(tx *types.Transaction) {
list := pool.queued[tx.From()]
list = append(list, tx)
//sort
sort.Slice(list, func(i, j int) bool {
return list[i].Nonce() < list[j].Nonce()
})
pool.queued[tx.From()] = list
}

替换是出现在pending时,就是nonce符合pending队列,如果此时pending中有相同的nonce的交易,但是这个交易的gas小于要放入的交易,此时就发生替换

func (pool DefaultPool) replacePending(blks []SortedTxs, tx *types.Transaction) {
for _, blk := range blks {
if blk.Nonce() >= tx.Nonce() {
//替换
blk.Replace(tx)
break
}
}
}

pending 区查找是否已有相同 nonce 的交易,如果有就尝试替换(通常是 gasPrice 更高的新交易),因为是在盒子里替换,所以我们可以在盒子的接口再实现一个替换的逻辑,这样就可以直接可以直接利用

盒子:给刚设计盒子接口的添加一个替换

type SortedTxs interface {
GasPrice() uint64
Push(tx *types.Transaction)
Replace(tx *types.Transaction)
Pop() *types.Transaction
Nonce() uint64
}

然后我们再实现这个Replace的逻辑

历遍这个盒子 ,找到nonce值相同的交易,再比较gas大小,大的就直接发生替换,如果没有找到相同的nonce交易,就直接添加在这个盒子的后面就可以了,

func (sorted DefaultSortedTxs) Replace(tx *types.Transaction) {
for i, t := range sorted {
if t.Nonce() == tx.Nonce() {
if tx.GasPrice() > t.GasPrice() {
sorted[i] = tx
}
return // 不管是否替换,都 return,避免重复添加
}
}
// 没找到,append 并按 nonce 升序排序
sorted = append(sorted, tx)
sort.Slice(sorted, func(i, j int) bool {
return sorted[i].Nonce() < sorted[j].Nonce()
})
}

再实现剩下的txpool的接口,根据功能依次实现。

func (pool *DefaultPool) Pop() *types.Transaction {
if len(pool.txs) == 0 {
return nil
}
// 取全池中 gasPrice 最高的 DefaultSortedTxs
block := pool.txs[len(pool.txs)-1]
tx := block.Pop()

// 如果 block 为空,移除
if len(*block.(*DefaultSortedTxs)) == 0 {
pool.txs = pool.txs[:len(pool.txs)-1]
}

return tx
}

func (pool *DefaultPool) SetStatRoot(root hash.Hash) {
pool.StatDB.SetRoot(root)
}

func NewDefaultPool(stat statdb.StatDB) *DefaultPool {
return &DefaultPool{
StatDB: stat,
all: make(map[hash.Hash]bool),
txs: make(pendingTxs, 0),
pendings: make(map[types.Address][]SortedTxs),
queued: make(map[types.Address][]*types.Transaction),
}
}

这个交易池就设计好了,基本上就是一个单机版的区块链了。

区块

P2P