geth-Peer로 등록되는 과정

최호준·2022년 10월 7일
0

Peer로 등록되는 과정을 살펴보자

(srv *Server) Start() 메서드에서 시작된다


// Start starts running the server.
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {
	// ...
	
	// setupDiscovery에서 로컬 DB에 저장된 seed node를 읽어온다.
	if err := srv.setupDiscovery(); err != nil {
		return err
	}
	srv.setupDialScheduler()
    // ...
}
  • setupDiscovery에서 ListenV4를 호출하고 ListenV4는 newTable을 호출한다.
func (srv *Server) setupDiscovery() error {
    // ...
	if !srv.NoDiscovery {
        // ...
		ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
        // ... 
		srv.ntab = ntab
		// return받은 ntab에는 seed node들이 들어있다.
		// discmix는  newDialScheduler에서 사용된다.
		srv.discmix.AddSource(ntab.RandomNodes())
    }
    return nil
}
  • newTable은 Table struct를 초기화하고 loadSeedNodes를 호출하여 db에 저장된 seed node를 테이블에 저장한다.
func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
    //...
	t := &UDPv4{
		conn:            c,
        / /...
		db:              ln.Database(),
        // ...
		log:             cfg.Log,
	}
    // newTable에서 db에 저장된 seedNode를 읽어온다.
	tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log)
    //...
	t.tab = tab
    //...
	// srv.run에서는 추가된 peers 관리한다.
	go srv.run()
	return t, nil
}
  • 이제 setupDialScheduler()를 살펴보자
  • 위에서 발견한 db에서 발견한 seednode들을 사용한다.
func (srv *Server) setupDialScheduler() {
	config := dialConfig{
		self:           srv.localnode.ID(),
		maxDialPeers:   srv.maxDialedConns(),
		maxActiveDials: srv.MaxPendingPeers,
		log:            srv.Logger,
		netRestrict:    srv.NetRestrict,
		dialer:         srv.Dialer,
		clock:          srv.clock,
	}
	if srv.ntab != nil {
		config.resolver = srv.ntab
	}
	if config.dialer == nil {
		config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
	}
	// srv.discmix에는 setupDiscovery()가 넣어둔 seednode들이 들어있다.
	srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)
	// ...
}
  • newDialScheduler는srv.discmix의 seednode들을 이용하여 goroutine 을 실행한다.
func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
	d := &dialScheduler{
		dialConfig:  config.withDefaults(),
		setupFunc:   setupFunc,
		dialing:     make(map[enode.ID]*dialTask),
		static:      make(map[enode.ID]*dialTask),
		peers:       make(map[enode.ID]struct{}),
		doneCh:      make(chan *dialTask),
		nodesIn:     make(chan *enode.Node),
		addStaticCh: make(chan *enode.Node),
		remStaticCh: make(chan *enode.Node),
		addPeerCh:   make(chan *conn),
		remPeerCh:   make(chan *conn),
	}
	d.lastStatsLog = d.clock.Now()
	d.ctx, d.cancel = context.WithCancel(context.Background())
	d.wg.Add(2)
	// readNodes는 it에서 seednode를 d.nodesIn 채널에 넣어준다
	go d.readNodes(it)
	// loop는 dialer의 main loop이다.
	go d.loop(it)
	return d
}
  • readNodes에서는 seednode를 nodesIn 채널에 넣어주고, loop에서는 이 채널에서 나온 node를 처리한다.
// loop is the main loop of the dialer.
func (d *dialScheduler) loop(it enode.Iterator) {
    var (
    nodesCh    chan *enode.Node
    historyExp = make(chan struct{}, 1)
    )
    // ...    
loop:
    for {
        // ...
        // !! nodesCh에 nodesIn을  대입 !!
        nodesCh = d.nodesIn
        // ... 
		select {
		case node := <-nodesCh:
		    // d.readNodes 에서 d.nodesIn 채널에서 넣어준 node를 처리한다.  
			if err := d.checkDial(node); err != nil {
				d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
			} else {
				d.startDial(newDialTask(node, dynDialedConn))
			}

		case c := <-d.addPeerCh:
            // 성공적으로 peer를 생성하면 addPeerCh에 들어있는 c를 꺼내온다. 
			// 여기선 따로 peer를 저장하지 않는다.
			// peer 관리는  srv.run()의 peers라는 변수에서 관리한다.
			if c.is(dynDialedConn) || c.is(staticDialedConn) {
				d.dialPeers++
			}
			id := c.node.ID()
			// d.peers에 peer가 설정됨을 나타낸다.
			d.peers[id] = struct{}{}
            // ... 
			// TODO: cancel dials to connected peers
        // ... 
		}
	}
    // ...
	d.wg.Done()
}

startDial을 살펴보자

// startDial runs the given dial task in a separate goroutine.
func (d *dialScheduler) startDial(task *dialTask) {
	d.log.Trace("Starting p2p dial", "id", task.dest.ID(), "ip", task.dest.IP(), "flag", task.flags)
	hkey := string(task.dest.ID().Bytes())
	d.history.add(hkey, d.clock.Now().Add(dialHistoryExpiration))
    // dialScheduler.dialing 필드는 active tasks를 의미한다.
	// 여기서 새로 들어온 task를 active tasks에 넣어준다.
	d.dialing[task.dest.ID()] = task
	go func() {
		task.run(d)
		d.doneCh <- task
	}()
}
func (t *dialTask) run(d *dialScheduler) {
	// ...
	// t.dial은 실제 connection을 형성을 시도한다.
	err := t.dial(d, t.dest)
	// ... 
}
  • dialTask.dialer는 인터페이스 타입이고 여기서 구현체는 tcpDialer이다.
  • d.setupFunc는 setupDialScheduler에서 srv.SetupConn으로 설정했다.
// dial performs the actual connection attempt.
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
    // Dial은 (net.Conn, error) 을 return한다.
	fd, err := d.dialer.Dial(d.ctx, t.dest)
    // ... 
	mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()})
	// 밑에서 setupFunc를 살펴보자
	return d.setupFunc(mfd, t.flags, dest)
}
  • tcpDialer는 다음과 같다.
  • dest와의 connection을 만들고 net.conn을 return.
func (t tcpDialer) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) {
	return t.d.DialContext(ctx, "tcp", nodeAddr(dest).String())
}
  • d.setupFunc에 할당된 함수는 다음과 같다.
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
	c := &conn{fd: fd, flags: flags, cont: make(chan error)}
	// ...
	    // srv.newTransport는 인터페이스타입인 transport를 구현한 rlpxTransport를 return한다.
        //    type rlpxTransport struct {
        //        rmu, wmu sync.Mutex
        //        wbuf     bytes.Buffer
        //        conn     *rlpx.Conn 여기에 fd가 들어간다.(실제 형성된 connection)
		//	}
		c.transport = srv.newTransport(fd, dialDest.Pubkey())
    
	err := srv.setupConn(c, flags, dialDest)
	if err != nil {
		c.close(err)
	}
	return err
}
  • srv.setupConn 위에서 만든 connection을 이용해서 Handshake를 수행하는 과정
  • Handshake의 대략적인 과정은 여기서 확인했다.
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
    // ... 
	// Run the RLPx handshake.
	remotePubkey, err := c.doEncHandshake(srv.PrivateKey)
    // ...
		c.node = dialDest
    // ... 

	// Run the capability negotiation handshake. return their port
	// 서로의 protocol을 확인 
	phs, err := c.doProtoHandshake(srv.ourHandshake)
    // ...
	c.caps, c.name = phs.Caps, phs.Name
	// 핸드쉐이크가 정상적으로 완료된경우 srv.checkpointAddPeer 채널에 c를 넣어준다. 
	// srv는 run 에서 이 c를 처리한다.
	err = srv.checkpoint(c, srv.checkpointAddPeer)
    // ...
	return nil
}

추가된 Peer는 func (srv *Server) run() 의 peers 변수에서 관리한다.


// run is the main loop of the server.
func (srv *Server) run() {
        // ...
	    // peer는 여기에 추가된다.
		peers        = make(map[enode.ID]*Peer)
		inboundCount = 0
        // ...

running:
	for {
		select {
        // ... 
		case c := <-srv.checkpointAddPeer:
			// At this point the connection is past the protocol handshake.
			// Its capabilities are known and the remote identity is verified.
			err := srv.addPeerChecks(peers, inboundCount, c)
			// The handshakes are done and it passed all checks.
			//  srv.launchPeer는 Peer를 생성하고, go srv.runPeer(p) 을 수행한다.
			p := srv.launchPeer(c)
			// peers map에 node id로 새로운 peer를 넣어준다.
			peers[c.node.ID()] = p
			srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
            // dialScheduler.addPeerCh에 c를 넣어준다. c는  dialScheduler.loop 함수에서 처리한다. 
			srv.dialsched.peerAdded(c)
			// ...

		// ... 	
		}
	}
    // ...
}

0개의 댓글