1.经过http.Server等一系列调用后,最终会进入rpc/http.go里ServeHttp方法,最后会调用ServeSingleRequest方法
srv.ServeSingleRequest(ctx, codec, OptionMethodInvocation)
2. rpc/server.go里的ServeSingleRequest方法
func (s *Server) ServeSingleRequest(ctx context.Context, codec ServerCodec, options CodecOption) {
s.serveRequest(ctx, codec, true, options)
}
3.rpc/server.go里的serveRequest方法
这个方法将会实现解码用户发来的json字符串,得到方法名和参数列表,然后调用相应的方法执行。
for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec)
// If a single shot request is executing, run and return immediately
if singleShot {
if batch {
s.execBatch(ctx, codec, reqs)
} else {
s.exec(ctx, codec, reqs[0])
}
return nil
}
// For multi-shot connections, start a goroutine to serve and loop back
pend.Add(1)
go func(reqs []*serverRequest, batch bool) {
defer pend.Done()
if batch {
s.execBatch(ctx, codec, reqs)
} else {
s.exec(ctx, codec, reqs[0])
}
}(reqs, batch)
}
关键方法有三个:readRequest,execBatch和exec方法
3.1 readRequest方法,主要实现了解码请求一条请求
3.1.1 ReadRequestHeaders
reqs, batch, err := codec.ReadRequestHeaders()
rpc/json.go里的ReadRequestHeaders方法实现了根据传进来的method参数,解析出命名空间和对应的方法。
return parseRequest(incomingMsg)
rpc/json.go里的parseRequest方法的内容:
elems := strings.Split(in.Method, serviceMethodSeparator)
if len(elems) != 2 {
return nil, false, &methodNotFoundError{in.Method, ""}
}
// regular RPC call
if len(in.Payload) == 0 {
return []rpcRequest{{service: elems[0], method: elems[1], id: &in.Id}}, false, nil
}
return []rpcRequest{{service: elems[0], method: elems[1], id: &in.Id, params: in.Payload}}, false, nil
这部分代码实现分割方法名,比如eth_getTransaction ,eth为namespace, getTransaction为方法
3.1.2 ParseRequestArguments方法实现解释参数列表
if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method
requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
if r.params != nil && len(callb.argTypes) > 0 {
if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {
requests[i].args = args
} else {
requests[i].err = &invalidParamsError{err.Error()}
}
}
continue
}
3.2 execBatch方法,实现批处理执行方法
3.3 exec方法,执行单个调用
// exec executes the given request and writes the result back using the codec.
func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
var response interface{}
var callback func()
if req.err != nil {
response = codec.CreateErrorResponse(&req.id, req.err)
} else {
response, callback = s.handle(ctx, codec, req)
}
if err := codec.Write(response); err != nil {
log.Error(fmt.Sprintf("%v\n", err))
codec.Close()
}
// when request was a subscribe request this allows these subscriptions to be actived
if callback != nil {
callback()
}
}
最核心的是调用server的handle方法,主要实现了根据函数名和参数列表,调用该函数。核心代码为:
arguments := []reflect.Value{req.callb.rcvr}
if req.callb.hasCtx {
arguments = append(arguments, reflect.ValueOf(ctx))
}
if len(req.args) > 0 {
arguments = append(arguments, req.args...)
// execute RPC method and return result
reply := req.callb.method.Func.Call(arguments)
4.假设调用的是eth_sendTransaction方法,将会进入api/api.go里的sendTransaction方法的逻辑。
api.go定义了所有的rpc方法,api/backend.go里的GetAPIs方法把这些rpc方法封装成4大类service,
分别是命名空间为eth的NewPublicTransactionPoolAPI,NewPublicBlockChainAPI,NewPublicAccountAPI
和命名空间为personal的NewPrivateAccountAPI。
eth的NewPublicTransactionPoolAPI 的SendTransaction方法不需要密码,
而personal的NewPrivateAccountAPI的SendTransaction方法需要密码去解密key。
SendTransaction方法最终会实现把交易插入txpoll的pending队列
5. api/api.go里的命名空间为eth的NewPublicTransactionPoolAPI的SendTransaction方法分析:
func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args SendTxArgs) (common.Hash, error) {
return submitTransaction(ctx, s.b, signed)
}
5.1 api.go里的submitTransaction方法的核心内容
func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
if err := b.SendTx(ctx, tx); err != nil {
return common.Hash{}, err
}
5.2 node/api_backend.go里的SendTx方法
func (b *APIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.kkchain.TxPool().AddLocal(signedTx)
}
5.3 core/tx_poll.go里的AddLocal方法
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
return pool.addTx(tx, !pool.config.NoLocals)
}
5.4 core/tx_poll.go里的addTx方法
func (pool *TxPool) addTx(tx *types.Transaction, local bool) error
// Try to inject the transaction and update any state
replace, err := pool.add(tx, local)
// If we added a new transaction, run promotion checks and return
if !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
pool.promoteExecutables([]common.Address{from})
}
return nil
}
addTx方法里核心方法有两个一个是pool.add 方法,一个pool.promoteExecutables方法。add方法如果是替换pending队列里的tx的情况,则会直接替换pengding里的tx。
而 pool.promoteExecutables实现了把普通队列的tx加入到txpoll的pending队列去。
tx_poll.go的add方法:
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
log.WithFields(log.Fields{
"hash": hash.String(),
"err": err,
}).Error("Discarding invalid transaction")
return false, err
}
// New transaction isn't replacing a pending one, push into queue
replace, err := pool.enqueueTx(hash, tx)
其中validateTx方法会做签名和是否超过gaslimit的检查
5.5 core/tx_poll.go里的pool.promoteExecutables方法,这个方法每次执行都会按顺序尝试删掉以下的逻辑:
删除时间太久的tx,删除账户余额不足或者超过gas的tx,把剩下的tx加入到pending列表。
// Iterate over all accounts and promote any executable transactions
for _, addr := range accounts {
// Gather all executable transactions and promote them
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
log.Debugf("Promoting queued transaction,hash: %s", hash.String())
promoted = append(promoted, tx)
}
}
}
// Notify subsystem for new promoted transactions.
if len(promoted) > 0 {
go pool.txFeed.Send(NewTxsEvent{promoted})
}
如果pending的队列大于0,发送tx事件到txFeed中。
5.6 pool.promoteTx 方法实现把tx加入到txpoll的pending队列去。
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
// Try to insert the transaction into the pending queue
if pool.pending[addr] == nil {
pool.pending[addr] = newTxList(true)
}
list := pool.pending[addr]
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
pool.all.Remove(hash)
pool.priced.Removed()
return false
}
// Otherwise discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed()
}
// Failsafe to work around direct pending inserts (tests)
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
pool.priced.Put(tx)
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
return true
调用core/tx_list.go的Add方法时,如果当前收集的tx在pending队列已经存在,并且老的tx比新的tx的gasprice更大,则新的tx不会再插入到pending列表中去。
6.txpoll里的tx的处理逻辑:
插入区块分两种情况:
情况1:挖矿的全节点,从txpoll中取出tx进行打包。
情况2:收到别的成功挖矿区块,验证通过后,插入区块
这里讲的是第一种
6.1 在node/node.go的New方法里
node.txPool = core.NewTxPool(txConfig, chainConfig, node.blockchain)
node.miner = miner.New(chainConfig, node.blockchain, node.txPool, node.engine)
其中core/tx_poll.go里的NewTxPool方法最终会调用 pool.loop方法
go pool.loop()
其中miner/miner.go里的miner.New方法构建worker实例和会调用miner.onEvent方法
miner := &Miner{
quitCh: make(chan struct{}),
syncStartCh: make(chan core.StartEvent),
syncDoneCh: make(chan core.DoneEvent),
worker: newWorker(config, bc, txpool, engine),
chain: bc,
syncDone: 1,
}
go miner.onEvent()
其中miner/worker.go 里的newWorker方法会订阅txs和chain head事件,并且开启三个协程。
// Subscribe events from tx pool
w.txsSub = txpool.SubscribeNewTxsEvent(w.txsCh)
// Subscribe events from inbound handler
w.chainHeadSub = bc.SubscribeChainHeadEvent(w.chainHeadCh)
go w.mineLoop()
go w.taskLoop()
go w.waitResult()
// Submit first work to initialize pending state.
w.startCh <- struct{}{}
return w
miner/worker.go里的mineLoop方法的内容为:
func (w *worker) mineLoop() {
defer w.txsSub.Unsubscribe()
defer w.chainHeadSub.Unsubscribe()
for {
select {
case txs := <-w.txsCh:
for _, tx := range txs.Txs {
fmt.Println(tx)
}
case <-w.chainHeadCh:
if w.isRunning() {
w.commitTask()
}
case <-w.startCh:
w.commitTask()
case <-w.quitCh:
//Stopped Mining
return
}
}
}
到这里,就能把rpc发送的sendTransaction请求和txpoll连起来了。因为api/api.go里的SendTransaction方法,如果没问题的话,最终会调用core/tx_poll.go里的pool.promoteExecutables方法,然后在方法里会执行
go pool.txFeed.Send(NewTxsEvent{promoted}。当发送Txs的事件消息给txFeed时,所有订阅了txs事件消息的channel都能收到。这时就会触发mineLoop方法里的case txs:=<-w.txsCh,但是为什么这里只是打印出来而已呢?
因为core/tx_poll.go里的pool.promoteExecutables方法里会调用pool.promoteTx(addr, hash, tx)时,已经把tx加入到pending队列。后面只需要调用commitTask,就能实现把txpoll中的pending队列里的tx进行打包。
6.2 commitTask分析:
在最初的启动时候会调用一次w.commitTask(),commitTask会从txpoll中取出pending队列的tx打包成区块,最终会把打包的区块写入w.taskCh
func (w *worker) commitTask() {
//get txs from pending pool
pending, count, _ := w.txpool.Pending()
txs := make(types.Transactions, 0, count)
for _, acctxs := range pending {
for _, tx := range acctxs {
txs = append(txs, tx)
}
}
if len(txs) > 0 {
//apply txs and get block
if w.commitTransactions(txs, w.miner) == false {
return
}
}
// Deep copy receipts here to avoid interaction between different tasks.
receipts := make([]*types.Receipt, len(w.currentCtx.receipts))
for i, l := range w.currentCtx.receipts {
receipts[i] = new(types.Receipt)
*receipts[i] = *l
}
block := types.NewBlock(header, w.currentCtx.txs, receipts)
w.taskCh <- &task{block: block, state: s, receipts: receipts, createdAt: time.Now()}
而在miner/worker.go里的taskLoop方法中,如果<-w.taskCh有新的内容,会调用go w.seal(task, stopCh)
func (w *worker) taskLoop() {
var (
stopCh chan struct{}
)
// interrupt aborts the in-flight sealing task.
interrupt := func() {
if stopCh != nil {
close(stopCh)
stopCh = nil
}
}
for {
select {
case task := <-w.taskCh:
// Reject duplicate sealing work due to resubmitting.
interrupt()
stopCh = make(chan struct{})
go w.seal(task, stopCh)
case <-w.quitCh:
interrupt()
return
}
}
}
worker.go里的w.seal方法会执行共识算法的逻辑,执行完毕后,最后会把结果写入w.resultCh
func (w *worker) seal(t *task, stop <-chan struct{}) {
var (
err error
res *task
)
if t.block, err = w.engine.Execute(w.chain, t.block, stop); t.block != nil {
//log.Info("Successfully sealed new block", "number", t.block.Number(), "hash", t.block.Hash(),
// "elapsed", time.Since(t.createdAt))
res = t
} else {
if err != nil {
log.Errorf("Block sealing failed,err: %v", err)
}
res = nil
}
select {
case w.resultCh <- res:
case <-w.quitCh:
}
}
而在waitResult的协程中,如果w.resultCh有内容,表示挖矿成功,会调用blockchain的writeBlockWithState方法写入区块链,并且发送txs和chainhead这两种事件消息。
func (w *worker) waitResult() {
for {
select {
case result := <-w.resultCh:
err := w.chain.WriteBlockWithState(block, result.receipts, result.state)
events = append(events, core.ChainHeadEvent{Block: block})
events = append(events, core.NewMinedBlockEvent{Block: block})
w.chain.PostChainEvents(events, logs)
w.engine.PostExecute(w.chain, block)
}
}
到这时,因为发送了chainHead事件,miner/worker.go里的mineLoop方法里w.chainHeadCh将会收到新的内容,从而又重新执行 w.commitTask()。
这样从rpc发送请求到txpoll再到挖矿,挖矿成功,又重新执行commitTask,commitTask将会取出txpoll的pending队列里的所有tx来打包区块,打包成功后,将又进行挖矿,如果挖矿陈工,又会重新执行committask。如此循环。
Pending队列里其实就是下一个可以打包成区块的所有tx。pending的存储格式为pengding[address][Transactions], 之所以这样存储,一个节点可以开多个挖矿账户。
1.发送rpc请求,如果请求的方法名为eth_sendTransaction,进入api.go里的SendTransaction方法逻辑,该方法最终会完成两个事情:
1.1 把tx加入txpoll的pending队列
1.2 发送txs事件消息
2. 当挖矿成功后,worker.go里的w.seal方法会把结果写入w.resultCh中。waitReuslt协程收到会触发发送chainHead事件。
3. worker.go里的mineLoop方法收到w.chainHeadCh里的chainHead事件,会执行commitTask方法。
4. commitTask方法会从txpoll中取中pending[miner address]列表里的所有tx(注意这里的pending列表将会包含第一步收到的rpc交易请求).然后调用evm执行这些tx,得到receipt收据,把这些txs,receipt和区块头打包成区块,最后把区块发送给w.taskCh.
5. worker.go里的taskLoop方法收到w.taskCh里的消息事件,会调用w.seal方法开始挖矿。如果挖矿成功,于是又跳回第二步。