Peer로 등록되는 과정을 살펴보자
(srv *Server) Start() 메서드에서 시작된다
func (srv *Server) Start() (err error) {
if err := srv.setupDiscovery(); err != nil {
return err
}
srv.setupDialScheduler()
}
- setupDiscovery에서 ListenV4를 호출하고 ListenV4는 newTable을 호출한다.
func (srv *Server) setupDiscovery() error {
if !srv.NoDiscovery {
ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
srv.ntab = ntab
srv.discmix.AddSource(ntab.RandomNodes())
}
return nil
}
- newTable은 Table struct를 초기화하고 loadSeedNodes를 호출하여 db에 저장된 seed node를 테이블에 저장한다.
func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
t := &UDPv4{
conn: c,
/ /...
db: ln.Database(),
log: cfg.Log,
}
tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log)
t.tab = tab
go srv.run()
return t, nil
}
- 이제 setupDialScheduler()를 살펴보자
- 위에서 발견한 db에서 발견한 seednode들을 사용한다.
func (srv *Server) setupDialScheduler() {
config := dialConfig{
self: srv.localnode.ID(),
maxDialPeers: srv.maxDialedConns(),
maxActiveDials: srv.MaxPendingPeers,
log: srv.Logger,
netRestrict: srv.NetRestrict,
dialer: srv.Dialer,
clock: srv.clock,
}
if srv.ntab != nil {
config.resolver = srv.ntab
}
if config.dialer == nil {
config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)
}
- newDialScheduler는srv.discmix의 seednode들을 이용하여 goroutine 을 실행한다.
func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
d := &dialScheduler{
dialConfig: config.withDefaults(),
setupFunc: setupFunc,
dialing: make(map[enode.ID]*dialTask),
static: make(map[enode.ID]*dialTask),
peers: make(map[enode.ID]struct{}),
doneCh: make(chan *dialTask),
nodesIn: make(chan *enode.Node),
addStaticCh: make(chan *enode.Node),
remStaticCh: make(chan *enode.Node),
addPeerCh: make(chan *conn),
remPeerCh: make(chan *conn),
}
d.lastStatsLog = d.clock.Now()
d.ctx, d.cancel = context.WithCancel(context.Background())
d.wg.Add(2)
go d.readNodes(it)
go d.loop(it)
return d
}
- readNodes에서는 seednode를 nodesIn 채널에 넣어주고, loop에서는 이 채널에서 나온 node를 처리한다.
func (d *dialScheduler) loop(it enode.Iterator) {
var (
nodesCh chan *enode.Node
historyExp = make(chan struct{}, 1)
)
loop:
for {
nodesCh = d.nodesIn
select {
case node := <-nodesCh:
if err := d.checkDial(node); err != nil {
d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
} else {
d.startDial(newDialTask(node, dynDialedConn))
}
case c := <-d.addPeerCh:
if c.is(dynDialedConn) || c.is(staticDialedConn) {
d.dialPeers++
}
id := c.node.ID()
d.peers[id] = struct{}{}
}
}
d.wg.Done()
}
startDial을 살펴보자
func (d *dialScheduler) startDial(task *dialTask) {
d.log.Trace("Starting p2p dial", "id", task.dest.ID(), "ip", task.dest.IP(), "flag", task.flags)
hkey := string(task.dest.ID().Bytes())
d.history.add(hkey, d.clock.Now().Add(dialHistoryExpiration))
d.dialing[task.dest.ID()] = task
go func() {
task.run(d)
d.doneCh <- task
}()
}
func (t *dialTask) run(d *dialScheduler) {
err := t.dial(d, t.dest)
}
- dialTask.dialer는 인터페이스 타입이고 여기서 구현체는 tcpDialer이다.
- d.setupFunc는 setupDialScheduler에서 srv.SetupConn으로 설정했다.
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
fd, err := d.dialer.Dial(d.ctx, t.dest)
mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()})
return d.setupFunc(mfd, t.flags, dest)
}
- tcpDialer는 다음과 같다.
- dest와의 connection을 만들고 net.conn을 return.
func (t tcpDialer) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) {
return t.d.DialContext(ctx, "tcp", nodeAddr(dest).String())
}
- d.setupFunc에 할당된 함수는 다음과 같다.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
c := &conn{fd: fd, flags: flags, cont: make(chan error)}
c.transport = srv.newTransport(fd, dialDest.Pubkey())
err := srv.setupConn(c, flags, dialDest)
if err != nil {
c.close(err)
}
return err
}
- srv.setupConn 위에서 만든 connection을 이용해서 Handshake를 수행하는 과정
- Handshake의 대략적인 과정은 여기서 확인했다.
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
remotePubkey, err := c.doEncHandshake(srv.PrivateKey)
c.node = dialDest
phs, err := c.doProtoHandshake(srv.ourHandshake)
c.caps, c.name = phs.Caps, phs.Name
err = srv.checkpoint(c, srv.checkpointAddPeer)
return nil
}
추가된 Peer는 func (srv *Server) run() 의 peers 변수에서 관리한다.
func (srv *Server) run() {
peers = make(map[enode.ID]*Peer)
inboundCount = 0
running:
for {
select {
case c := <-srv.checkpointAddPeer:
err := srv.addPeerChecks(peers, inboundCount, c)
p := srv.launchPeer(c)
peers[c.node.ID()] = p
srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
srv.dialsched.peerAdded(c)
}
}
}